import re
import sys
# This file is part of CNVAnalysisToolkit.
#
# CNVAnalysisToolkit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CNVAnalysisToolkit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CNVAnalysisToolkit. If not, see <http://www.gnu.org/licenses/>.
__author__ = "Marc-Andre Legault (StatGen)"
__copyright__ = "Copyright (C) 2013 StatGen"
__license__ = "GNU General Public License v3 (GPL-3)"
CNV_TYPES = ("gain", "loss")
[docs]class cnv(object):
"""Build a CNV object.
:param chr: The chromosome (`e.g.` 1, 13, X, Y)
:type chr: str
:param start: The start of the CNV.
:type start: int
:param end: The end of the CNV.
:type end: int
:param size: The size of the CNV (end - start).
:type size: int
:param pos: The position of the CNV in genome coordinate format of the form
``chr#:start-end``. (`e.g.` chr3:13515-14539)
:type pos: str
:param type: The type of the CNV ("gain" or "loss")
:type type: str
:param cn: The copy number: <2 for deletions, >2 for gains.
:type cnv: int
:param confidence: A confidence value (specific to a given algorithm).
:type float:
:param algo: The genotyping algorithm.
:type algo: str
:param source: Either "father", "mother", "twin1" or "twin2".
:type source: str
:param doc: The depth of coverage ratio if available.
:type doc: float
Two initilization methods are available. One can either use the (chr, start,
end) parameters to initilize the CNV object, or the more straight forward
``pos`` attribute. All the object are normalized to insure integrity.
.. note::
The ``pos`` kwarg is never stored as an attribute, but it can be
retrieved using the :py:func:`cnv_struct.cnv.get_pos` function.
"""
# pos is an accepted kwarg parameter, but it won't be stored as an attribute
# value: it will be parsed into a start, end, chr.
attributes = ["start", "end", "size", "chr", "type", "cn", "confidence",
"algo", "source", "doc"]
def __init__(self, **kwargs):
# Will be set if the algorithm provides
# depth of coverage DOC values.
for attr, value in kwargs.iteritems():
if attr in cnv.attributes:
setattr(self, attr, value)
pos = kwargs.get('pos')
self.normalize_attributes(pos)
[docs] def normalize_attributes(self, pos = None):
"""Normalizes the different fields for consistency regardless of the
method of initialization.
All the missing attributes will be set to ``None``, the ``type``
parameter will be inferred from the copy number, if available.
The CNV ``type`` is checked against expected values.
The ``chr``, ``start`` and ``end`` are parsed from the ``pos`` if such
an attribute is given.
given
"""
for attr in cnv.attributes:
if not hasattr(self, attr):
setattr(self, attr, None)
# We don't have a copy number, but we know the type so we set the cn to
# 1 for loss and 3 for gain.
if (not self.cn) and self.type:
if self.type == "gain":
self.cn = 3
elif self.type == "loss":
self.cn = 1
else:
self.cn = 2
# Alternatively, if we have the copy number but not the type, we set the
# type to loss if cn < 2 and to gain if cn > 2.
if not self.type and self.cn:
if self.cn < 2:
self.type = "loss"
elif self.cn > 2:
self.type = "gain"
# Make sure the type is somethig we expect.
if self.type not in CNV_TYPES:
sys.stderr.write(
"Warning: Invalid CNV type. Valid types are: {}\n".format(
CNV_TYPES.__repr__()
)
)
self.type = None
# Parse start, end, chr from a position string.
if pos is not None:
match = re.search(r"chr([0-9]{1,2}):([0-9]+)-([0-9]+)", pos)
if not match:
sys.stderr.write("Warning: Invalid position '{}'\n".format(pos))
else:
(self.chr, self.start, self.end) = match.groups()
self.chr = int(self.chr)
self.start = int(self.start)
self.end = int(self.end)
self.size = self.end - self.start + 1
if not self.size and None not in (self.start, self.end):
self.size = self.end - self.start
# Normalize types for common fields
self.chr = str(self.chr)
try:
self.chr = str(int(float(self.chr)))
except ValueError:
pass
self.start = int(self.start)
self.end = int(self.end)
[docs] def get_pos(self):
"""Gets the position string from the ``chr``, ``start`` and ``end``
attributes.
:return: A position string of the form "chrXX:start-end".
:rtype: str
"""
return "chr{chr}:{start}-{end}".format(chr = self.chr,
start = self.start, end = self.end)
def __repr__(self):
return "<class 'cnv_struc.cnv': {}/{} ({})>".format(
self.get_pos(),
self.type,
self.source
)
def __hash__(self):
# Implemented for use in sets
return hash((self.start, self.end, self.chr, self.type))
def __eq__(self, other):
# We will define equality if start, end, chr and type are the same.
return self.__hash__() == other.__hash__()
def __dict__(self):
return dict([(i, getattr(self, i)) for i in cnv.attributes])
def __getstate__(self):
return tuple([getattr(self, i) for i in cnv.attributes])
def __setstate__(self, attr_states):
for attr, val in zip(cnv.attributes, attr_states):
setattr(self, attr, val)
self.normalize_attributes()
class AmbiguousOverlap(Exception):
def __init__(self, cnv1, cnv2):
self.cnv1 = cnv1
self.cnv2 = cnv2
def __str__(self):
return ("Two types of overlap "
"are possible for {} and {}".format(self.cnv1.__repr__(),
self.cnv2.__repr__()))
[docs]def overlap(cnv1, cnv2, threshold = None, global_ov = False):
"""Checks the overlap between two CNVs.
:param cnv1: A :py:class:`cnv_struct.cnv` object that will be compared to
cnv2 for overlaps.
:type cnv1: :py:class:`cnv_struct.cnv`
:param cnv2: The other :py:class:`cnv_struct.cnv`
:type cnv2: :py:class:`cnv_struct.cnv`
:param threshold: A bool that will influence the type of data returned.
:type threshold: bool
:returns: Either returns a tuple of overlap ratios for both CNVs or, if a
theshold is provided, a bool that is True if the overlap is
sufficiently high.
"""
s1 = cnv1.start
s2 = cnv2.start
e1 = cnv1.end
e2 = cnv2.end
if cnv1.type != cnv2.type:
raise Exception("CNVs to compare are not of same type (gain-loss).")
if cnv1.chr != cnv2.chr:
return None
overlap = None
found_overlap_type = False
# 6 scenarios are possible.
# Regular overlaps
if __seq(s1, s2, e1, e2):
overlap = e1 - s2 + 1
if __seq(s2, s1, e2, e1):
overlap = e2 - s1 + 1
# Full overlaps
if __seq(s1, s2, e2, e1):
overlap = e2 - s2 + 1
if __seq(s2, s1, e1, e2):
overlap = e1 - s1 + 1
# No overlaps
if __seq(s1, e1, s2, e2):
if overlap is None:
overlap = 0
if __seq(s2, e2, s1, e1):
if overlap is None:
overlap = 0
assert overlap is not None
overlap1 = float(overlap) / cnv1.size
overlap2 = float(overlap) / cnv2.size
if threshold:
# Return a bool.
return (overlap1 >= threshold and overlap2 >= threshold)
elif global_ov:
# Return a single value:
# 2 * overlap / (size1 + size2)
return 2.0 * overlap / (cnv1.size + cnv2.size)
else:
# Return a tuple of overlap ratios.
return (overlap1, overlap2)
[docs]def ro_to_best_in_list(cnv, li, return_cnv = False):
"""Takes a cnv and finds the reciprocal overlap with the best matching cnv in the provided list.
:param cnv: A cnv (e.g. from a twin)
:type cnv: :py:class:`cnv`.
:param li: A list of :py:class:`cnv` to find the best reciprocal overlapping cnv.
:type li: list.
:returns: Returns a 2-uple of the respective overlaps for both cnvs.
Since there is rarely more than 1000 CNV per chromosome per sample, it is not
necessary to sort the CNV list. A simple brute-force approach is deemed acceptable.
"""
# The metric for a good overlap is the minimum
# overlap value. We will try to maximize this.
li = [i for i in li if (i.type == cnv.type and i.chr == cnv.chr)]
overlapping_cnv = None
best_ov = None
max_overlap = 0
for other_cnv in li:
ov = overlap(cnv, other_cnv)
min_ov = min(ov)
if min_ov > max_overlap:
max_overlap = min_ov
best_ov = ov
overlapping_cnv = other_cnv
if best_ov is None:
best_ov = (0, 0)
if return_cnv:
return best_ov, overlapping_cnv
return best_ov
def __seq(*args):
"""Returns True if the list is non-decreasing. """
for i in xrange(len(args) - 1):
if args[i] > args[i + 1]:
return False
return True
[docs]def family_union(cnvs1, cnvs2, ro):
"""Takes two family dicts (sample -> chromosome -> cnv list) and returns a new dict. corresponding to their union.
:param cnvs1: CNVs family dict (e.g. from a given family/algorithm).
:type cnvs1: dict
:param cnvs2: CNVs family dict (e.g. from another algorithm, same family).
:type cnvs2: dict
:param ro: Reciprocal overlap threshold for identity.
:type ro: float
:returns: A family dict with the union of the two CNV lists.
:rtype: dict
"""
union = {}
for sample in cnvs1:
union[sample] = {}
for chromo in cnvs1[sample]:
union[sample][chromo] = []
# Compute the union
li = cnvs1[sample][chromo] + cnvs2[sample][chromo]
# Sort by start position for easier merging.
li = sorted(li, key=lambda x: x.start)
# Walk, merge ov > RO, and fill the union dict.
start = 0
i = 0
j = 0
while i < len(li) - 1:
while (j < len(li) and
li[i].type == li[j].type and
overlap(li[i], li[j], ro)):
i = j
j += 1
if i != j:
region = merge_cnvs(li[start], li[j - 1])
union[sample][chromo].append(region)
else:
union[sample][chromo].append(li[start])
start = j
i = start
j += 1
if j == len(li):
# Add last region.
union[sample][chromo].append(li[start])
return union
[docs]def family_intersection(cnvs1, cnvs2, ro):
"""Takes two family dicts (sample -> chromosome -> cnv list) and returns a new dict. corresponding to their intersection.
:param cnvs1: CNVs family dict (e.g. from a given family/algorithm).
:type cnvs1: dict
:param cnvs2: CNVs family dict (e.g. from another algorithm, same family).
:type cnvs2: dict
:param ro: Reciprocal overlap threshold for identity.
:type ro: float
:returns: A family dict with the intersection of the two CNV lists.
:rtype: dict
"""
intersect = {}
for sample in cnvs1:
intersect[sample] = {}
for chromo in cnvs1[sample]:
intersect[sample][chromo] = []
for cnv in cnvs1[sample][chromo]:
ov, cnv2 = ro_to_best_in_list(cnv, cnvs2[sample][chromo], True)
if min(ov) >= ro:
intersect[sample][chromo].append(merge_cnvs(cnv, cnv2))
# Note that there would probably be a smarter way of doing this, perhaps
# by sorting both lists.
return intersect
[docs]def merge_cnvs(cnv1, cnv2):
"""Merges two CNVs into a new object spanning the whole region.
:param cnv1: The first cnv.
:type cnv1: :py:class:`cnv_struct.cnv`
:param cnv2: The second cnv.
:type cnv2: :py:class:`cnv_struct.cnv`
:returns: The merged CNV object.
:rtype: :py:class:`cnv_struct.cnv`
"""
assert cnv1.chr == cnv2.chr and cnv1.type == cnv2.type
params = {
"chr": cnv1.chr,
"start": min(cnv1.start, cnv2.start),
"end": max(cnv1.end, cnv2.end),
"type": cnv1.type,
"algo": "{}_{}".format(cnv1.algo, cnv2.algo),
"source": cnv1.source if cnv1.source == cnv2.source else "consensus",
}
return cnv(**params)
def compare_lists(li1, li2):
no_overlap = True
for cnv1 in li1:
ov, cnv2 = ro_to_best_in_list(cnv1, li2, return_cnv = True)
if min(ov) != 0:
no_overlap = False
print "Overlapping CNVs: "
print "\t{}".format(cnv1)
print "\t{}".format(cnv2)
print
if no_overlap:
print "There is no overlap between given lists."