Source code for samples_db
#!/usr/bin/env python2.7
import sys
import os
import os.path
import sqlite3
# Absolute path of the sqlite database file; it lives in the same directory as
# this module, whatever the current working directory is.
DB_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "samples.db",
)
# This file is part of CNVAnalysisToolkit.
# 
# CNVAnalysisToolkit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# CNVAnalysisToolkit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with CNVAnalysisToolkit.  If not, see <http://www.gnu.org/licenses/>.
__author__ = "Marc-Andre Legault (StatGen)"
__copyright__ = "Copyright (C) 2013 StatGen"
__license__ = "GNU General Public License v3 (GPL-3)"
def _read_coverage(coverage_file):
    """Parse the coverage csv file into a ``{sample id: coverage}`` dict.

    The first comma separated field of every line is used as the sample id
    and the second one as its coverage (kept as the raw string).
    """
    coverage = {}
    with open(coverage_file) as f:
        for line in f:
            line = line.rstrip("\r\n")
            if not line:
                # Tolerate empty lines (e.g. a trailing newline at the end of
                # the file) instead of failing with an IndexError.
                continue
            fields = line.split(",")
            coverage[fields[0]] = fields[1]
    return coverage


def _read_twin_samples(trios_file, coverage):
    """Build the rows to insert for the samples of the "Twins" project.

    Returns a list of ``(sample id, family, lowercased link, coverage)``
    tuples, in file order.
    """
    samples_info = []
    with open(trios_file) as f:
        header = f.readline().rstrip("\r\n").split("\t")
        columns = None
        for line in f:
            line = line.rstrip("\r\n")
            if not line:
                continue

            if columns is None:
                # Resolve the column positions once instead of on every row.
                # This is done lazily so that a file without any data row
                # still yields an empty result, whatever its header is.
                columns = [
                    header.index(name)
                    for name in ("#ID", "Family", "Link", "Project")
                ]
            id_col, family_col, link_col, project_col = columns

            fields = line.split("\t")
            if fields[project_col] != "Twins":
                # Filter first: the coverage of samples that are not inserted
                # is not needed, so it is allowed to be missing.
                continue

            sample_id = fields[id_col]
            samples_info.append((
                sample_id,
                fields[family_col],
                fields[link_col].lower(),
                coverage[sample_id],
            ))
    return samples_info


def create_db(trios_file, coverage_file, delete=False):
    """Creates a database containing trio information.

    The database holds a single ``quatuors`` table with the ``id``,
    ``family``, ``status`` and ``avg_coverage`` columns. Only the rows of the
    trios file whose ``Project`` column is ``Twins`` are inserted; ``status``
    is the lowercased ``Link`` column.

    :param trios_file: The path to a file containing trio information. It is
                       read as a tab separated file whose header contains the
                       ``#ID``, ``Family``, ``Link`` and ``Project`` columns.
    :type trios_file: str

    :param coverage_file: The path to a file containing mean coverage for
                          samples (comma separated: sample id, coverage).
    :type coverage_file: str

    :param delete: Flag that determines if the sqlite database file will be
                   overwritten when running this function.
    :type delete: bool

    :raises Exception: If the database file already exists and ``delete`` is
                       not set.
    :raises KeyError: If a "Twins" sample has no entry in the coverage file.
    :raises ValueError: If a required column is missing from the header of a
                        trios file that contains data rows.

    Samples files are available in the ``sample_db_files`` directory from this
    repository.

    """
    db_file = DB_FILE

    db_exists = os.path.isfile(db_file)
    if db_exists and not delete:
        raise Exception("Database file already exists. run with "
                        "delete=True to overwrite or delete the samples.db "
                        "file manually.")

    # Parse both input files before touching the database file: a missing or
    # malformed input must neither destroy an existing database nor leave a
    # half-initialized one behind (which would make the next run fail with
    # "Database file already exists").
    coverage = _read_coverage(coverage_file)
    samples_info = _read_twin_samples(trios_file, coverage)

    if db_exists:
        os.remove(db_file)

    # Create the database and insert the corresponding rows.
    conn = sqlite3.connect(db_file)
    try:
        c = conn.cursor()
        c.execute(("CREATE TABLE quatuors (id text, family text, "
                   "status text, avg_coverage float)"))
        c.executemany("INSERT INTO quatuors VALUES (?,?,?,?)", samples_info)
        conn.commit()
    finally:
        # Release the connection even if the insertion failed.
        conn.close()
 
def get_sample_ids_for_family(f):
    """Executes a query over the database to find the samples from a given family.

    :param f: The family (e.g. 1443)
    :type f: str

    :returns: A dictionary containing the familial status (mother, father,
              twin1 or twin2) as a key and the sample id as a value. The
              dictionary is empty if no sample matches the family. If several
              samples share a status, the last one returned by the query
              wins.

    """
    c = __get_cursor()
    try:
        response = c.execute(
            "SELECT id, status FROM quatuors WHERE family=?", (f, )
        )
        return {status: sample_id for sample_id, status in response}
    finally:
        # The helper only hands back the cursor, so the connection it opened
        # is reached (and closed) through the cursor once the rows are read.
        c.connection.close()
 
def get_coverage_for_sample(s):
    """Executes a query over the database to find the coverage for a given sample.

    :param s: The sample (e.g. LP6005057-DNA_F03)
    :type s: str

    :returns: The mean coverage for this sample, i.e. the ``avg_coverage``
              value of the first matching row.
    :rtype: str (could be casted to a float)

    :raises TypeError: If the sample is not in the database (the query then
                       yields no row and ``fetchone`` returns ``None``).

    NOTE(review): the column is declared ``float``, so sqlite may hand the
    value back as a float rather than as the inserted string; confirm before
    relying on the documented return type.

    """
    c = __get_cursor()
    try:
        row = c.execute(
            "SELECT avg_coverage FROM quatuors WHERE id=?", (s, )
        ).fetchone()
    finally:
        # The helper only hands back the cursor, so the connection it opened
        # is reached (and closed) through the cursor.
        c.connection.close()
    return row[0]
 
def __get_cursor():
    """Open a new connection to the samples database and return a cursor on it.

    Only the cursor is returned; the connection stays reachable through the
    cursor's ``connection`` attribute.
    """
    return sqlite3.connect(DB_FILE).cursor()
    
if __name__ == "__main__":
    # Command line entry point: build the database from the two input files.
    if len(sys.argv) != 3:
        # Every print takes one parenthesized string, which prints the same
        # text on Python 2.7 and also parses on Python 3.
        print("Usage: {} trios coverage".format(__file__))
        print("Where `trios` is the `complete_trios` file to create the database "
              "and `coverage` is the coverage csv file to get the average coverage "
              "values from.")
        print("For examples, see the `sample_db_files` directory.")
        # A wrong invocation is an error: report it through the exit status so
        # that calling scripts do not mistake it for a successful run.
        sys.exit(1)

    create_db(sys.argv[1], sys.argv[2])