python 异步协程的实现
需要对一个上百万行的json文件进行清洗,并需要将清洗好的结构化数据重新存储为csv文件。尝试使用pandas的dataframe转存清洗好的数据条目,却发现常规的清洗一条写入一条的速度太慢了,速度主要卡在每次数据的写入,于是为此专门定义了一个 asyn writeline,同时利用 async readline 每次生成 100 个协程处理100行数据,然而测试结果却和常规的按序处理无差别,平均每条数据处理速度都在 0.5 s 左右,感觉是自己的 asyn writeline 有问题,还请大神赐教。
测试代码如下:
import pandas as pd
import json
import time
import asyncio
def trop():
tropicos = pd.DataFrame()
with open(r"/tropicosbase.json", "r") as yn:
count = 0
tropicos['tag'] = None
tropicos.loc[0] = None
async def readline(line):
nonlocal count
js = json.loads(line)
await writeline(js, tropicos, count)
count += 1
tropicos.loc[count] = None
cs = yn.readlines()[:100]
tasks = [asyncio.ensure_future(readline(line)) for line in cs]
loop = asyncio.get_event_loop()
start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print(end - start)
tropicos.to_csv(r'/tropicos.csv', index=None)
async def writeline(js, tropicos, count):
for k, v in js.items():
try:
tropicos[k][count] = v
except KeyError:
if k == 'detailsdiv':
pass
else:
tropicos[k] = pd.Series()
tropicos[k][count] = v
trop()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
这种情况应该用多进程,而不是多线程或协程……
个人经验.
尽量把结果保存在内存中,减少写入文件的次数.
1次写入10000行和写入10000次1行的时间,差别很大.
用 asyncio 处理这种问题没有任何优势,它应该用在 I/O 密集型运算。
你应该用最简单的实现方法,然后使用 cProfile 找出性能的瓶颈。
参考 https://docs.python.org/3/lib...