Source code for parsers.cnvnator
import re
import os
import collections
import numpy as np
import cnv_struct
import merge_cnvs
import parsers
# This file is part of CNVAnalysisToolkit.
#
# CNVAnalysisToolkit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CNVAnalysisToolkit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CNVAnalysisToolkit. If not, see <http://www.gnu.org/licenses/>.
__author__ = "Marc-Andre Legault (StatGen)"
__copyright__ = "Copyright (C) 2013 StatGen"
__license__ = "GNU General Public License v3 (GPL-3)"
class Parser(parsers.ParentParser):
    """Creates the CNV dictionary for a given family for calls from the
    CNVnator algorithm.

    Abyzov A, Urban AE, Snyder M, Gerstein M (2011). CNVnator: an approach to
    discover, genotype, and characterize typical and atypical CNVs from family
    and population genome sequencing. `Genome Res`, **21** (6):974-84.

    """

    # Extracts the numeric chromosome from a region string such as
    # "chr1:1000-2000". Regions on non-numeric chromosomes (e.g. "chrX:...")
    # do not match and are therefore ignored by get_cnvs().
    _CHROMOSOME_RE = re.compile(r"chr([0-9]+):")

    # Names given, in order, to the tab-separated columns of a call line.
    _COLUMNS = ("type", "pos", "size", "norm_rd", "p1", "p2", "p3", "p4",
                "q0")

    # Translation from the call type found in the file to the type name
    # passed to cnv_struct.cnv.
    _TYPE_MAP = {
        "duplication": "gain",
        "deletion": "loss",
    }

    def __init__(self, family_root):
        """Initialize the parser through the parent class.

        :param family_root: Forwarded unchanged to
                            ``parsers.ParentParser.__init__``; ``get_cnvs``
                            later reads it back as ``self.family_root``.

        """
        super(Parser, self).__init__(family_root)

    def get_cnvs(self, confidence_threshold=None):
        """Parse the CNVnator calls of every sample listed in ``self.paths``.

        For each sample, the first ``.cnv`` file listed in
        ``<family_root>/<self.paths[sample]>/calls`` is read as tab-separated
        text. Every call located on chromosomes 1 to 22 is converted to a
        ``cnv_struct.cnv`` object and appended to
        ``self.cnvs[sample][chromosome]``.

        The confidence of a call is ``-10 * log10(p1)`` where ``p1`` is the
        fifth column of the file; it is set to 1000 when ``p1`` is exactly 0.

        Blank lines, calls on non-numeric chromosomes and calls on numeric
        chromosomes outside of 1-22 are skipped.

        :param confidence_threshold: Calls with a confidence strictly below
                                     this value are discarded. ``None`` (or
                                     any other falsy value, such as ``0``)
                                     disables the filter.

        :returns: ``self.cnvs``, where ``self.cnvs[sample][chromosome]`` is
                  the list of calls for that sample on that chromosome
                  (integer keys from 1 to 22).

        :raises IndexError: If a calls directory contains no ``.cnv`` file.
        :raises KeyError: If the type of a retained call is neither
                          ``duplication`` nor ``deletion``.

        """
        for sample in self.paths:
            # One (initially empty) list of calls per autosome. range() is
            # used instead of xrange() so this runs on Python 2 and 3.
            self.cnvs[sample] = {}
            for chromo in range(1, 23):
                self.cnvs[sample][chromo] = []

            # Build the path to the calls directory for this sample.
            calls_dir = os.path.join(self.family_root,
                                     self.paths[sample],
                                     "calls")

            # One .cnv file by calls directory.
            cnv_files = [fn for fn in os.listdir(calls_dir)
                         if fn.endswith(".cnv")]
            if not cnv_files:
                # Same exception type as indexing the empty list would
                # raise, but with a message that names the directory.
                raise IndexError(
                    "No '.cnv' file found in '{0}'.".format(calls_dir)
                )
            sample_file = os.path.join(calls_dir, cnv_files[0])

            with open(sample_file) as f:
                for line in f:
                    line = line.rstrip("\r\n")
                    if not line.strip():
                        # A blank (e.g. trailing) line has no "p1" column.
                        continue

                    fields = dict(zip(self._COLUMNS, line.split("\t")))

                    confidence = float(fields["p1"])
                    if confidence != 0:
                        confidence = -10.0 * np.log10(confidence)
                    else:
                        # Arbitrary value to avoid taking the log of zero.
                        confidence = 1000

                    if (confidence_threshold and
                            confidence < confidence_threshold):
                        continue

                    match = self._CHROMOSOME_RE.search(fields["pos"])
                    if match is None:
                        continue

                    chromo = int(match.group(1))
                    if chromo not in self.cnvs[sample]:
                        # Numeric chromosome outside of 1-22: there is no
                        # list to store it in, so it is ignored like the
                        # non-numeric chromosomes.
                        continue

                    cnv = cnv_struct.cnv(
                        pos=fields["pos"],
                        doc=float(fields["norm_rd"]),
                        type=self._TYPE_MAP[fields["type"]],
                        algo="cnvnator",
                        source=sample,
                        confidence=confidence,
                    )
                    self.cnvs[sample][chromo].append(cnv)

        # NOTE: merging of overlapping calls is currently disabled:
        # self.cnvs = merge_cnvs.merge(self.cnvs, threshold = 0)

        return self.cnvs