Source code for merge_cnvs

#!/usr/bin/env python2.7

import argparse

import numpy as np
import matplotlib.pyplot as plt

import parsers
import cnv_struct
import pickle
import validation

# This file is part of CNVAnalysisToolkit.
# 
# CNVAnalysisToolkit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# CNVAnalysisToolkit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with CNVAnalysisToolkit.  If not, see <http://www.gnu.org/licenses/>.

__author__ = "Marc-Andre Legault (StatGen)"
__copyright__ = "Copyright (C) 2013 StatGen"
__license__ = "GNU General Public License v3 (GPL-3)"

cnv_params = {
                "pos": None,
                "type": None,
                "algo": None,
                "confidence": None,
                "source": "ref",
             }

[docs]def main(args): """Script to merge CNVs that are adjacent, if they are separated by a distance under a given threshold. Such a script is very useful to fix the problems arising from call fragmentation, a phenomenon which occurs when a genotyping algorithms doesn't test the region between the calls to extend the breakpoints of the CNVs. `e.g.` Given a distance threshold of 5kb, all the CNVs that are separated by less than 5kb will be merged together. This script will generate BED files for easy visualisation of the resulting calls using UCSC Genome Browser, and a pickle file for easy data analysis on the resulting set. """ cnv_params["algo"] = args.format p = validation.get_parser_if_valid(args) cnvs = p.get_cnvs() threshold = args.threshold merged_cnvs = merge(cnvs, threshold, args.size_over) for sample in cnvs: # Write bed file for UCSC viz write_bed(merged_cnvs, threshold, p.family_id) write_pickle(merged_cnvs, threshold, p.family_id)
[docs]def merge(cnvs, threshold = 5, size_over = None): """Merges the CNVs in the list if they are separated by less than a threshold value. :param cnvs: The list of CNVs to merge. :type cnvs: list :param threshold: The distance threshold in kilobase. All the CNVs separated by less than this value will be merged. :type threshold: float :param size_over: A minimum size for the resulting CNVs to be added to the list. This allows the easy size-based filtering of the CNVs. (optional) :type size_over: float :return: A list of merged CNVs. :rtype: list The ``plot_distance_distribution.py`` script is a good tool to determine if a given dataset needs to be merged to avoid fragmentation. """ merged_cnvs = {} for sample in cnvs: merged_cnvs[sample] = {} # Average the doc ratio for merged cnvs docs = [] confidences = [] for chromo in cnvs[sample]: merged_cnvs[sample][chromo] = [] i = 0 li = cnvs[sample][chromo] while i < len(li) - 1: j = i first_cnv = li[i] # Walk until end of block cnv = li[j] next_cnv = li[j + 1] dist = (next_cnv.start - cnv.end) / 1000.0 if first_cnv.doc: docs.append(first_cnv.doc) if first_cnv.confidence: confidences.append(first_cnv.confidence) while (dist <= threshold and cnv.type == next_cnv.type and cnv.chr == next_cnv.chr and j + 1 < len(li) - 1): if first_cnv.doc: docs.append(cnv.doc) if first_cnv.confidence: confidences.append(cnv.confidence) j += 1 cnv = li[j] next_cnv = li[j + 1] dist = (next_cnv.start - cnv.end) / 1000.0 # New CNV should start at first_cnv and end at cnv. cnv_params["type"] = first_cnv.type cnv_params["algo"] = first_cnv.algo cnv_params["source"] = sample cnv_params["pos"] = "chr{chromo}:{start}-{end}".format( chromo = chromo, start = first_cnv.start, end = cnv.end, ) if len(docs) > 1: cnv_params["doc"] = np.mean(np.array(docs)) if len(confidences) > 1: cnv_params["confidence"] = np.mean(np.array(confidences)) docs = [] confidences = [] add = True if size_over: if (cnv.end - first_cnv.start) / 1000.0 < size_over: add = False if add: merged_cnvs[sample][chromo].append(cnv_struct.cnv(**cnv_params)) i = j + 1 return merged_cnvs
[docs]def write_pickle(cnvs, threshold, fam): """Simple utility function to write the CNVs list as a pickle file. """ with open("merg_cnv_{}_{}kb.pickle".format(fam, threshold), "w") as f: pickle.dump(cnvs, f)
[docs]def write_bed(cnvs, threshold, fam): """ Write the bed file for the merged samples. """ for sample in cnvs: fn = "merg_{}kb_{}_{}.bed".format(threshold, fam, sample) f = open(fn, "w") header = ("track name=\"{fn}\" " "description=\"{fn}\" " "visibility=2 itemRgb=\"On\"\n").format(fn = fn) f.write(header) line_template = ("chr{chr}\t{start}\t{end}\t" "{info}\t0\t+\t{start}\t{end}\t" "{color}\n") for chromo in cnvs[sample]: for cnv in cnvs[sample][chromo]: if cnv.type == "loss": col = "247,56,68" else: col = "84,175,234" i = { "chr": cnv.chr, "start": cnv.start, "end": cnv.end, "color": col, "info": "type={}".format(cnv.type) } f.write(line_template.format(**i))
if __name__ == "__main__": PARSER = argparse.ArgumentParser(description=("Sums and merges CNVs " "from the twins.")) PARSER = validation.add_pickle_args(PARSER) PARSER.add_argument("--format", type=str, choices=parsers.__all__, help="The file format for the parser.") PARSER.add_argument("--base_dir", type=str, help=("The base directory for the calls. A " "directory with four subfolders starting " "with father, mother, twin1 " "and twin2 are expected.")) PARSER.add_argument("--threshold", "-t", type=float, default=5, help="Distance threshold for merging in kb.") PARSER.add_argument("--size_over", type=float, default=None, help="Size threshold for CNVs.") main(PARSER.parse_args())