使用Pandas优化千万级数据分析流程
【摘要】 使用Pandas优化千万级数据分析流程Pandas是Python数据分析的核心库,但在处理大规模数据时需要进行优化。本文将介绍如何优化Pandas以高效处理千万级数据分析任务。 Pandas性能优化策略处理大规模数据的关键策略:数据类型优化:使用更高效的dtype分块处理:避免一次性加载所有数据向量化操作:避免循环,使用内置函数并行处理:利用多核加速 Pandas优化核心实现"""Pand...
使用Pandas优化千万级数据分析流程
Pandas是Python数据分析的核心库,但在处理大规模数据时需要进行优化。本文将介绍如何优化Pandas以高效处理千万级数据分析任务。
Pandas性能优化策略
处理大规模数据的关键策略:
- 数据类型优化:使用更高效的dtype
- 分块处理:避免一次性加载所有数据
- 向量化操作:避免循环,使用内置函数
- 并行处理:利用多核加速
Pandas优化核心实现
"""
Pandas千万级数据分析优化
包含数据类型优化、分块处理、向量化操作等
"""
import pandas as pd
import numpy as np
import time
from typing import Callable, List, Iterator
import gc
class PandasOptimizer:
"""Pandas优化工具类"""
def __init__(self):
self.results = []
def benchmark(self, func: Callable, *args, iterations: int = 3, name: str = "") -> float:
"""性能测试"""
times = []
for _ in range(iterations):
gc.collect()
start = time.perf_counter()
result = func(*args)
end = time.perf_counter()
times.append(end - start)
avg_time = np.mean(times)
print(f"{name or func.__name__}: {avg_time:.4f}s")
return avg_time
def optimize_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
"""优化数据类型"""
df_optimized = df.copy()
# 优化整数类型
for col in df_optimized.select_dtypes(include=['int']).columns:
col_min = df_optimized[col].min()
col_max = df_optimized[col].max()
if col_min >= 0:
if col_max < 255:
df_optimized[col] = df_optimized[col].astype(np.uint8)
elif col_max < 65535:
df_optimized[col] = df_optimized[col].astype(np.uint16)
elif col_max < 4294967295:
df_optimized[col] = df_optimized[col].astype(np.uint32)
else:
if col_min > np.iinfo(np.int8).min and col_max < np.iinfo(np.int8).max:
df_optimized[col] = df_optimized[col].astype(np.int8)
elif col_min > np.iinfo(np.int16).min and col_max < np.iinfo(np.int16).max:
df_optimized[col] = df_optimized[col].astype(np.int16)
elif col_min > np.iinfo(np.int32).min and col_max < np.iinfo(np.int32).max:
df_optimized[col] = df_optimized[col].astype(np.int32)
# 优化浮点类型
for col in df_optimized.select_dtypes(include=['float']).columns:
df_optimized[col] = df_optimized[col].astype(np.float32)
# 优化对象类型(类别型)
for col in df_optimized.select_dtypes(include=['object']).columns:
num_unique = df_optimized[col].nunique()
num_total = len(df_optimized[col])
if num_unique / num_total < 0.5: # 类别比例小于50%
df_optimized[col] = df_optimized[col].astype('category')
return df_optimized
def compare_memory_usage(self, df_original: pd.DataFrame, df_optimized: pd.DataFrame):
"""对比内存使用"""
mem_original = df_original.memory_usage(deep=True).sum() / 1024**2
mem_optimized = df_optimized.memory_usage(deep=True).sum() / 1024**2
print(f"\n内存使用对比:")
print(f" 原始数据: {mem_original:.2f} MB")
print(f" 优化后: {mem_optimized:.2f} MB")
print(f" 节省: {(1 - mem_optimized/mem_original)*100:.1f}%")
def create_large_dataset(n_rows: int = 1000000) -> pd.DataFrame:
"""创建大规模测试数据集"""
np.random.seed(42)
data = {
'id': range(n_rows),
'user_id': np.random.randint(1, 100000, n_rows),
'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n_rows),
'subcategory': np.random.choice(['X', 'Y', 'Z'], n_rows),
'value1': np.random.randn(n_rows),
'value2': np.random.randint(1, 1000, n_rows),
'value3': np.random.randint(1, 100, n_rows),
'date': pd.date_range('2020-01-01', periods=n_rows, freq='min'),
'is_active': np.random.choice([True, False], n_rows),
'score': np.random.uniform(0, 100, n_rows)
}
return pd.DataFrame(data)
def demonstrate_chunk_processing():
"""演示分块处理"""
print("\n" + "="*60)
print("分块处理演示")
print("="*60)
# 创建大文件
chunk_size = 100000
total_rows = 1000000
print(f"\n模拟处理 {total_rows} 行数据,每块 {chunk_size} 行")
def process_with_chunks():
"""使用分块处理"""
results = []
for chunk_start in range(0, total_rows, chunk_size):
chunk_end = min(chunk_start + chunk_size, total_rows)
# 模拟处理每个块
chunk_data = pd.DataFrame({'value': np.random.randn(chunk_end - chunk_start)})
result = chunk_data['value'].sum()
results.append(result)
return sum(results)
def process_without_chunks():
"""不使用分块(一次性加载)"""
all_data = pd.DataFrame({'value': np.random.randn(total_rows)})
return all_data['value'].sum()
optimizer = PandasOptimizer()
print("\n分块处理:")
time_chunked = optimizer.benchmark(process_with_chunks, name="分块处理")
print("\n一次性处理:")
time_full = optimizer.benchmark(process_without_chunks, name="一次性处理")
print(f"\n分块处理优势: 内存占用低,适合超大数据集")
def demonstrate_vectorization():
"""演示向量化操作"""
print("\n" + "="*60)
print("向量化操作演示")
print("="*60)
df = create_large_dataset(1000000)
# 方法1: Python循环
def loop_calculation():
result = []
for _, row in df.iterrows():
if row['value2'] > 500:
result.append(row['value1'] * 2)
else:
result.append(row['value1'])
return pd.Series(result)
# 方法2: apply
def apply_calculation():
return df.apply(lambda row: row['value1'] * 2 if row['value2'] > 500 else row['value1'], axis=1)
# 方法3: 向量化
def vectorized_calculation():
return np.where(df['value2'] > 500, df['value1'] * 2, df['value1'])
optimizer = PandasOptimizer()
print("\n不同方法性能对比:")
print("注意: iterrows循环非常慢,只测试小数据")
# 只用少量数据测试iterrows
df_small = df.head(1000)
start = time.perf_counter()
loop_calculation()
print(f"iterrows循环 (1000行): {time.perf_counter() - start:.4f}s")
start = time.perf_counter()
vectorized_calculation()
print(f"向量化操作 (100万行): {time.perf_counter() - start:.4f}s")
def demonstrate_query_optimization():
"""演示查询优化"""
print("\n" + "="*60)
print("查询优化演示")
print("="*60)
df = create_large_dataset(1000000)
# 方法1: 链式布尔索引
def chained_indexing():
temp = df[df['category'] == 'A']
temp = temp[temp['value2'] > 500]
temp = temp[temp['is_active'] == True]
return temp
# 方法2: 单次布尔索引
def single_indexing():
mask = (df['category'] == 'A') & (df['value2'] > 500) & (df['is_active'] == True)
return df[mask]
# 方法3: 使用query
def query_method():
return df.query("category == 'A' and value2 > 500 and is_active == True")
optimizer = PandasOptimizer()
print("\n查询性能对比:")
optimizer.benchmark(chained_indexing, name="链式索引")
optimizer.benchmark(single_indexing, name="单次索引")
optimizer.benchmark(query_method, name="query方法")
def demonstrate_groupby_optimization():
"""演示分组聚合优化"""
print("\n" + "="*60)
print("分组聚合优化演示")
print("="*60)
df = create_large_dataset(1000000)
# 方法1: 多次groupby
def multiple_groupby():
mean_val = df.groupby('category')['value1'].mean()
sum_val = df.groupby('category')['value2'].sum()
count_val = df.groupby('category')['value3'].count()
return pd.concat([mean_val, sum_val, count_val], axis=1)
# 方法2: agg聚合
def agg_groupby():
return df.groupby('category').agg({
'value1': 'mean',
'value2': 'sum',
'value3': 'count'
})
optimizer = PandasOptimizer()
print("\n分组聚合性能对比:")
optimizer.benchmark(multiple_groupby, name="多次groupby")
optimizer.benchmark(agg_groupby, name="agg聚合")
def main():
"""主函数"""
print("="*60)
print("Pandas千万级数据分析优化")
print("="*60)
# 创建测试数据
print("\n创建测试数据集...")
df = create_large_dataset(1000000)
print(f"数据集大小: {len(df)} 行 x {len(df.columns)} 列")
# 1. 数据类型优化
print("\n" + "="*60)
print("数据类型优化")
print("="*60)
optimizer = PandasOptimizer()
df_optimized = optimizer.optimize_dtypes(df)
optimizer.compare_memory_usage(df, df_optimized)
# 显示数据类型变化
print("\n数据类型变化:")
for col in df.columns:
print(f" {col}: {df[col].dtype} -> {df_optimized[col].dtype}")
# 2. 分块处理
demonstrate_chunk_processing()
# 3. 向量化操作
demonstrate_vectorization()
# 4. 查询优化
demonstrate_query_optimization()
# 5. 分组聚合优化
demonstrate_groupby_optimization()
print("\n" + "="*60)
print("Pandas优化总结")
print("="*60)
print("1. 数据类型优化: 使用category、int32/float32等")
print("2. 分块处理: 使用chunksize处理超大数据")
print("3. 向量化操作: 避免iterrows,使用内置函数")
print("4. 查询优化: 使用单次布尔索引或query")
print("5. 聚合优化: 使用agg代替多次groupby")
print("6. 内存管理: 及时删除不需要的变量")
print("="*60)
if __name__ == "__main__":
main()
Pandas数据处理流程图
关键要点
- 数据类型优化:可节省50%以上内存
- 分块处理:避免内存溢出
- 向量化:比循环快100倍以上
- 查询优化:单次布尔索引效率最高
- 聚合优化:agg减少多次扫描
通过合理优化,Pandas可以高效处理千万级数据分析任务。
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)