Source code for parsers.breakdancer
# -*- coding: UTF-8 -*-
import re
import os
import collections
import cnv_struct
import merge_cnvs
import parsers
# This file is part of CNVAnalysisToolkit.
#
# CNVAnalysisToolkit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CNVAnalysisToolkit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CNVAnalysisToolkit. If not, see <http://www.gnu.org/licenses/>.
__author__ = "Marc-Andre Legault (StatGen)"
__copyright__ = "Copyright (C) 2013 StatGen"
__license__ = "GNU General Public License v3 (GPL-3)"
[docs]class Parser(parsers.ParentParser):
"""Creates the CNV dictionary for a given family for calls from the BreakDancer algorithm.
K. Chen, JW. Wallis, MD. McLellan, `et al.` (2009). BreakDancer: an algorithm for high-resolution mapping of genomic structural variation. `Nat. Methods`, **6**:677–81.
"""
def __init__(self, family_root):
super(Parser, self).__init__(family_root)
def get_cnvs(self, cnvs_only = True, confidence_threshold = -1):
# Used to convert the break-dancer SVs to a version
# standardized in cnv_struct.
# Also, inter-chromosomal translocations are ignored.
sv_types = {
"INS": "gain",
"DEL": "loss",
"INV": "inversion",
}
for sample in self.paths:
self.cnvs[sample] = {}
for i in xrange(1, 23):
self.cnvs[sample][i] = []
sample_file = os.path.join(self.family_root,
self.paths[sample],
"calls")
sample_fn = [i for i in os.listdir(sample_file) if
i.endswith("ctx")][0]
sample_file = os.path.join(sample_file, sample_fn)
cols = []
with open(sample_file) as f:
for line in f:
add_to_list = True
if line.startswith("#Chr1"):
# Line is the column header,
# memorize the col ids.
cols = line.rstrip("\r\n")
cols = cols.lstrip("#")
cols = cols.split("\t")
if line.startswith("#"):
# Line is a header, ignore it.
continue
line = line.rstrip("\r\n")
line = line.split("\t")
fields = dict(zip(cols, line))
# Chr1
# Pos1
# Orientation1
# Chr2
# Pos2
# Orientation2
# Type
# Size
# Score
# num_Reads
# num_Reads_lib
# bam_name
sv_type = sv_types.get(fields["Type"])
if not sv_type:
add_to_list = False
elif cnvs_only:
if sv_type not in ("gain", "loss"):
add_to_list = False
if sv_type:
assert fields["Chr1"] == fields["Chr2"]
start = min(int(fields["Pos1"]),
int(fields["Pos2"]))
end = max(int(fields["Pos1"]),
int(fields["Pos2"]))
pos = "chr{}:{}-{}".format(
fields["Chr1"],
start,
end
)
confidence = float(fields["Score"])
if fields["Chr1"] not in ("X", "Y", "MT"):
try:
chromo = float(fields["Chr1"])
except ValueError:
# Couldn't parse a valid chromosome.
add_to_list = False
else:
add_to_list = False
if confidence_threshold:
if confidence < confidence_threshold:
add_to_list = False
if add_to_list:
cnv = cnv_struct.cnv(
pos = pos,
type = sv_type,
algo = "breakdancer",
source = sample,
confidence =
float(fields["Score"]),
)
self.cnvs[sample][chromo].append(cnv)
# Save the last one.
num_before = count_cnvs(self.cnvs)
num_dups = count_duplicates(self.cnvs)
self.cnvs = clean(self.cnvs)
num_after = count_cnvs(self.cnvs)
assert (num_before - num_after) == num_dups
# Merge overlapping
self.cnvs = merge_cnvs.merge(self.cnvs, threshold = 0)
return self.cnvs
def count_duplicates(dic):
i = 0
for sample in dic:
for chromo in dic[sample]:
pos_list = [e.start for e in dic[sample][chromo]]
counter = collections.Counter(pos_list)
for pos, count in dict(counter).iteritems():
if count > 1:
i += count - 1
return i
[docs]def clean(cnvs):
"""Cleans cnvs by removing instances with the same start position by keeping
the CNV with the largest confidence score.
This method is automatically called when using the ``get_cnvs`` method.
"""
cleaned = {}
for sample in cnvs:
cleaned[sample] = {}
for chromo in cnvs[sample]:
cleaned[sample][chromo] = []
sorted_cnvs = sorted(cnvs[sample][chromo],
key = lambda x: x.start)
prev_cnv = sorted_cnvs[0]
buffered_cnv = prev_cnv
i = 1
while i < len(sorted_cnvs):
cnv = sorted_cnvs[i]
if cnv.start != prev_cnv.start:
# Save buffer
cleaned[sample][chromo].append(buffered_cnv)
buffered_cnv = cnv
else:
# We have the same start position.
# Keep the largest confidence.
if cnv.confidence > buffered_cnv.confidence:
buffered_cnv = cnv
elif cnv.confidence == buffered_cnv.confidence:
# If same confidence, take largest.
if cnv.size > buffered_cnv.size:
buffered_cnv = cnv
prev_cnv = cnv
i += 1
cleaned[sample][chromo].append(buffered_cnv)
return cleaned
def count_cnvs(d):
i = 0
for sample in d:
for chromo in d[sample]:
for cnv in d[sample][chromo]:
i += 1
return i