
Chapter 5: High-Performance Data Processing and Analysis
5.1 Clinical Data Cleaning and Feature Engineering with Pandas
    import numpy as np
    import pandas as pd
    from sklearn.impute import SimpleImputer

    # Load the simulated clinical cohort
    df = pd.read_csv('clinical_cohort.csv')

    # Handle missing values: median for numeric columns, most frequent value for categorical ones
    num_imputer = SimpleImputer(strategy='median')
    cat_imputer = SimpleImputer(strategy='most_frequent')

    # patient_id is an identifier, not a measurement, so it is excluded from imputation
    num_cols = df.select_dtypes(include=np.number).columns.drop('patient_id', errors='ignore').tolist()
    cat_cols = df.select_dtypes(include='object').columns.tolist()

    # Guard against empty column lists, which SimpleImputer rejects
    if num_cols:
        df[num_cols] = num_imputer.fit_transform(df[num_cols])
    if cat_cols:
        df[cat_cols] = cat_imputer.fit_transform(df[cat_cols])

    # Build drug features: collapse multiple prescriptions into one feature vector per patient
    prescriptions = pd.DataFrame({
        'patient_id': [1, 1, 2, 2],
        'drug_name': ['Metformin', 'Glipizide', 'Metformin', 'Insulin'],
        'dose': [500, 5, 1000, 10],
    })
    # One-hot encode the drug names, then sum per patient to count prescriptions of each drug
    drug_dummies = pd.get_dummies(prescriptions['drug_name'])
    drug_features = drug_dummies.groupby(prescriptions['patient_id']).sum()
    df = df.merge(drug_features, left_on='patient_id', right_index=True, how='left')
    # Patients without any prescription get 0 instead of the NaN left by the merge
    df[drug_features.columns] = df[drug_features.columns].fillna(0).astype(int)

    # Time-window features
    labs = pd.DataFrame({
        'patient_id': [1, 1, 2],
        'lab_date': pd.to_datetime(['2024-01-10', '2024-02-15', '2024-03-01']),
        'hba1c': [7.2, 6.8, 8.1],
    })
    # For each patient, compute the most recent HbA1c and its change from the previous measurement
    labs_sorted = labs.sort_values(['patient_id', 'lab_date'])
    labs_sorted['prev_hba1c'] = labs_sorted.groupby('patient_id')['hba1c'].shift(1)
    labs_sorted['hba1c_trend'] = labs_sorted['hba1c'] - labs_sorted['prev_hba1c']
    # tail(1) keeps the actual last row per patient; groupby().last() would instead
    # pick the last non-null value of each column independently and could mix rows.
    # Patients with a single measurement keep hba1c_trend = NaN.
    latest_labs = labs_sorted.groupby('patient_id').tail(1)
    df = df.merge(latest_labs[['patient_id', 'hba1c', 'hba1c_trend']], on='patient_id', how='left')

    print(df.head())
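The one-hot step above only counts how often each drug was prescribed and leaves the `dose` column unused. As a minimal sketch of one way to carry dose information into the feature matrix, the block below pivots the same toy prescriptions table into one column per drug holding the total dose. The `dose_` column prefix is a naming convention assumed here for illustration, not something the original code defines, and total doses are only comparable within a drug (the toy table mixes units across drugs).

```python
import pandas as pd

# Toy prescriptions table, copied from the example above.
prescriptions = pd.DataFrame({
    'patient_id': [1, 1, 2, 2],
    'drug_name': ['Metformin', 'Glipizide', 'Metformin', 'Insulin'],
    'dose': [500, 5, 1000, 10],
})

# One row per patient, one column per drug, cell = total prescribed dose.
# Drugs a patient never received are filled with 0.
dose_features = prescriptions.pivot_table(
    index='patient_id',
    columns='drug_name',
    values='dose',
    aggfunc='sum',
    fill_value=0,
)

# Hypothetical prefix so these columns cannot collide with the
# count-based drug columns when both are merged into the cohort table.
dose_features = dose_features.add_prefix('dose_')

print(dose_features)
```

The result is indexed by `patient_id`, so it can be merged into the cohort table the same way as `drug_features`, with `left_on='patient_id', right_index=True`.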
5.2 Distributed Processing: Dask and PySpark
When the data no longer fits in a single machine's memory, use Dask (which offers a Pandas-like API) or PySpark.
Dask DataFrame
    import dask.dataframe as dd
    from dask.distributed import Client

    # Start a local cluster; memory_limit applies to each worker
    client = Client(n_workers=4, threads_per_worker=2, memory_limit='4GB')
    print(client)

    # Read a set of large CSV files from S3-compatible storage (a MinIO endpoint here)
    ddf = dd.read_csv(
        's3://my-bucket/genomics/phenotypes/*.csv',
        storage_options={
            'key': 'minioadmin',
            'secret': 'minioadmin',
            'client_kwargs': {'endpoint_url': 'http://minio:9000'},
        },
    )

    # Filter and aggregate; these operations are lazy and only build a task graph
    filtered = ddf[ddf['age'] > 18]
    grouped = filtered.groupby('disease').agg({'cost': 'mean', 'patient_id': 'count'})

    # compute() must be called to run the graph; it returns a pandas DataFrame
    result = grouped.compute()
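To make the filter-and-aggregate step less of a black box, here is a conceptual sketch in plain Python of how a grouped mean can be computed over data split into partitions. Each partition produces a partial (sum, count) per disease, and the partials are then combined. This illustrates the general split-apply-combine idea only and is not Dask's actual implementation. The function names `partial_aggregate` and `combine` and the sample rows are invented for illustration.

```python
from collections import defaultdict


def partial_aggregate(rows):
    """Per-partition step: cost sum and row count per disease, adults only."""
    acc = defaultdict(lambda: [0.0, 0])
    for row in rows:
        if row['age'] > 18:  # same filter as the Dask example
            entry = acc[row['disease']]
            entry[0] += row['cost']
            entry[1] += 1
    return dict(acc)


def combine(partials):
    """Merge the per-partition partials, then derive mean and count."""
    totals = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for disease, (cost_sum, count) in partial.items():
            totals[disease][0] += cost_sum
            totals[disease][1] += count
    return {
        disease: {'cost_mean': cost_sum / count, 'patient_count': count}
        for disease, (cost_sum, count) in totals.items()
    }


# Two made-up partitions standing in for two CSV files.
partitions = [
    [{'age': 45, 'disease': 'T2D', 'cost': 100.0},
     {'age': 12, 'disease': 'T2D', 'cost': 999.0}],
    [{'age': 60, 'disease': 'T2D', 'cost': 300.0},
     {'age': 30, 'disease': 'CKD', 'cost': 50.0}],
]

result = combine(partial_aggregate(p) for p in partitions)
print(result)
```

A mean cannot be obtained by averaging per-partition means, because partitions may hold different numbers of rows. Carrying the sum and the count separately keeps the final result exact.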
