pandas to mongodb
Jul 13, 2017最近项目需要, 把所有历史资料(Excel格式)汇总分析, 于是祭出pandas神器, 上万份Excel表格轻松搞定.
先放一段Excel转入MongoDB的代码
import os
import sys
import time
from functools import reduce, wraps
from shutil import copy2
import pandas as pd
import pymongo
from bson.json_util import loads
g_set = {'A', 'C', 'T', 'G'}
gg_set = {'AC', 'AG', 'AT', 'CG', 'CT', 'GT'}
summary = {
"ok": 0,
"skip": 0,
"error": 0,
"tmp": 0
}
db_name = "test"
collection_name = "snp_info"
index_name = "Sample_ID"
def bak_file(fp, ds="tmp", ext="bak", rename=True):
fn = os.path.basename(fp)
if not os.path.exists(ds):
os.mkdir(ds)
# 复制到临时目录
copy2(fp, os.path.join(ds, fn))
# 原路径加.bak后缀名
if rename:
os.rename(fp, "{}.{}".format(fp, ext))
def pick_rules(fp):
# 文件名中包含LH-R及LH-I的均为其他项目, 跳过不处理
fp_base = os.path.basename(fp)
if "LH-R" in fp_base or "LH-I" in fp_base:
bak_file(fp, "skip")
summary["skip"] += 1
print("{}\t0\t0\tskip".format(fp_base))
return False
elif fp_base.startswith("~"): # 跳过office临时文件
bak_file(fp, rename=False)
summary["tmp"] += 1
print("{}\t0\t0\ttmp".format(fp_base))
return False
else:
return True
def get_files(dp, ext="xlsx"):
"""
获取目标目录下指定后缀名文件列表
:param dp: 目标目录
:param ext: 后缀名, 默认为xlsx
:return: 指定后缀名文件路径列表
"""
# 绝对路径中有空格的情况会有bug, 所以暂时还是用相对路径
# abspath = os.path.abspath(dp)
lst = []
for root, dirs, files in os.walk(dp):
for file in files:
if file.endswith(".{}".format(ext)):
lst.append(os.path.join(root, file))
return lst
def xlsx2df(fp, sn="Sample_ID"):
"""
将xlsx内带有检测信息的结果转化为dataframe返回
可以处理以下四种情况
1. Sample_ID在首行, 紧接着是样本信息, 中间没有空行
2. Sample_ID在首行, 紧接着两行首行为空, 第四行开始才是正常样本信息
3. 1, 2两行首列为空, Sample_ID在第三行, 后面才是样本信息
4. 没有Sample_ID, 首行首列为空, 紧接着就是样本信息
:param fp: Excel文件路径
:param sn: header name默认是Sample_ID
:return: 整理后的dataframe
"""
df = pd.read_excel(fp, header=None)
# 添加header name
is_format_ok = False
for i in range(df.shape[0]):
_t = str(df.iloc[i, 0])
if _t == sn:
# Sample_ID在首行的情况(大多数情况)
df.columns = [i for i in df.iloc[i].values]
df = df.iloc[(i + 1):] # 跳过Sample_ID行
is_format_ok = True
break
if _t.startswith("B"):
# 没有Sample_ID的情况(极少)
# 以B开头即表示正常ID, 则取上一行内容为header name, 第一个应该是sn即Sample_ID
df.columns = [sn] + [i for i in df.iloc[i - 1].values][1:]
is_format_ok = True
break
if not is_format_ok:
# 文件无法识别, 备份源文件并返回空df
print("{}:\t0\t0\terror".format(os.path.basename(fp)))
bak_file(fp, "error")
summary["error"] += 1
return pd.DataFrame()
# 删除Sample_ID为空的行
df = df[pd.notnull(df[sn])]
# 删除空列
df = df.dropna(axis=1, how="all")
# 删除重复rs号
df = df.loc[:, ~df.columns.duplicated()]
rows, cols = df.shape
for i in range(rows):
for j in range(1, cols): # 跳过第一列Sample_ID不作处理
v = df.iloc[i, j]
if isinstance(v, str):
v = v.strip().upper() # 去空格&转大写
if v in g_set:
# 纯合子加倍
df.iloc[i, j] = v * 2
elif len(v) == 2:
# 杂合子排序
_v = "".join(sorted(v))
df.iloc[i, j] = _v if _v in gg_set else None
elif v == "DEL":
# 单个碱基缺失, 用DD表示
df.iloc[i, j] = "DD"
elif "." in v:
# 杂合缺失, 将DEL替换为D
_s = sorted(v.split("."), key=lambda x: len(x))[0] # 提取出单个碱基
df.iloc[i, j] = "".join(sorted((_s, "D"))) if _s in g_set else None
else:
# 空值或者超长字符, 填Nan
df.iloc[i, j] = None
else:
# 非string值填Nan
df.iloc[i, j] = None
summary["ok"] += 1
print("{}\t{}\t{}\tok".format(os.path.basename(fp), rows, cols))
return df
def insert_many(collection, docs=None, update=True):
if not docs:
return
# $set 的时候, 会更新数据, setOnInsert只插入不更新
update_key = "$set" if update else "$setOnInsert"
bulk = pymongo.bulk.BulkOperationBuilder(collection, ordered=False)
for i in docs:
if i.get(index_name):
bulk.find({index_name: i[index_name]}).upsert().update_one({update_key: i})
else:
bulk.insert(i)
result = bulk.execute()
return result
def df2db(df):
# 写入数据库
client = pymongo.MongoClient()
db = client[db_name]
collection = db[collection_name]
try:
collection.index_information()
except pymongo.errors.OperationFailure:
# 索引Sample_ID
collection.create_index(index_name, unique=True)
data = loads(df.T.to_json()).values()
rs = insert_many(collection, data)
# collection.insert_many(loads(df.T.to_json()).values())
print("-" * 30)
print("summary(files):")
print("\tok:\t\t{}".format(summary['ok']))
print("\terror:\t\t{}".format(summary['error']))
print("\ttmp:\t\t{}".format(summary['tmp']))
print("\tskip:\t\t{}".format(summary['skip']))
print("\ttotal:\t\t{}".format(sum(summary.values())))
print("-" * 30)
print("summary(samples):")
print("\tupdate:\t\t{}".format(rs.get("nModified")))
print("\tinsert:\t\t{}".format(rs.get("nUpserted") + rs.get("nInserted")))
print("\ttotal:\t\t{}".format(collection.count()))
client.close()
def time_it(func):
@wraps(func)
def wrapper(*args, **kwargs):
t1 = time.time()
rt = func(*args, **kwargs)
print("cost: {}s".format(round((time.time() - t1), 2)))
return rt
return wrapper
@time_it
def main(dp):
# 读入Excel, 处理后合并输出到df
df = reduce(
lambda df1, df2: df1.append(df2, ignore_index=True),
map(
xlsx2df, filter(
pick_rules,
get_files(dp)
)
)
)
# 删除重复ID
df = df.drop_duplicates(subset="Sample_ID")
# 删除空列
df = df.dropna(axis=1, how="all")
df2db(df)
print("-" * 30)
print("data importing complete!")
if __name__ == "__main__":
if len(sys.argv) != 2:
sys.exit("Usage: {} <samples_dir>".format(os.path.basename(sys.argv[0])))
main(sys.argv[1])
运行结果:
------------------------------
summary(files):
ok: 7
error: 0
tmp: 0
skip: 0
total: 7
------------------------------
summary(samples):
update: 0
insert: 0
total: 7272
------------------------------
data importing complete!
cost: 1.26s