Source code for parsers.cnvnator

import re
import os
import collections

import numpy as np

import cnv_struct
import merge_cnvs
import parsers

# This file is part of CNVAnalysisToolkit.
# 
# CNVAnalysisToolkit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# CNVAnalysisToolkit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with CNVAnalysisToolkit.  If not, see <http://www.gnu.org/licenses/>.

__author__ = "Marc-Andre Legault (StatGen)"
__copyright__ = "Copyright (C) 2013 StatGen"
__license__ = "GNU General Public License v3 (GPL-3)"

[docs]class Parser(parsers.ParentParser): """Creates the CNV dictionary for a given family for calls from the CNVnator algorithm. Abyzov A, Urban AE, Snyder M, Gerstein M (2011). CNVnator: an approach to discover, genotype, and characterize typical and atypical CNVs from family and population genome sequencing. `Genome Res`, **21** (6):974-84. """ def __init__(self, family_root): super(Parser, self).__init__(family_root) def get_cnvs(self, confidence_threshold = None): for sample in self.paths: self.cnvs[sample] = {} for i in xrange(1, 23): self.cnvs[sample][i] = [] # Build the path to the calls directory for this sample. sample_file = os.path.join(self.family_root, self.paths[sample], "calls") # One .events file by calls directory. sample_fn = [i for i in os.listdir(sample_file) if i.endswith(".cnv")][0] sample_file = os.path.join(sample_file, sample_fn) cols = ["type", "pos", "size", "norm_rd", "p1", "p2", "p3", "p4", "q0"] type_map = { "duplication": "gain", "deletion": "loss", } with open(sample_file) as f: for line in f: add_to_list = True line = line.rstrip("\r\n") line = line.split("\t") fields = dict(zip(cols, line)) confidence = float(fields["p1"]) if confidence != 0: confidence = -10.0 * np.log10(confidence) else: # Arbitraire pour eviter la division par zero. confidence = 1000 if confidence_threshold: if confidence < confidence_threshold: add_to_list = False chromo = re.search(r"chr([0-9]+):", fields["pos"]) if chromo is not None: chromo = chromo.group(1) chromo = int(chromo) else: add_to_list = False if add_to_list: cnv = cnv_struct.cnv( pos = fields["pos"], doc = float(fields["norm_rd"]), type = type_map[fields["type"]], algo = "cnvnator", source = sample, confidence = confidence, ) self.cnvs[sample][chromo].append(cnv) # Merge overlapping # self.cnvs = merge_cnvs.merge(self.cnvs, threshold = 0) return self.cnvs