最近项目需要, 把所有历史资料(Excel格式)汇总分析, 于是祭出pandas神器, 上万份Excel表格轻松搞定.

先放一段Excel转入MongoDB的代码

import os
import sys
import time
from functools import reduce, wraps
from shutil import copy2

import pandas as pd
import pymongo
from bson.json_util import loads

g_set = {'A', 'C', 'T', 'G'}
gg_set = {'AC', 'AG', 'AT', 'CG', 'CT', 'GT'}
summary = {
    "ok": 0,
    "skip": 0,
    "error": 0,
    "tmp": 0
}
db_name = "test"
collection_name = "snp_info"
index_name = "Sample_ID"


def bak_file(fp, ds="tmp", ext="bak", rename=True):
    fn = os.path.basename(fp)
    if not os.path.exists(ds):
        os.mkdir(ds)
    # 复制到临时目录
    copy2(fp, os.path.join(ds, fn))
    # 原路径加.bak后缀名
    if rename:
        os.rename(fp, "{}.{}".format(fp, ext))


def pick_rules(fp):
    # 文件名中包含LH-R及LH-I的均为其他项目, 跳过不处理
    fp_base = os.path.basename(fp)
    if "LH-R" in fp_base or "LH-I" in fp_base:
        bak_file(fp, "skip")
        summary["skip"] += 1
        print("{}\t0\t0\tskip".format(fp_base))
        return False
    elif fp_base.startswith("~"):  # 跳过office临时文件
        bak_file(fp, rename=False)
        summary["tmp"] += 1
        print("{}\t0\t0\ttmp".format(fp_base))
        return False
    else:
        return True


def get_files(dp, ext="xlsx"):
    """
    获取目标目录下指定后缀名文件列表
    :param dp: 目标目录
    :param ext: 后缀名, 默认为xlsx
    :return: 指定后缀名文件路径列表
    """
    # 绝对路径中有空格的情况会有bug, 所以暂时还是用相对路径
    # abspath = os.path.abspath(dp)
    lst = []
    for root, dirs, files in os.walk(dp):
        for file in files:
            if file.endswith(".{}".format(ext)):
                lst.append(os.path.join(root, file))
    return lst


def xlsx2df(fp, sn="Sample_ID"):
    """
    将xlsx内带有检测信息的结果转化为dataframe返回
    可以处理以下四种情况
    1. Sample_ID在首行, 紧接着是样本信息, 中间没有空行
    2. Sample_ID在首行, 紧接着两行首行为空, 第四行开始才是正常样本信息
    3. 1, 2两行首列为空, Sample_ID在第三行, 后面才是样本信息
    4. 没有Sample_ID, 首行首列为空, 紧接着就是样本信息
    :param fp: Excel文件路径
    :param sn: header name默认是Sample_ID
    :return: 整理后的dataframe
    """
    df = pd.read_excel(fp, header=None)
    # 添加header name
    is_format_ok = False
    for i in range(df.shape[0]):
        _t = str(df.iloc[i, 0])
        if _t == sn:
            # Sample_ID在首行的情况(大多数情况)
            df.columns = [i for i in df.iloc[i].values]
            df = df.iloc[(i + 1):]  # 跳过Sample_ID行
            is_format_ok = True
            break
        if _t.startswith("B"):
            # 没有Sample_ID的情况(极少)
            # 以B开头即表示正常ID, 则取上一行内容为header name, 第一个应该是sn即Sample_ID
            df.columns = [sn] + [i for i in df.iloc[i - 1].values][1:]
            is_format_ok = True
            break
    if not is_format_ok:
        # 文件无法识别, 备份源文件并返回空df
        print("{}:\t0\t0\terror".format(os.path.basename(fp)))
        bak_file(fp, "error")
        summary["error"] += 1
        return pd.DataFrame()
    # 删除Sample_ID为空的行
    df = df[pd.notnull(df[sn])]

    # 删除空列
    df = df.dropna(axis=1, how="all")

    # 删除重复rs号
    df = df.loc[:, ~df.columns.duplicated()]

    rows, cols = df.shape
    for i in range(rows):
        for j in range(1, cols):  # 跳过第一列Sample_ID不作处理
            v = df.iloc[i, j]
            if isinstance(v, str):
                v = v.strip().upper()  # 去空格&转大写
                if v in g_set:
                    # 纯合子加倍
                    df.iloc[i, j] = v * 2
                elif len(v) == 2:
                    # 杂合子排序
                    _v = "".join(sorted(v))
                    df.iloc[i, j] = _v if _v in gg_set else None
                elif v == "DEL":
                    # 单个碱基缺失, 用DD表示
                    df.iloc[i, j] = "DD"
                elif "." in v:
                    # 杂合缺失, 将DEL替换为D
                    _s = sorted(v.split("."), key=lambda x: len(x))[0]  # 提取出单个碱基
                    df.iloc[i, j] = "".join(sorted((_s, "D"))) if _s in g_set else None
                else:
                    # 空值或者超长字符, 填Nan
                    df.iloc[i, j] = None
            else:
                # 非string值填Nan
                df.iloc[i, j] = None
    summary["ok"] += 1
    print("{}\t{}\t{}\tok".format(os.path.basename(fp), rows, cols))
    return df


def insert_many(collection, docs=None, update=True):
    if not docs:
        return
    # $set 的时候, 会更新数据, setOnInsert只插入不更新
    update_key = "$set" if update else "$setOnInsert"
    bulk = pymongo.bulk.BulkOperationBuilder(collection, ordered=False)
    for i in docs:
        if i.get(index_name):
            bulk.find({index_name: i[index_name]}).upsert().update_one({update_key: i})
        else:
            bulk.insert(i)
    result = bulk.execute()
    return result


def df2db(df):
    # 写入数据库
    client = pymongo.MongoClient()
    db = client[db_name]
    collection = db[collection_name]
    try:
        collection.index_information()
    except pymongo.errors.OperationFailure:
        # 索引Sample_ID
        collection.create_index(index_name, unique=True)
    data = loads(df.T.to_json()).values()
    rs = insert_many(collection, data)
    # collection.insert_many(loads(df.T.to_json()).values())
    print("-" * 30)
    print("summary(files):")
    print("\tok:\t\t{}".format(summary['ok']))
    print("\terror:\t\t{}".format(summary['error']))
    print("\ttmp:\t\t{}".format(summary['tmp']))
    print("\tskip:\t\t{}".format(summary['skip']))
    print("\ttotal:\t\t{}".format(sum(summary.values())))
    print("-" * 30)
    print("summary(samples):")
    print("\tupdate:\t\t{}".format(rs.get("nModified")))
    print("\tinsert:\t\t{}".format(rs.get("nUpserted") + rs.get("nInserted")))
    print("\ttotal:\t\t{}".format(collection.count()))
    client.close()


def time_it(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        t1 = time.time()
        rt = func(*args, **kwargs)
        print("cost: {}s".format(round((time.time() - t1), 2)))
        return rt

    return wrapper


@time_it
def main(dp):
    # 读入Excel, 处理后合并输出到df
    df = reduce(
        lambda df1, df2: df1.append(df2, ignore_index=True),
        map(
            xlsx2df, filter(
                pick_rules,
                get_files(dp)
            )
        )
    )
    # 删除重复ID
    df = df.drop_duplicates(subset="Sample_ID")
    # 删除空列
    df = df.dropna(axis=1, how="all")
    df2db(df)
    print("-" * 30)
    print("data importing complete!")


if __name__ == "__main__":
    if len(sys.argv) != 2:
        sys.exit("Usage: {} <samples_dir>".format(os.path.basename(sys.argv[0])))
    main(sys.argv[1])

运行结果:

------------------------------
summary(files):
        ok:             7
        error:          0
        tmp:            0
        skip:           0
        total:          7
------------------------------
summary(samples):
        update:         0
        insert:         0
        total:          7272
------------------------------
data importing complete!
cost: 1.26s