Source code for cnv_struct

import re
import sys

# This file is part of CNVAnalysisToolkit.
# 
# CNVAnalysisToolkit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# CNVAnalysisToolkit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with CNVAnalysisToolkit.  If not, see <http://www.gnu.org/licenses/>.

__author__ = "Marc-Andre Legault (StatGen)"
__copyright__ = "Copyright (C) 2013 StatGen"
__license__ = "GNU General Public License v3 (GPL-3)"

CNV_TYPES = ("gain", "loss")

[docs]class cnv(object): """Build a CNV object. :param chr: The chromosome (`e.g.` 1, 13, X, Y) :type chr: str :param start: The start of the CNV. :type start: int :param end: The end of the CNV. :type end: int :param size: The size of the CNV (end - start). :type size: int :param pos: The position of the CNV in genome coordinate format of the form ``chr#:start-end``. (`e.g.` chr3:13515-14539) :type pos: str :param type: The type of the CNV ("gain" or "loss") :type type: str :param cn: The copy number: <2 for deletions, >2 for gains. :type cnv: int :param confidence: A confidence value (specific to a given algorithm). :type float: :param algo: The genotyping algorithm. :type algo: str :param source: Either "father", "mother", "twin1" or "twin2". :type source: str :param doc: The depth of coverage ratio if available. :type doc: float Two initilization methods are available. One can either use the (chr, start, end) parameters to initilize the CNV object, or the more straight forward ``pos`` attribute. All the object are normalized to insure integrity. .. note:: The ``pos`` kwarg is never stored as an attribute, but it can be retrieved using the :py:func:`cnv_struct.cnv.get_pos` function. """ # pos is an accepted kwarg parameter, but it won't be stored as an attribute # value: it will be parsed into a start, end, chr. attributes = ["start", "end", "size", "chr", "type", "cn", "confidence", "algo", "source", "doc"] def __init__(self, **kwargs): # Will be set if the algorithm provides # depth of coverage DOC values. for attr, value in kwargs.iteritems(): if attr in cnv.attributes: setattr(self, attr, value) pos = kwargs.get('pos') self.normalize_attributes(pos)
[docs] def normalize_attributes(self, pos = None): """Normalizes the different fields for consistency regardless of the method of initialization. All the missing attributes will be set to ``None``, the ``type`` parameter will be inferred from the copy number, if available. The CNV ``type`` is checked against expected values. The ``chr``, ``start`` and ``end`` are parsed from the ``pos`` if such an attribute is given. given """ for attr in cnv.attributes: if not hasattr(self, attr): setattr(self, attr, None) # We don't have a copy number, but we know the type so we set the cn to # 1 for loss and 3 for gain. if (not self.cn) and self.type: if self.type == "gain": self.cn = 3 elif self.type == "loss": self.cn = 1 else: self.cn = 2 # Alternatively, if we have the copy number but not the type, we set the # type to loss if cn < 2 and to gain if cn > 2. if not self.type and self.cn: if self.cn < 2: self.type = "loss" elif self.cn > 2: self.type = "gain" # Make sure the type is somethig we expect. if self.type not in CNV_TYPES: sys.stderr.write( "Warning: Invalid CNV type. Valid types are: {}\n".format( CNV_TYPES.__repr__() ) ) self.type = None # Parse start, end, chr from a position string. if pos is not None: match = re.search(r"chr([0-9]{1,2}):([0-9]+)-([0-9]+)", pos) if not match: sys.stderr.write("Warning: Invalid position '{}'\n".format(pos)) else: (self.chr, self.start, self.end) = match.groups() self.chr = int(self.chr) self.start = int(self.start) self.end = int(self.end) self.size = self.end - self.start + 1 if not self.size and None not in (self.start, self.end): self.size = self.end - self.start # Normalize types for common fields self.chr = str(self.chr) try: self.chr = str(int(float(self.chr))) except ValueError: pass self.start = int(self.start) self.end = int(self.end)
[docs] def get_pos(self): """Gets the position string from the ``chr``, ``start`` and ``end`` attributes. :return: A position string of the form "chrXX:start-end". :rtype: str """ return "chr{chr}:{start}-{end}".format(chr = self.chr, start = self.start, end = self.end)
def __repr__(self): return "<class 'cnv_struc.cnv': {}/{} ({})>".format( self.get_pos(), self.type, self.source ) def __hash__(self): # Implemented for use in sets return hash((self.start, self.end, self.chr, self.type)) def __eq__(self, other): # We will define equality if start, end, chr and type are the same. return self.__hash__() == other.__hash__() def __dict__(self): return dict([(i, getattr(self, i)) for i in cnv.attributes]) def __getstate__(self): return tuple([getattr(self, i) for i in cnv.attributes]) def __setstate__(self, attr_states): for attr, val in zip(cnv.attributes, attr_states): setattr(self, attr, val) self.normalize_attributes()
class AmbiguousOverlap(Exception): def __init__(self, cnv1, cnv2): self.cnv1 = cnv1 self.cnv2 = cnv2 def __str__(self): return ("Two types of overlap " "are possible for {} and {}".format(self.cnv1.__repr__(), self.cnv2.__repr__()))
[docs]def overlap(cnv1, cnv2, threshold = None, global_ov = False): """Checks the overlap between two CNVs. :param cnv1: A :py:class:`cnv_struct.cnv` object that will be compared to cnv2 for overlaps. :type cnv1: :py:class:`cnv_struct.cnv` :param cnv2: The other :py:class:`cnv_struct.cnv` :type cnv2: :py:class:`cnv_struct.cnv` :param threshold: A bool that will influence the type of data returned. :type threshold: bool :returns: Either returns a tuple of overlap ratios for both CNVs or, if a theshold is provided, a bool that is True if the overlap is sufficiently high. """ s1 = cnv1.start s2 = cnv2.start e1 = cnv1.end e2 = cnv2.end if cnv1.type != cnv2.type: raise Exception("CNVs to compare are not of same type (gain-loss).") if cnv1.chr != cnv2.chr: return None overlap = None found_overlap_type = False # 6 scenarios are possible. # Regular overlaps if __seq(s1, s2, e1, e2): overlap = e1 - s2 + 1 if __seq(s2, s1, e2, e1): overlap = e2 - s1 + 1 # Full overlaps if __seq(s1, s2, e2, e1): overlap = e2 - s2 + 1 if __seq(s2, s1, e1, e2): overlap = e1 - s1 + 1 # No overlaps if __seq(s1, e1, s2, e2): if overlap is None: overlap = 0 if __seq(s2, e2, s1, e1): if overlap is None: overlap = 0 assert overlap is not None overlap1 = float(overlap) / cnv1.size overlap2 = float(overlap) / cnv2.size if threshold: # Return a bool. return (overlap1 >= threshold and overlap2 >= threshold) elif global_ov: # Return a single value: # 2 * overlap / (size1 + size2) return 2.0 * overlap / (cnv1.size + cnv2.size) else: # Return a tuple of overlap ratios. return (overlap1, overlap2)
[docs]def ro_to_best_in_list(cnv, li, return_cnv = False): """Takes a cnv and finds the reciprocal overlap with the best matching cnv in the provided list. :param cnv: A cnv (e.g. from a twin) :type cnv: :py:class:`cnv`. :param li: A list of :py:class:`cnv` to find the best reciprocal overlapping cnv. :type li: list. :returns: Returns a 2-uple of the respective overlaps for both cnvs. Since there is rarely more than 1000 CNV per chromosome per sample, it is not necessary to sort the CNV list. A simple brute-force approach is deemed acceptable. """ # The metric for a good overlap is the minimum # overlap value. We will try to maximize this. li = [i for i in li if (i.type == cnv.type and i.chr == cnv.chr)] overlapping_cnv = None best_ov = None max_overlap = 0 for other_cnv in li: ov = overlap(cnv, other_cnv) min_ov = min(ov) if min_ov > max_overlap: max_overlap = min_ov best_ov = ov overlapping_cnv = other_cnv if best_ov is None: best_ov = (0, 0) if return_cnv: return best_ov, overlapping_cnv return best_ov
def __seq(*args): """Returns True if the list is non-decreasing. """ for i in xrange(len(args) - 1): if args[i] > args[i + 1]: return False return True
[docs]def family_union(cnvs1, cnvs2, ro): """Takes two family dicts (sample -> chromosome -> cnv list) and returns a new dict. corresponding to their union. :param cnvs1: CNVs family dict (e.g. from a given family/algorithm). :type cnvs1: dict :param cnvs2: CNVs family dict (e.g. from another algorithm, same family). :type cnvs2: dict :param ro: Reciprocal overlap threshold for identity. :type ro: float :returns: A family dict with the union of the two CNV lists. :rtype: dict """ union = {} for sample in cnvs1: union[sample] = {} for chromo in cnvs1[sample]: union[sample][chromo] = [] # Compute the union li = cnvs1[sample][chromo] + cnvs2[sample][chromo] # Sort by start position for easier merging. li = sorted(li, key=lambda x: x.start) # Walk, merge ov > RO, and fill the union dict. start = 0 i = 0 j = 0 while i < len(li) - 1: while (j < len(li) and li[i].type == li[j].type and overlap(li[i], li[j], ro)): i = j j += 1 if i != j: region = merge_cnvs(li[start], li[j - 1]) union[sample][chromo].append(region) else: union[sample][chromo].append(li[start]) start = j i = start j += 1 if j == len(li): # Add last region. union[sample][chromo].append(li[start]) return union
[docs]def family_intersection(cnvs1, cnvs2, ro): """Takes two family dicts (sample -> chromosome -> cnv list) and returns a new dict. corresponding to their intersection. :param cnvs1: CNVs family dict (e.g. from a given family/algorithm). :type cnvs1: dict :param cnvs2: CNVs family dict (e.g. from another algorithm, same family). :type cnvs2: dict :param ro: Reciprocal overlap threshold for identity. :type ro: float :returns: A family dict with the intersection of the two CNV lists. :rtype: dict """ intersect = {} for sample in cnvs1: intersect[sample] = {} for chromo in cnvs1[sample]: intersect[sample][chromo] = [] for cnv in cnvs1[sample][chromo]: ov, cnv2 = ro_to_best_in_list(cnv, cnvs2[sample][chromo], True) if min(ov) >= ro: intersect[sample][chromo].append(merge_cnvs(cnv, cnv2)) # Note that there would probably be a smarter way of doing this, perhaps # by sorting both lists. return intersect
[docs]def merge_cnvs(cnv1, cnv2): """Merges two CNVs into a new object spanning the whole region. :param cnv1: The first cnv. :type cnv1: :py:class:`cnv_struct.cnv` :param cnv2: The second cnv. :type cnv2: :py:class:`cnv_struct.cnv` :returns: The merged CNV object. :rtype: :py:class:`cnv_struct.cnv` """ assert cnv1.chr == cnv2.chr and cnv1.type == cnv2.type params = { "chr": cnv1.chr, "start": min(cnv1.start, cnv2.start), "end": max(cnv1.end, cnv2.end), "type": cnv1.type, "algo": "{}_{}".format(cnv1.algo, cnv2.algo), "source": cnv1.source if cnv1.source == cnv2.source else "consensus", } return cnv(**params)
def compare_lists(li1, li2): no_overlap = True for cnv1 in li1: ov, cnv2 = ro_to_best_in_list(cnv1, li2, return_cnv = True) if min(ov) != 0: no_overlap = False print "Overlapping CNVs: " print "\t{}".format(cnv1) print "\t{}".format(cnv2) print if no_overlap: print "There is no overlap between given lists."