如何在python代码内部将获取的数据分流?

发布于 2022-09-07 08:52:01 字数 578 浏览 47 评论 0

比如我使用kafka或rabbitmq从消息队列中获取数据,来源只有一条队列,不能从数据源分流数据。而计算难度较高,所以要在内部进行分流。比如我的消息处理类为:

class Worker(threading.Thread):
    def __init__(self):
        self.raw = []
    
    def run(self):
        while True:
            if self.raw:
                d = self.raw.pop()
                处理d数据
                将结果保存到批量插入的类中
    

然后在程序运行时,我创建若干个Worker,然后将消息源传入的数据,分流保存到这若干个Worker的raw属性中。问题就在于,如何能开销比较低的分流这些数据?
还有这样多线程处理的思路是否正确?我之前测试过,感觉python自带的Queue的效率并不是特别高。
另外,假如我要设计弹性创建Worker,应该如何用python代码完成,就是当数据流较大时,我就追加创建一些Worker,当数据流降低时,就销毁一些Worker。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

余生一个溪 2022-09-14 08:52:01

对 cpython 来说,耗 CPU 资源的运算,应该使用多进程而不是多线程。

在本例中,有两个地方可以做"分流",其一是如你所提的,在接收到数据后作分配,另一个是利用消息框架本身。
以 RabbitMQ 为例,你可以参考这个教程 Work Queues,https://www.rabbitmq.com/tuto...

其它诸如“找更省开销的方法”,“弹性工作池”等等,不妨放一下,先把功能实现了,再针对瓶颈做优化,以达到事半功倍的效果。

关于优化 python 性能,有一篇文章可以参考下 https://pypy.org/performance....

魔法唧唧 2022-09-14 08:52:01

最后通过阅读一些文档发现,如果涉及很密集的运算,那么选择python本来就不明智。Queue的设计也没有考虑大量的流量处理。如果问的问题一直没有答案,那最有可能的就是问题本身就不对。

山川志 2022-09-14 08:52:01

如果确定是计算密集确实不适合使用python中的多线程,但是是可以考虑使用多进程的。你不需要通过自己创建一个queue来进行内部分流,即使需要一个Queue, 也是需要通过给Queue设置大小来限制Queue的流量。

以rabbitmq为例, 请看https://www.rabbitmq.com/tuto...

在rabbitmq的官方例子中,是使用pika做为rabbitmq的客户端的, 消息模型应该是和你的是一致的,稍微修改一下官方的work.py例子,通过建立多个rabbitmq客户端来消费消息:

#!/usr/bin/env python
import pika
import time
from concurrent.futures import ProcessPoolExecutor
# from concurrent.futures import ThreadPoolExecutor


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channels = [
    (1, connection.channel(1)),
    (2, connection.channel(2)),
    (3, connection.channel(3)),
]

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(0.2)
    print(" [x] Done: %s" % ch.channel_number)
    ch.basic_ack(delivery_tag=method.delivery_tag)


for _, channel in channels:
    channel.queue_declare(queue='task_queue', durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='task_queue')


def start_consumer(channel_name):
    dict(channels).get(channel_name).start_consuming()


print(' [*] Waiting for messages. To exit press CTRL+C')

with ProcessPoolExecutor(max_workers=len(channels)) as e:
    e.map(start_consumer, range(1, 4))

# with ThreadPoolExecutor(max_workers=len(channels)) as e:
#     e.map(start_consumer, range(1, 4))

弹性的创建worker我觉的从程序(worker.py)内部去实现是比较困难的,从程序外部来看更容易实现, 首先监控流量, 流量增大可以通过启动更多的worker.py脚本来加快消息的消费; 反之, 减少worker.py启动的数量。

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文