
Batch Processing CSV Data with Python

1. Data Reading

1.1 CSV File Reading Configuration

    with open(in_csv_path, 'r', encoding='UTF-8-sig', newline='\n') as fin:
        csv_in = csv.DictReader(fin, delimiter="\t")
  • File path: in_csv_path (custom paths supported)
  • Encoding: UTF-8-sig (a BOM, if present, is handled transparently)
  • Delimiter: \t (custom delimiters supported)
  • Output format: one dict per row, keyed by the header field names (see the sketch below)
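
As a usage sketch, reading rows as dicts looks like this (the path and field names are taken from the complete example in Section 5; the csv docs recommend newline='' when opening files for the csv module, though the original's newline='\n' also works for simple files):

import csv

with open("data/task_info.csv", "r", encoding="UTF-8-sig", newline="") as fin:
    csv_in = csv.DictReader(fin, delimiter="\t")
    for row in csv_in:
        # each row is a dict such as {"index": "1", "points": "113.49,23.03"}
        print(row["index"], row["points"])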

2. Data Processing

2.1 Multiprocessing Framework

def batch_cal(rows, csv_out):
    pool = mp.Pool(mp_pool_count)  # note: a new pool is created for every batch
    results = pool.map(row_process, rows)
    pool.close()
    pool.join()
    csv_out.writerows(results)
  • Concurrency control: governed by mp_pool_count (default: 10 processes)
  • Batching: mp_pool_task_count records per batch (default: 10)
  • Exception isolation: an exception in a worker does not abort the run, since row_process catches it (a pool-reuse variation is sketched below)
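
Because batch_cal builds a fresh pool for every batch, the process startup cost is paid repeatedly. As a variation (not in the original script), a single long-lived pool with imap streams results while preserving input order; a sketch, assuming row_process and mp_pool_count as defined in this article:

import multiprocessing as mp

def batch_cal_stream(rows, csv_out, pool):
    # reuse one long-lived pool; imap preserves input order and yields
    # results as workers finish, instead of blocking on a full map()
    for result in pool.imap(row_process, rows):
        csv_out.writerow(result)

# usage: create the pool once and pass it to every batch
# with mp.Pool(mp_pool_count) as pool:
#     batch_cal_stream(tasks, csv_out, pool)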

2.2 Row-Level Processing Function

def row_process(row):
    try:
        points = row['points']
        point = remote_call(points)
    except Exception as e:
        point = ''
        print(e)

    row.update({'point': point})
    return row
  • Field extraction: any input field can be mapped (here, 'points')
  • Retry mechanism: failed remote calls are retried up to 3 times (implemented in remote_call, Section 2.3)
  • Exception handling: errors are caught so a bad row cannot crash the worker process (see the usage sketch below)
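
A quick usage sketch with a hypothetical stand-in for the remote service, so it runs locally without the real endpoint (assuming row_process is defined as above):

# hypothetical stub replacing the real remote_call, for local testing only
def remote_call(points):
    return points.split("|")[0]

row = {"index": "1", "points": "113.490697,23.031145|113.490697,23.031145"}
print(row_process(row))
# -> {'index': '1', 'points': '...', 'point': '113.490697,23.031145'}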

2.3 Remote Service Call

def remote_call(points):
    for i in range(3):
        try:
            body = remote_body.copy()
            body['points'] = points
            r = req.post(remote_url, data=body, headers={'Connection': 'close'}, timeout=10)
            obj = r.json()
            if 'point' in obj:
                return obj['point']
            else:
                return None
        except Exception as e:
            print(e)
    return None  # all three attempts failed
  • Retry on failure: up to 3 automatic attempts
  • Timeout control: 10-second limit per request
  • Connection management: the connection is closed explicitly via the 'Connection: close' header (a backoff variation is sketched below)
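
One refinement worth considering (not part of the original script): pausing between retries so a struggling service isn't hammered. A sketch with exponential backoff, reusing the module-level remote_url and remote_body from this article:

import time
import requests as req

def remote_call_with_backoff(points, retries=3):
    # same contract as remote_call, plus a growing pause between attempts
    for attempt in range(retries):
        try:
            body = remote_body.copy()
            body['points'] = points
            r = req.post(remote_url, data=body,
                         headers={'Connection': 'close'}, timeout=10)
            return r.json().get('point')  # None when the field is missing
        except Exception as e:
            print(e)
            if attempt < retries - 1:
                time.sleep(2 ** attempt)  # 1 s then 2 s between attempts
    return None  # all attempts failed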

3. Data Writing

3.1 CSV Writing Configuration

with open(out_csv_path, 'w', encoding='utf8', newline='\n') as fout:
    csv_out = csv.DictWriter(fout, fieldnames=csv_header)
    csv_out.writeheader()
  • Output fields: explicitly defined via csv_header
  • Encoding: UTF-8 (no BOM)
  • Batch flushing: the output buffer is flushed after every batch via fout.flush() (see the DictWriter note below)
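
One pitfall worth noting: DictWriter raises ValueError when a row dict contains a key that is not listed in fieldnames, so csv_header must include the computed 'point' column (or the writer must be created with extrasaction='ignore'). A minimal self-contained sketch:

import csv
import io

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["index", "points", "point"])
writer.writeheader()
writer.writerow({"index": "1", "points": "113.49,23.03", "point": "113.49,23.03"})
print(buf.getvalue())
# a row with an unexpected key would raise:
# writer.writerow({"index": "2", "oops": "x"})  # ValueError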

4. System Tuning

4.1 Adjusting the CSV Field Size Limit

original_limit = csv.field_size_limit()
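
The snippet above only reads the current limit. The actual adjustment, as done in the complete example in Section 5, raises it so that very long fields do not abort the read:

import csv
import sys

try:
    # try to raise the limit to the platform's maximum integer
    csv.field_size_limit(sys.maxsize)
except OverflowError:
    # 32-bit builds need a smaller value; 1 GB is usually plenty
    csv.field_size_limit(int(1e9))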

4.2 Parallelism Parameters

mp_pool_task_count = 10  # tasks per batch
mp_pool_count = 10  # number of worker processes
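
These defaults are conservative. As a rule of thumb (an assumption, not from the original), CPU-bound work is sized near the machine's core count, while network-bound work like the remote calls here can tolerate more workers than cores:

import os

# hypothetical tuning sketch: derive the pool size from the machine
mp_pool_count = os.cpu_count() or 10    # CPU-bound baseline
mp_pool_task_count = mp_pool_count * 2  # keep every worker busy per batch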

5. Complete Execution Flow

  • Memory optimization: rows are processed in batches, so the whole file never has to be held in memory
  • Progress feedback: the running row count is printed as each batch completes
  • Resource cleanup: file handles are closed by the with blocks, and network connections by the 'Connection: close' header

Complete Code Example

import csv
import multiprocessing as mp
import sys

import requests as req

mp_pool_task_count = 10
mp_pool_count = 10
remote_url = "http://ip:port/qm_point"
remote_body = {
    "ak": "****",
    "points": "113.490697,23.031145|113.490697,23.031145"
}

csv_header = [
    "index",
    "points",
    "point"  # computed by row_process; DictWriter rejects rows with keys missing from fieldnames
]


def remote_call(points):
    for i in range(3):
        try:
            body = remote_body.copy()
            body['points'] = points
            r = req.post(remote_url, data=body, headers={'Connection': 'close'}, timeout=10)
            obj = r.json()
            if 'point' in obj:
                return obj['point']
            else:
                return None
        except Exception as e:
            print(e)
    return None  # all three attempts failed


def row_process(row):
    try:
        points = row['points']
        point = remote_call(points)
    except Exception as e:
        point = ''
        print(e)

    row.update({'point': point})
    return row


def batch_cal(rows, csv_out):
    pool = mp.Pool(mp_pool_count)
    results = pool.map(row_process, rows)
    pool.close()
    pool.join()
    csv_out.writerows(results)


def mult_process(in_csv_path, out_csv_path):
    # read the current maximum field size limit
    original_limit = csv.field_size_limit()
    print("original_limit:", original_limit)

    try:
        # try to raise the limit to the platform's maximum integer
        csv.field_size_limit(sys.maxsize)
    except OverflowError:
        # 32-bit builds need a smaller value; 1 GB is usually plenty
        csv.field_size_limit(int(1e9))

    count = 0
    print("path:", in_csv_path)
    with open(in_csv_path, 'r', encoding='UTF-8-sig', newline='\n') as fin:
        csv_in = csv.DictReader(fin, delimiter="\t")
        with open(out_csv_path, 'w', encoding='utf8', newline='\n') as fout:
            csv_out = csv.DictWriter(fout, fieldnames=csv_header)
            csv_out.writeheader()
            tasks = []
            for row in csv_in:
                count += 1
                tasks.append(row)
                if len(tasks) >= mp_pool_task_count:
                    batch_cal(tasks, csv_out)
                    fout.flush()
                    print("process suc === rows:", count)tasks = []
            if len(tasks) > 0:
                batch_cal(tasks, csv_out)
                fout.flush()
                print("process suc === rows:", count)


if __name__ == '__main__':
    mult_process(r"data/task_info.csv", r"data/task_info_out.csv")
License: CC BY 4.0