接上篇, 解决数据录入问题后, 就可以着手做查询计算的逻辑. 做法很简单, 就是从mongodb中调出非空的统计结果, 将统计结果分类计算.
import functools
import os
import sys
import time
import pandas as pd
from pymongo import MongoClient
def calc_rs(rs_no):
snp_info = {
"snp": rs_no,
"gg": {
},
"g": {
},
"total": 0,
"hits": 0
}
client = MongoClient()
db = client.test
collection = db.snp_info
snp_info["total"] = collection.count()
cursor = collection.find({rs_no: {"$ne": None}}, {rs_no: 1})
counts = cursor.count()
client.close()
if counts:
snp_info["hits"] = counts
else:
sys.exit("{} not found.".format(rs_no))
lst = list(map(lambda x: x[rs_no], cursor))
for i in lst:
gg = snp_info["gg"]
if gg.get(i):
gg[i] += 1
else:
gg[i] = 1
g_str = "".join(lst)
for i in set(g_str):
snp_info["g"][i] = g_str.count(i)
return snp_info
def calc_sp(sp):
client = MongoClient()
db = client.test
collection = db.snp_info
cursor = collection.find_one({"Sample_ID": sp}, {"_id": 0})
client.close()
if not cursor:
sys.exit("{} not found.".format(sp))
return {k: v for k, v in cursor.items() if v}
def usage():
sys.exit("Usage: python {} <snp_no | sample_id>".format(os.path.basename(sys.argv[0])))
def query_item(s):
st = s.lower()
print("-"*30)
if st.startswith("rs"):
snp = calc_rs(st)
print("RS No.\t{}".format(snp["snp"]))
print("TOTAL\t{}".format(snp["total"]))
hits = snp["hits"]
print("HITS\t{}".format(hits))
print("")
print("基因型\t计数\t频率")
gg = snp["gg"]
for i in gg:
print("{}\t{}\t{}%".format(i, gg[i], round(100 * gg[i] / hits, 2)))
print("")
print("基因\t计数\t频率")
g = snp["g"]
for i in g:
print("{}\t{}\t{}%".format(i, g[i], round(100 * 0.5 * g[i] / hits, 2)))
print("")
if "D" in g:
print("字母D表示单个碱基缺失")
print("")
elif st.startswith("bm"):
sn = st.upper()
sample = calc_sp(sn)
df = pd.DataFrame().from_dict({"": sample}, orient="index")
df = df.reindex_axis(sorted(df.columns), axis=1)
df.to_csv("{}.csv".format(sn), index=None)
print("信息已写入: {}.csv".format(sn))
else:
usage()
def time_it(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
t1 = time.time()
rt = func(*args, **kwargs)
print("计算用时: {}s".format(round(time.time() - t1, 2)))
return rt
return wrapper
@time_it
def main():
list(map(query_item, set(sys.argv[1:])))
if __name__ == "__main__":
if len(sys.argv) < 2:
usage()
main()初步实现了输入一个或者多个rs号计算对应基因/型频率的功能, 另外亦可接受Sample_ID输入, 输出对应Sample_ID的检测信息
测试样例:
python query.py rs12526453
------------------------------
RS No. rs12526453
TOTAL 7272
HITS 161
基因型 计数 频率
CC 155 96.27%
CG 6 3.73%
基因 计数 频率
G 6 1.86%
C 316 98.14%
计算用时: 0.52s