使用Pandas优化千万级数据分析流程

举报
柠檬🍋 发表于 2026/02/26 20:25:41 2026/02/26
【摘要】 使用Pandas优化千万级数据分析流程Pandas是Python数据分析的核心库,但在处理大规模数据时需要进行优化。本文将介绍如何优化Pandas以高效处理千万级数据分析任务。 Pandas性能优化策略处理大规模数据的关键策略:数据类型优化:使用更高效的dtype分块处理:避免一次性加载所有数据向量化操作:避免循环,使用内置函数并行处理:利用多核加速 Pandas优化核心实现"""Pand...

使用Pandas优化千万级数据分析流程

Pandas是Python数据分析的核心库,但在处理大规模数据时需要进行优化。本文将介绍如何优化Pandas以高效处理千万级数据分析任务。

Pandas性能优化策略

处理大规模数据的关键策略:

  • 数据类型优化:使用更高效的dtype
  • 分块处理:避免一次性加载所有数据
  • 向量化操作:避免循环,使用内置函数
  • 并行处理:利用多核加速

Pandas优化核心实现

"""
Pandas千万级数据分析优化
包含数据类型优化、分块处理、向量化操作等
"""

import pandas as pd
import numpy as np
import time
from typing import Callable, List, Iterator
import gc


class PandasOptimizer:
    """Pandas优化工具类"""
    
    def __init__(self):
        self.results = []
    
    def benchmark(self, func: Callable, *args, iterations: int = 3, name: str = "") -> float:
        """性能测试"""
        times = []
        for _ in range(iterations):
            gc.collect()
            start = time.perf_counter()
            result = func(*args)
            end = time.perf_counter()
            times.append(end - start)
        
        avg_time = np.mean(times)
        print(f"{name or func.__name__}: {avg_time:.4f}s")
        return avg_time
    
    def optimize_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
        """优化数据类型"""
        df_optimized = df.copy()
        
        # 优化整数类型
        for col in df_optimized.select_dtypes(include=['int']).columns:
            col_min = df_optimized[col].min()
            col_max = df_optimized[col].max()
            
            if col_min >= 0:
                if col_max < 255:
                    df_optimized[col] = df_optimized[col].astype(np.uint8)
                elif col_max < 65535:
                    df_optimized[col] = df_optimized[col].astype(np.uint16)
                elif col_max < 4294967295:
                    df_optimized[col] = df_optimized[col].astype(np.uint32)
            else:
                if col_min > np.iinfo(np.int8).min and col_max < np.iinfo(np.int8).max:
                    df_optimized[col] = df_optimized[col].astype(np.int8)
                elif col_min > np.iinfo(np.int16).min and col_max < np.iinfo(np.int16).max:
                    df_optimized[col] = df_optimized[col].astype(np.int16)
                elif col_min > np.iinfo(np.int32).min and col_max < np.iinfo(np.int32).max:
                    df_optimized[col] = df_optimized[col].astype(np.int32)
        
        # 优化浮点类型
        for col in df_optimized.select_dtypes(include=['float']).columns:
            df_optimized[col] = df_optimized[col].astype(np.float32)
        
        # 优化对象类型(类别型)
        for col in df_optimized.select_dtypes(include=['object']).columns:
            num_unique = df_optimized[col].nunique()
            num_total = len(df_optimized[col])
            if num_unique / num_total < 0.5:  # 类别比例小于50%
                df_optimized[col] = df_optimized[col].astype('category')
        
        return df_optimized
    
    def compare_memory_usage(self, df_original: pd.DataFrame, df_optimized: pd.DataFrame):
        """对比内存使用"""
        mem_original = df_original.memory_usage(deep=True).sum() / 1024**2
        mem_optimized = df_optimized.memory_usage(deep=True).sum() / 1024**2
        
        print(f"\n内存使用对比:")
        print(f"  原始数据: {mem_original:.2f} MB")
        print(f"  优化后: {mem_optimized:.2f} MB")
        print(f"  节省: {(1 - mem_optimized/mem_original)*100:.1f}%")


def create_large_dataset(n_rows: int = 1000000) -> pd.DataFrame:
    """创建大规模测试数据集"""
    np.random.seed(42)
    
    data = {
        'id': range(n_rows),
        'user_id': np.random.randint(1, 100000, n_rows),
        'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n_rows),
        'subcategory': np.random.choice(['X', 'Y', 'Z'], n_rows),
        'value1': np.random.randn(n_rows),
        'value2': np.random.randint(1, 1000, n_rows),
        'value3': np.random.randint(1, 100, n_rows),
        'date': pd.date_range('2020-01-01', periods=n_rows, freq='min'),
        'is_active': np.random.choice([True, False], n_rows),
        'score': np.random.uniform(0, 100, n_rows)
    }
    
    return pd.DataFrame(data)


def demonstrate_chunk_processing():
    """演示分块处理"""
    print("\n" + "="*60)
    print("分块处理演示")
    print("="*60)
    
    # 创建大文件
    chunk_size = 100000
    total_rows = 1000000
    
    print(f"\n模拟处理 {total_rows} 行数据,每块 {chunk_size} 行")
    
    def process_with_chunks():
        """使用分块处理"""
        results = []
        for chunk_start in range(0, total_rows, chunk_size):
            chunk_end = min(chunk_start + chunk_size, total_rows)
            # 模拟处理每个块
            chunk_data = pd.DataFrame({'value': np.random.randn(chunk_end - chunk_start)})
            result = chunk_data['value'].sum()
            results.append(result)
        return sum(results)
    
    def process_without_chunks():
        """不使用分块(一次性加载)"""
        all_data = pd.DataFrame({'value': np.random.randn(total_rows)})
        return all_data['value'].sum()
    
    optimizer = PandasOptimizer()
    
    print("\n分块处理:")
    time_chunked = optimizer.benchmark(process_with_chunks, name="分块处理")
    
    print("\n一次性处理:")
    time_full = optimizer.benchmark(process_without_chunks, name="一次性处理")
    
    print(f"\n分块处理优势: 内存占用低,适合超大数据集")


def demonstrate_vectorization():
    """演示向量化操作"""
    print("\n" + "="*60)
    print("向量化操作演示")
    print("="*60)
    
    df = create_large_dataset(1000000)
    
    # 方法1: Python循环
    def loop_calculation():
        result = []
        for _, row in df.iterrows():
            if row['value2'] > 500:
                result.append(row['value1'] * 2)
            else:
                result.append(row['value1'])
        return pd.Series(result)
    
    # 方法2: apply
    def apply_calculation():
        return df.apply(lambda row: row['value1'] * 2 if row['value2'] > 500 else row['value1'], axis=1)
    
    # 方法3: 向量化
    def vectorized_calculation():
        return np.where(df['value2'] > 500, df['value1'] * 2, df['value1'])
    
    optimizer = PandasOptimizer()
    
    print("\n不同方法性能对比:")
    print("注意: iterrows循环非常慢,只测试小数据")
    
    # 只用少量数据测试iterrows
    df_small = df.head(1000)
    
    start = time.perf_counter()
    loop_calculation()
    print(f"iterrows循环 (1000行): {time.perf_counter() - start:.4f}s")
    
    start = time.perf_counter()
    vectorized_calculation()
    print(f"向量化操作 (100万行): {time.perf_counter() - start:.4f}s")


def demonstrate_query_optimization():
    """演示查询优化"""
    print("\n" + "="*60)
    print("查询优化演示")
    print("="*60)
    
    df = create_large_dataset(1000000)
    
    # 方法1: 链式布尔索引
    def chained_indexing():
        temp = df[df['category'] == 'A']
        temp = temp[temp['value2'] > 500]
        temp = temp[temp['is_active'] == True]
        return temp
    
    # 方法2: 单次布尔索引
    def single_indexing():
        mask = (df['category'] == 'A') & (df['value2'] > 500) & (df['is_active'] == True)
        return df[mask]
    
    # 方法3: 使用query
    def query_method():
        return df.query("category == 'A' and value2 > 500 and is_active == True")
    
    optimizer = PandasOptimizer()
    
    print("\n查询性能对比:")
    optimizer.benchmark(chained_indexing, name="链式索引")
    optimizer.benchmark(single_indexing, name="单次索引")
    optimizer.benchmark(query_method, name="query方法")


def demonstrate_groupby_optimization():
    """演示分组聚合优化"""
    print("\n" + "="*60)
    print("分组聚合优化演示")
    print("="*60)
    
    df = create_large_dataset(1000000)
    
    # 方法1: 多次groupby
    def multiple_groupby():
        mean_val = df.groupby('category')['value1'].mean()
        sum_val = df.groupby('category')['value2'].sum()
        count_val = df.groupby('category')['value3'].count()
        return pd.concat([mean_val, sum_val, count_val], axis=1)
    
    # 方法2: agg聚合
    def agg_groupby():
        return df.groupby('category').agg({
            'value1': 'mean',
            'value2': 'sum',
            'value3': 'count'
        })
    
    optimizer = PandasOptimizer()
    
    print("\n分组聚合性能对比:")
    optimizer.benchmark(multiple_groupby, name="多次groupby")
    optimizer.benchmark(agg_groupby, name="agg聚合")


def main():
    """主函数"""
    print("="*60)
    print("Pandas千万级数据分析优化")
    print("="*60)
    
    # 创建测试数据
    print("\n创建测试数据集...")
    df = create_large_dataset(1000000)
    print(f"数据集大小: {len(df)} 行 x {len(df.columns)} 列")
    
    # 1. 数据类型优化
    print("\n" + "="*60)
    print("数据类型优化")
    print("="*60)
    
    optimizer = PandasOptimizer()
    df_optimized = optimizer.optimize_dtypes(df)
    optimizer.compare_memory_usage(df, df_optimized)
    
    # 显示数据类型变化
    print("\n数据类型变化:")
    for col in df.columns:
        print(f"  {col}: {df[col].dtype} -> {df_optimized[col].dtype}")
    
    # 2. 分块处理
    demonstrate_chunk_processing()
    
    # 3. 向量化操作
    demonstrate_vectorization()
    
    # 4. 查询优化
    demonstrate_query_optimization()
    
    # 5. 分组聚合优化
    demonstrate_groupby_optimization()
    
    print("\n" + "="*60)
    print("Pandas优化总结")
    print("="*60)
    print("1. 数据类型优化: 使用category、int32/float32等")
    print("2. 分块处理: 使用chunksize处理超大数据")
    print("3. 向量化操作: 避免iterrows,使用内置函数")
    print("4. 查询优化: 使用单次布尔索引或query")
    print("5. 聚合优化: 使用agg代替多次groupby")
    print("6. 内存管理: 及时删除不需要的变量")
    print("="*60)


if __name__ == "__main__":
    main()

Pandas数据处理流程图

小数据
大数据
原始数据
数据类型优化
数据大小
直接处理
分块处理
向量化操作
分组聚合
结果输出

关键要点

  1. 数据类型优化:可节省50%以上内存
  2. 分块处理:避免内存溢出
  3. 向量化:比循环快100倍以上
  4. 查询优化:单次布尔索引效率最高
  5. 聚合优化:agg减少多次扫描

通过合理优化,Pandas可以高效处理千万级数据分析任务。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。