Source code for parsers.breakdancer

# -*- coding: UTF-8 -*-

import re
import os
import collections

import cnv_struct
import merge_cnvs
import parsers

# This file is part of CNVAnalysisToolkit.
# 
# CNVAnalysisToolkit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# CNVAnalysisToolkit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with CNVAnalysisToolkit.  If not, see <http://www.gnu.org/licenses/>.

__author__ = "Marc-Andre Legault (StatGen)"
__copyright__ = "Copyright (C) 2013 StatGen"
__license__ = "GNU General Public License v3 (GPL-3)"

class Parser(parsers.ParentParser):
    """Creates the CNV dictionary for a given family for calls from the
    BreakDancer algorithm.

    K. Chen, JW. Wallis, MD. McLellan, `et al.` (2009). BreakDancer: an
    algorithm for high-resolution mapping of genomic structural variation.
    `Nat. Methods`, **6**:677–81.

    """

    def __init__(self, family_root):
        super(Parser, self).__init__(family_root)

    def get_cnvs(self, cnvs_only = True, confidence_threshold = -1):
        # Used to convert the break-dancer SVs to a version
        # standardized in cnv_struct.
        # Also, inter-chromosomal translocations are ignored.
        sv_types = {
            "INS": "gain",
            "DEL": "loss",
            "INV": "inversion",
        }

        for sample in self.paths:
            self.cnvs[sample] = {}
            for i in xrange(1, 23):
                self.cnvs[sample][i] = []

            sample_file = os.path.join(
                self.family_root, self.paths[sample], "calls"
            )
            sample_fn = [i for i in os.listdir(sample_file)
                         if i.endswith("ctx")][0]
            sample_file = os.path.join(sample_file, sample_fn)

            cols = []
            with open(sample_file) as f:
                for line in f:
                    add_to_list = True

                    if line.startswith("#Chr1"):
                        # Line is the column header,
                        # memorize the col ids.
                        cols = line.rstrip("\r\n")
                        cols = cols.lstrip("#")
                        cols = cols.split("\t")

                    if line.startswith("#"):
                        # Line is a header, ignore it.
                        continue

                    line = line.rstrip("\r\n")
                    line = line.split("\t")
                    fields = dict(zip(cols, line))
                    # Columns: Chr1, Pos1, Orientation1, Chr2, Pos2,
                    # Orientation2, Type, Size, Score, num_Reads,
                    # num_Reads_lib, bam_name

                    sv_type = sv_types.get(fields["Type"])
                    if not sv_type:
                        add_to_list = False
                    elif cnvs_only:
                        if sv_type not in ("gain", "loss"):
                            add_to_list = False

                    if sv_type:
                        assert fields["Chr1"] == fields["Chr2"]
                        start = min(int(fields["Pos1"]), int(fields["Pos2"]))
                        end = max(int(fields["Pos1"]), int(fields["Pos2"]))
                        pos = "chr{}:{}-{}".format(
                            fields["Chr1"],
                            start,
                            end
                        )

                        confidence = float(fields["Score"])

                        if fields["Chr1"] not in ("X", "Y", "MT"):
                            try:
                                chromo = float(fields["Chr1"])
                            except ValueError:
                                # Couldn't parse a valid chromosome.
                                add_to_list = False
                        else:
                            add_to_list = False

                        if confidence_threshold:
                            if confidence < confidence_threshold:
                                add_to_list = False

                        if add_to_list:
                            cnv = cnv_struct.cnv(
                                pos = pos,
                                type = sv_type,
                                algo = "breakdancer",
                                source = sample,
                                confidence = float(fields["Score"]),
                            )
                            self.cnvs[sample][chromo].append(cnv)

        # Save the last one.
        num_before = count_cnvs(self.cnvs)
        num_dups = count_duplicates(self.cnvs)
        self.cnvs = clean(self.cnvs)
        num_after = count_cnvs(self.cnvs)
        assert (num_before - num_after) == num_dups

        # Merge overlapping
        self.cnvs = merge_cnvs.merge(self.cnvs, threshold = 0)

        return self.cnvs
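
# Illustrative usage sketch (not part of the original module). It assumes the
# layout read by ``get_cnvs`` above, i.e. one BreakDancer ".ctx" call file per
# sample under <family_root>/<sample directory>/calls/; the path and the score
# threshold of 80 are hypothetical placeholders.
def _example_parser_usage(family_root="/path/to/family_root"):
    parser = Parser(family_root)
    # Keep only gains and losses whose BreakDancer score is at least 80.
    cnvs = parser.get_cnvs(cnvs_only=True, confidence_threshold=80)
    for sample in cnvs:
        for chromo in cnvs[sample]:
            for cnv in cnvs[sample][chromo]:
                print("{} chr{} start={} confidence={}".format(
                    sample, chromo, cnv.start, cnv.confidence,
                ))
    return cnvs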

def count_duplicates(dic):
    i = 0
    for sample in dic:
        for chromo in dic[sample]:
            pos_list = [e.start for e in dic[sample][chromo]]
            counter = collections.Counter(pos_list)
            for pos, count in dict(counter).iteritems():
                if count > 1:
                    i += count - 1
    return i

def clean(cnvs):
    """Cleans cnvs by removing instances with the same start position by
    keeping the CNV with the largest confidence score.

    This method is automatically called when using the ``get_cnvs`` method.

    """
    cleaned = {}
    for sample in cnvs:
        cleaned[sample] = {}
        for chromo in cnvs[sample]:
            cleaned[sample][chromo] = []

            sorted_cnvs = sorted(cnvs[sample][chromo], key = lambda x: x.start)
            if not sorted_cnvs:
                # No calls on this chromosome; keep the empty list.
                continue

            prev_cnv = sorted_cnvs[0]
            buffered_cnv = prev_cnv

            i = 1
            while i < len(sorted_cnvs):
                cnv = sorted_cnvs[i]
                if cnv.start != prev_cnv.start:
                    # Save buffer
                    cleaned[sample][chromo].append(buffered_cnv)
                    buffered_cnv = cnv
                else:
                    # We have the same start position.
                    # Keep the largest confidence.
                    if cnv.confidence > buffered_cnv.confidence:
                        buffered_cnv = cnv
                    elif cnv.confidence == buffered_cnv.confidence:
                        # If same confidence, take largest.
                        if cnv.size > buffered_cnv.size:
                            buffered_cnv = cnv

                prev_cnv = cnv
                i += 1

            cleaned[sample][chromo].append(buffered_cnv)

    return cleaned
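
# Minimal sketch of the de-duplication rule implemented by ``clean`` above
# (illustration only, not part of the original module): among calls sharing a
# start position, the highest confidence wins and ties are broken by size.
# The namedtuple below is a stand-in that only mimics the attributes read by
# ``clean`` and ``count_duplicates`` (start, confidence, size); the real
# parser stores ``cnv_struct.cnv`` instances.
def _example_clean():
    FakeCNV = collections.namedtuple("FakeCNV", ["start", "confidence", "size"])
    cnvs = {
        "sample1": {
            1: [
                FakeCNV(start=100, confidence=30.0, size=500),
                FakeCNV(start=100, confidence=90.0, size=200),  # kept: highest score
                FakeCNV(start=800, confidence=50.0, size=300),  # kept: unique start
            ],
        },
    }
    assert count_duplicates(cnvs) == 1
    cleaned = clean(cnvs)
    assert [c.confidence for c in cleaned["sample1"][1]] == [90.0, 50.0]
    return cleaned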

def count_cnvs(d):
    i = 0
    for sample in d:
        for chromo in d[sample]:
            for cnv in d[sample][chromo]:
                i += 1
    return i