接上篇, 解决数据录入问题后, 就可以着手做查询计算的逻辑. 做法很简单, 就是从mongodb中调出非空的统计结果, 将统计结果分类计算.

import functools
import os
import sys
import time

import pandas as pd
from pymongo import MongoClient


def calc_rs(rs_no):
    snp_info = {
        "snp": rs_no,
        "gg": {

        },
        "g": {

        },
        "total": 0,
        "hits": 0
    }

    client = MongoClient()
    db = client.test
    collection = db.snp_info
    snp_info["total"] = collection.count()
    cursor = collection.find({rs_no: {"$ne": None}}, {rs_no: 1})
    counts = cursor.count()
    client.close()
    if counts:
        snp_info["hits"] = counts
    else:
        sys.exit("{} not found.".format(rs_no))

    lst = list(map(lambda x: x[rs_no], cursor))
    for i in lst:
        gg = snp_info["gg"]
        if gg.get(i):
            gg[i] += 1
        else:
            gg[i] = 1

    g_str = "".join(lst)
    for i in set(g_str):
        snp_info["g"][i] = g_str.count(i)

    return snp_info


def calc_sp(sp):
    client = MongoClient()
    db = client.test
    collection = db.snp_info
    cursor = collection.find_one({"Sample_ID": sp}, {"_id": 0})
    client.close()
    if not cursor:
        sys.exit("{} not found.".format(sp))
    return {k: v for k, v in cursor.items() if v}


def usage():
    sys.exit("Usage: python {} <snp_no | sample_id>".format(os.path.basename(sys.argv[0])))


def query_item(s):
    st = s.lower()
    print("-"*30)
    if st.startswith("rs"):
        snp = calc_rs(st)
        print("RS No.\t{}".format(snp["snp"]))
        print("TOTAL\t{}".format(snp["total"]))
        hits = snp["hits"]
        print("HITS\t{}".format(hits))
        print("")
        print("基因型\t计数\t频率")
        gg = snp["gg"]
        for i in gg:
            print("{}\t{}\t{}%".format(i, gg[i], round(100 * gg[i] / hits, 2)))
        print("")
        print("基因\t计数\t频率")
        g = snp["g"]
        for i in g:
            print("{}\t{}\t{}%".format(i, g[i], round(100 * 0.5 * g[i] / hits, 2)))
        print("")
        if "D" in g:
            print("字母D表示单个碱基缺失")
            print("")
    elif st.startswith("bm"):
        sn = st.upper()
        sample = calc_sp(sn)
        df = pd.DataFrame().from_dict({"": sample}, orient="index")
        df = df.reindex_axis(sorted(df.columns), axis=1)
        df.to_csv("{}.csv".format(sn), index=None)
        print("信息已写入: {}.csv".format(sn))
    else:
        usage()


def time_it(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t1 = time.time()
        rt = func(*args, **kwargs)
        print("计算用时: {}s".format(round(time.time() - t1, 2)))
        return rt

    return wrapper


@time_it
def main():
    list(map(query_item, set(sys.argv[1:])))


if __name__ == "__main__":
    if len(sys.argv) < 2:
        usage()
    main()

初步实现了输入一个或者多个rs号计算对应基因/型频率的功能, 另外亦可接受Sample_ID输入, 输出对应Sample_ID的检测信息

测试样例:

python query.py rs12526453
------------------------------
RS No.  rs12526453
TOTAL   7272
HITS    161

基因型  计数    频率
CC      155     96.27%
CG      6       3.73%

基因    计数    频率
G       6       1.86%
C       316     98.14%

计算用时: 0.52s