Batch Processing CSV Data with Python
1. Data Reading
1.1 CSV Read Configuration
with open(in_csv_path, 'r', encoding='UTF-8-sig', newline='\n') as fin:
    csv_in = csv.DictReader(fin, delimiter="\t")
- File path: in_csv_path (any custom path is supported)
- Encoding: UTF-8-sig (a BOM, if present, is handled automatically)
- Delimiter: \t (any custom delimiter is supported)
- Output format: one dict per row, with the field names as keys (a minimal read sketch follows below)
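For illustration, a minimal sketch of the read path, assuming the hypothetical tab-separated input file data/task_info.csv used later in this article, with index and points columns:

import csv

in_csv_path = "data/task_info.csv"  # sample path, adjust as needed
with open(in_csv_path, 'r', encoding='UTF-8-sig', newline='\n') as fin:
    csv_in = csv.DictReader(fin, delimiter="\t")
    for row in csv_in:
        # each row is a dict keyed by the header fields
        print(row['index'], row['points'])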
2. Data Processing
2.1 Multiprocessing Framework
def batch_cal(rows, csv_out):
    pool = mp.Pool(mp_pool_count)
    results = pool.map(row_process, rows)
    pool.close()
    pool.join()
    csv_out.writerows(results)
- Concurrency control: via mp_pool_count (default: 10 worker processes)
- Task batching: each batch processes mp_pool_task_count records (default: 10)
- Exception isolation: a failure in a single worker does not abort the overall run (see the context-manager sketch below)
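Since pool.map blocks until every row in the batch has been processed, the pool can also be used as a context manager, which shuts it down automatically. A minimal sketch of that variant (behaviorally equivalent here), assuming row_process and mp_pool_count as defined in this article:

import multiprocessing as mp

def batch_cal(rows, csv_out):
    # map() returns only after all rows are done; leaving the
    # with-block then shuts the pool down for us
    with mp.Pool(mp_pool_count) as pool:
        results = pool.map(row_process, rows)
    csv_out.writerows(results)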
2.2 Row-Level Processing Function
def row_process(row):
    try:
        points = row['points']
        point = remote_call(points)
    except Exception as e:
        point = ''
        print(e)
    row.update({'point': point})
    return row
- Field extraction: any input field can be mapped into the output
- Retry mechanism: a failed remote call is retried up to 3 times (handled inside remote_call)
- Exception capture: prevents a worker process from crashing (a single-row usage sketch follows)
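For reference, a single row can be pushed through the function directly. The input below is a hypothetical example, and 'point' ends up as None if every remote attempt fails; it assumes remote_call and its configuration from this article are already in scope:

row = {"index": "1", "points": "113.490697,23.031145|113.490697,23.031145"}
processed = row_process(row)
print(processed)  # the original fields plus the new 'point' key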
2.3 Remote Service Call
def remote_call(points):
    for i in range(3):
        try:
            body = remote_body.copy()
            body['points'] = points
            r = req.post(remote_url, data=body, headers={'Connection': 'close'}, timeout=10)
            obj = r.json()
            if 'point' in obj:
                return obj['point']
            else:
                return None
        except Exception as e:
            print(e)
    return None  # all attempts failed
- Reconnection: automatic retry on failure (a variant with backoff is sketched below)
- Timeout control: each request is limited to 10 seconds
- Connection management: the connection is closed explicitly via the Connection: close header
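The code above retries immediately after a failure. As a hedged alternative, a variant that waits briefly between attempts is sketched below; the function name, the retries/backoff parameters, and the 0.5-second delay are assumptions rather than part of the original, and remote_url / remote_body are the globals defined in the complete example:

import time
import requests as req

def remote_call_with_backoff(points, retries=3, backoff=0.5):
    # hypothetical variant of remote_call with a growing delay between attempts
    for attempt in range(retries):
        try:
            body = remote_body.copy()
            body['points'] = points
            r = req.post(remote_url, data=body, headers={'Connection': 'close'}, timeout=10)
            obj = r.json()
            return obj.get('point')  # None if the response has no 'point' field
        except Exception as e:
            print(e)
            time.sleep(backoff * (attempt + 1))  # wait a bit longer before each retry
    return None  # all attempts failed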
3. Data Writing
3.1 CSV Write Configuration
with open(out_csv_path, 'w', encoding='utf8', newline='\n') as fout:
    csv_out = csv.DictWriter(fout, fieldnames=csv_header)
    csv_out.writeheader()
- Output fields: defined explicitly via csv_header
- Encoding: UTF-8 (no BOM)
- Batch flushing: the write buffer is flushed as soon as each batch is processed (see the note below)
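One caveat: csv.DictWriter raises a ValueError if a row contains keys that are not listed in fieldnames, so the point field that row_process adds must either appear in csv_header or be ignored explicitly. A minimal sketch of the write path with a hypothetical row and extrasaction='ignore':

import csv

csv_header = ["index", "points", "point"]
with open("data/task_info_out.csv", 'w', encoding='utf8', newline='\n') as fout:
    csv_out = csv.DictWriter(fout, fieldnames=csv_header, extrasaction='ignore')
    csv_out.writeheader()
    # hypothetical processed row; any extra keys would be dropped silently
    csv_out.writerow({"index": "1", "points": "113.49,23.03", "point": "113.49,23.03"})
    fout.flush()  # flush after each batch so partial results reach disk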
4. System Tuning
4.1 Raising the CSV Field Size Limit
original_limit = csv.field_size_limit()
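Reading the current limit is only the first step; the complete example below then raises it, falling back to a fixed value where sys.maxsize does not fit (the same pattern as in mult_process):

import csv
import sys

original_limit = csv.field_size_limit()
try:
    # try to raise the limit to the largest integer the platform supports
    csv.field_size_limit(sys.maxsize)
except OverflowError:
    # 32-bit builds cannot accept sys.maxsize; a 1 GB limit is usually enough
    csv.field_size_limit(int(1e9))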
4.2 Parallelism Parameters
mp_pool_task_count = 10  # rows per batch
mp_pool_count = 10  # number of worker processes
5. Full Execution Flow
- Memory friendliness: rows are processed in batches automatically, so the whole file never has to fit in memory
- Progress feedback: the number of processed rows is printed as the job runs
- Resource release: file handles and network connections are closed automatically
Complete Code Example
import csv
import sys
import requests as req
import multiprocessing as mp

mp_pool_task_count = 10
mp_pool_count = 10
remote_url = "http://ip:port/qm_point"
remote_body = {
    "ak": "****",
    "points": "113.490697,23.031145|113.490697,23.031145"
}
csv_header = [
    "index",
    "points",
    "point"  # output field added by row_process; DictWriter requires it in fieldnames
]
def remote_call(points):
    for i in range(3):
        try:
            body = remote_body.copy()
            body['points'] = points
            r = req.post(remote_url, data=body, headers={'Connection': 'close'}, timeout=10)
            obj = r.json()
            if 'point' in obj:
                return obj['point']
            else:
                return None
        except Exception as e:
            print(e)
    return None  # all attempts failed
def row_process(row):
    try:
        points = row['points']
        point = remote_call(points)
    except Exception as e:
        point = ''
        print(e)
    row.update({'point': point})
    return row
def batch_cal(rows, csv_out):
    pool = mp.Pool(mp_pool_count)
    results = pool.map(row_process, rows)
    pool.close()
    pool.join()
    csv_out.writerows(results)
def mult_process(in_csv_path, out_csv_path):
    # read the current maximum field size limit
    original_limit = csv.field_size_limit()
    print("original_limit:", original_limit)
    try:
        # try to raise the limit to the system's maximum integer value
        csv.field_size_limit(sys.maxsize)
    except OverflowError:
        # 32-bit systems need special handling
        csv.field_size_limit(int(1e9))  # set a 1 GB limit (usually enough)
    count = 0
    print("path:", in_csv_path)
    with open(in_csv_path, 'r', encoding='UTF-8-sig', newline='\n') as fin:
        csv_in = csv.DictReader(fin, delimiter="\t")
        with open(out_csv_path, 'w', encoding='utf8', newline='\n') as fout:
            csv_out = csv.DictWriter(fout, fieldnames=csv_header)
            csv_out.writeheader()
            tasks = []
            for row in csv_in:
                count += 1
                tasks.append(row)
                if len(tasks) >= mp_pool_task_count:
                    batch_cal(tasks, csv_out)
                    fout.flush()
                    print("process suc === rows:", count)
                    tasks = []
            if len(tasks) > 0:
                batch_cal(tasks, csv_out)
                fout.flush()
                print("process suc === rows:", count)
if __name__ == '__main__':
    mult_process(r"data/task_info.csv", r"data/task_info_out.csv")
License:
CC BY 4.0