python 异步协程的实现

发布于 2022-09-07 20:22:16 字数 1375 浏览 10 评论 0

需要对一个上百万行的json文件进行清洗,并需要将清洗好的结构化数据重新存储为csv文件。尝试使用pandas的dataframe转存清洗好的数据条目,却发现常规的清洗一条写入一条的速度太慢了,速度主要卡在每次数据的写入,于是为此专门定义了一个 asyn writeline,同时利用 async readline 每次生成 100 个协程处理100行数据,然而测试结果却和常规的按序处理无差别,平均每条数据处理速度都在 0.5 s 左右,感觉是自己的 asyn writeline 有问题,还请大神赐教。

测试代码如下:

import pandas as pd
import json
import time
import asyncio

def trop():
    tropicos = pd.DataFrame()
    with open(r"/tropicosbase.json", "r") as yn:
        count = 0
        tropicos['tag'] = None
        tropicos.loc[0] = None
        async def readline(line):
            nonlocal count
            js = json.loads(line)
            await writeline(js, tropicos, count)
            count += 1
            tropicos.loc[count] = None
        cs = yn.readlines()[:100]
        tasks = [asyncio.ensure_future(readline(line)) for line in cs]
        loop = asyncio.get_event_loop()
        start = time.time()
        loop.run_until_complete(asyncio.wait(tasks))
        end = time.time()
        print(end - start)

    tropicos.to_csv(r'/tropicos.csv', index=None)


async def writeline(js, tropicos, count):
    for k, v in js.items():
        try:
            tropicos[k][count] = v
        except KeyError:
            if k == 'detailsdiv':
                pass
            else:
                tropicos[k] = pd.Series()
                tropicos[k][count] = v

trop()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

梦一生花开无言 2022-09-14 20:22:17

这种情况应该用多进程,而不是多线程或协程……

︶ ̄淡然 2022-09-14 20:22:17

个人经验.
尽量把结果保存在内存中,减少写入文件的次数.
1次写入10000行和写入10000次1行的时间,差别很大.

橘亓 2022-09-14 20:22:16

用 asyncio 处理这种问题没有任何优势,它应该用在 I/O 密集型运算。

你应该用最简单的实现方法,然后使用 cProfile 找出性能的瓶颈。

参考 https://docs.python.org/3/lib...

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文