#!/usr/bin/env python2.7
import argparse
import numpy as np
import matplotlib.pyplot as plt
import parsers
import cnv_struct
import pickle
import validation
# This file is part of CNVAnalysisToolkit.
#
# CNVAnalysisToolkit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CNVAnalysisToolkit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CNVAnalysisToolkit. If not, see <http://www.gnu.org/licenses/>.
__author__ = "Marc-Andre Legault (StatGen)"
__copyright__ = "Copyright (C) 2013 StatGen"
__license__ = "GNU General Public License v3 (GPL-3)"
cnv_params = {
"pos": None,
"type": None,
"algo": None,
"confidence": None,
"source": "ref",
}
[docs]def main(args):
"""Script to merge CNVs that are adjacent, if they are separated by a
distance under a given threshold.
Such a script is very useful to fix the problems arising from call fragmentation,
a phenomenon which occurs when a genotyping algorithms doesn't test the region
between the calls to extend the breakpoints of the CNVs.
`e.g.` Given a distance threshold of 5kb, all the CNVs that are separated
by less than 5kb will be merged together.
This script will generate BED files for easy visualisation of the resulting
calls using UCSC Genome Browser, and a pickle file for easy data analysis
on the resulting set.
"""
cnv_params["algo"] = args.format
p = validation.get_parser_if_valid(args)
cnvs = p.get_cnvs()
threshold = args.threshold
merged_cnvs = merge(cnvs, threshold, args.size_over)
for sample in cnvs:
# Write bed file for UCSC viz
write_bed(merged_cnvs, threshold, p.family_id)
write_pickle(merged_cnvs, threshold, p.family_id)
[docs]def merge(cnvs, threshold = 5, size_over = None):
"""Merges the CNVs in the list if they are separated by less than a threshold
value.
:param cnvs: The list of CNVs to merge.
:type cnvs: list
:param threshold: The distance threshold in kilobase. All the CNVs separated
by less than this value will be merged.
:type threshold: float
:param size_over: A minimum size for the resulting CNVs to be added to the
list. This allows the easy size-based filtering of the
CNVs. (optional)
:type size_over: float
:return: A list of merged CNVs.
:rtype: list
The ``plot_distance_distribution.py`` script is a good tool to determine
if a given dataset needs to be merged to avoid fragmentation.
"""
merged_cnvs = {}
for sample in cnvs:
merged_cnvs[sample] = {}
# Average the doc ratio for merged cnvs
docs = []
confidences = []
for chromo in cnvs[sample]:
merged_cnvs[sample][chromo] = []
i = 0
li = cnvs[sample][chromo]
while i < len(li) - 1:
j = i
first_cnv = li[i]
# Walk until end of block
cnv = li[j]
next_cnv = li[j + 1]
dist = (next_cnv.start - cnv.end) / 1000.0
if first_cnv.doc:
docs.append(first_cnv.doc)
if first_cnv.confidence:
confidences.append(first_cnv.confidence)
while (dist <= threshold and
cnv.type == next_cnv.type and
cnv.chr == next_cnv.chr and
j + 1 < len(li) - 1):
if first_cnv.doc:
docs.append(cnv.doc)
if first_cnv.confidence:
confidences.append(cnv.confidence)
j += 1
cnv = li[j]
next_cnv = li[j + 1]
dist = (next_cnv.start - cnv.end) / 1000.0
# New CNV should start at first_cnv and end at cnv.
cnv_params["type"] = first_cnv.type
cnv_params["algo"] = first_cnv.algo
cnv_params["source"] = sample
cnv_params["pos"] = "chr{chromo}:{start}-{end}".format(
chromo = chromo,
start = first_cnv.start,
end = cnv.end,
)
if len(docs) > 1:
cnv_params["doc"] = np.mean(np.array(docs))
if len(confidences) > 1:
cnv_params["confidence"] = np.mean(np.array(confidences))
docs = []
confidences = []
add = True
if size_over:
if (cnv.end - first_cnv.start) / 1000.0 < size_over:
add = False
if add:
merged_cnvs[sample][chromo].append(cnv_struct.cnv(**cnv_params))
i = j + 1
return merged_cnvs
[docs]def write_pickle(cnvs, threshold, fam):
"""Simple utility function to write the CNVs list as a pickle file. """
with open("merg_cnv_{}_{}kb.pickle".format(fam, threshold), "w") as f:
pickle.dump(cnvs, f)
[docs]def write_bed(cnvs, threshold, fam):
""" Write the bed file for the merged samples. """
for sample in cnvs:
fn = "merg_{}kb_{}_{}.bed".format(threshold, fam, sample)
f = open(fn, "w")
header = ("track name=\"{fn}\" "
"description=\"{fn}\" "
"visibility=2 itemRgb=\"On\"\n").format(fn = fn)
f.write(header)
line_template = ("chr{chr}\t{start}\t{end}\t"
"{info}\t0\t+\t{start}\t{end}\t"
"{color}\n")
for chromo in cnvs[sample]:
for cnv in cnvs[sample][chromo]:
if cnv.type == "loss":
col = "247,56,68"
else:
col = "84,175,234"
i = {
"chr": cnv.chr,
"start": cnv.start,
"end": cnv.end,
"color": col,
"info": "type={}".format(cnv.type)
}
f.write(line_template.format(**i))
if __name__ == "__main__":
PARSER = argparse.ArgumentParser(description=("Sums and merges CNVs "
"from the twins."))
PARSER = validation.add_pickle_args(PARSER)
PARSER.add_argument("--format",
type=str,
choices=parsers.__all__,
help="The file format for the parser.")
PARSER.add_argument("--base_dir",
type=str,
help=("The base directory for the calls. A "
"directory with four subfolders starting "
"with father, mother, twin1 "
"and twin2 are expected."))
PARSER.add_argument("--threshold", "-t",
type=float,
default=5,
help="Distance threshold for merging in kb.")
PARSER.add_argument("--size_over",
type=float,
default=None,
help="Size threshold for CNVs.")
main(PARSER.parse_args())