从检测结果数据库统计基因频率工具

接上篇, 解决数据录入问题后, 就可以着手做查询计算的逻辑. 做法很简单, 就是从mongodb中调出非空的统计结果, 将统计结果分类计算.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import functools
import os
import sys
import time

import pandas as pd
from pymongo import MongoClient


def calc_rs(rs_no):
    snp_info = {
        "snp": rs_no,
        "gg": {

        },
        "g": {

        },
        "total": 0,
        "hits": 0
    }

    client = MongoClient()
    db = client.test
    collection = db.snp_info
    snp_info["total"] = collection.count()
    cursor = collection.find({rs_no: {"$ne": None}}, {rs_no: 1})
    counts = cursor.count()
    client.close()
    if counts:
        snp_info["hits"] = counts
    else:
        sys.exit("{} not found.".format(rs_no))

    lst = list(map(lambda x: x[rs_no], cursor))
    for i in lst:
        gg = snp_info["gg"]
        if gg.get(i):
            gg[i] += 1
        else:
            gg[i] = 1

    g_str = "".join(lst)
    for i in set(g_str):
        snp_info["g"][i] = g_str.count(i)

    return snp_info


def calc_sp(sp):
    client = MongoClient()
    db = client.test
    collection = db.snp_info
    cursor = collection.find_one({"Sample_ID": sp}, {"_id": 0})
    client.close()
    if not cursor:
        sys.exit("{} not found.".format(sp))
    return {k: v for k, v in cursor.items() if v}


def usage():
    sys.exit("Usage: python {} <snp_no | sample_id>".format(os.path.basename(sys.argv[0])))


def query_item(s):
    st = s.lower()
    print("-"*30)
    if st.startswith("rs"):
        snp = calc_rs(st)
        print("RS No.\t{}".format(snp["snp"]))
        print("TOTAL\t{}".format(snp["total"]))
        hits = snp["hits"]
        print("HITS\t{}".format(hits))
        print("")
        print("基因型\t计数\t频率")
        gg = snp["gg"]
        for i in gg:
            print("{}\t{}\t{}%".format(i, gg[i], round(100 * gg[i] / hits, 2)))
        print("")
        print("基因\t计数\t频率")
        g = snp["g"]
        for i in g:
            print("{}\t{}\t{}%".format(i, g[i], round(100 * 0.5 * g[i] / hits, 2)))
        print("")
        if "D" in g:
            print("字母D表示单个碱基缺失")
            print("")
    elif st.startswith("bm"):
        sn = st.upper()
        sample = calc_sp(sn)
        df = pd.DataFrame().from_dict({"": sample}, orient="index")
        df = df.reindex_axis(sorted(df.columns), axis=1)
        df.to_csv("{}.csv".format(sn), index=None)
        print("信息已写入: {}.csv".format(sn))
    else:
        usage()


def time_it(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t1 = time.time()
        rt = func(*args, **kwargs)
        print("计算用时: {}s".format(round(time.time() - t1, 2)))
        return rt

    return wrapper


@time_it
def main():
    list(map(query_item, set(sys.argv[1:])))


if __name__ == "__main__":
    if len(sys.argv) < 2:
        usage()
    main()

初步实现了输入一个或者多个rs号计算对应基因/型频率的功能, 另外亦可接受Sample_ID输入, 输出对应Sample_ID的检测信息

测试样例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
python query.py rs12526453
------------------------------
RS No.  rs12526453
TOTAL   7272
HITS    161

基因型  计数    频率
CC      155     96.27%
CG      6       3.73%

基因    计数    频率
G       6       1.86%
C       316     98.14%

计算用时: 0.52s