Locust Python:Kafka消费者在单独的线程中

发布于 2025-01-27 22:47:30 字数 4055 浏览 4 评论 0原文

我正在使用 locust 用于负载测试。我想在单独的线程中注册KAFKA消费者,以测量消息处理的时间。这是我现在得到的:

def register_kafka_consumer(topic, environment):
    def poll():
        request_type = 'KAFKA_MESSAGE_CONSUMED'
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id
        })
        consumer.subscribe([topic])
        logging.getLogger().info("Consumer subscribed to topic '%s': %s", topic, consumer)
        try:
            while True:
                msg = consumer.poll()
                if msg is not None and not msg.error():
                    # diff_millis calculation
                    environment.events.request.fire(
                        request_type=request_type,
                        name=topic,
                        response=msg.value(),
                        response_time=diff_millis,
                    )
        except Exception:
            logging.getLogger().error("Error during polling message for consumer: %s",
                                      str(consumer), exc_info=True)
        finally:
            logging.getLogger().info("Kafka consumer closed: %s", str(consumer))

    # here. I'm starting a separate thread
    t = threading.Thread(target=poll)
    t.start()
    logging.getLogger().info("Thread started")


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    register_kafka_consumer(raw_topic, environment)

但是,根据日志,执行被困在中,而true循环。

[2022-05-11 17:20:53,309] INFO/locust.main: Starting web interface at http://localhost:8089
[2022-05-11 17:20:53,340] INFO/root: Consumer subscribed to topic 'some_topic': <cimpl.Consumer object at 0x10c28c510>

因为没有线程启动日志条目。

我在做什么错?有什么方法可以克服这个问题吗?

编辑1

我找到了此页面在蝗虫文档中,带有一个示例绿色的用法。这是我现在得到的。

def register_kafka_consumer(topic, environment):
    def poll():
        request_type = 'KAFKA_MESSAGE_CONSUMED'
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id
        })
        consumer.subscribe([topic])
        logging.getLogger().info("Consumer subscribed to topic '%s': %s", topic, consumer)
        try:
            while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
                time.sleep(1)
                msg = consumer.poll()
                if msg is not None and not msg.error():
                    # diff_millis calculation
                    environment.events.request.fire(
                        request_type=request_type,
                        name=topic,
                        response=msg.value(),
                        response_time=diff_millis,
                    )
        except Exception:
            logging.getLogger().error("Error during polling message for consumer: %s",
                                      str(consumer), exc_info=True)
        finally:
            logging.getLogger().info("Kafka consumer closed: %s", str(consumer))

    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(poll)
        logging.getLogger().info("Greenlet started")


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    register_kafka_consumer(raw_topic, environment)

根据日志,Kafka消费者现已注册。

[2022-05-12 12:39:03,310] INFO/locust.main: Starting web interface at http://localhost:8089
[2022-05-12 12:39:03,323] NFO/root: Greenlet started
[2022-05-12 12:39:03,323] INFO/locust.main: Starting Locust 2.8.2
[2022-05-12 12:39:03,379] INFO/root: Consumer subscribed to topic 'kafka_topic': <cimpl.Consumer object at 0x111ef4880>

尽管我现在无法打开Web UI。只有白屏和无限加载。也许这项Grenleet任务以某种方式占领了该过程。有什么想法如何解决这个问题?

I'm using Locust for load testing. I want to register a Kafka consumer in separate thread to measure the time of message processing. Here is what I got now:

def register_kafka_consumer(topic, environment):
    def poll():
        request_type = 'KAFKA_MESSAGE_CONSUMED'
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id
        })
        consumer.subscribe([topic])
        logging.getLogger().info("Consumer subscribed to topic '%s': %s", topic, consumer)
        try:
            while True:
                msg = consumer.poll()
                if msg is not None and not msg.error():
                    # diff_millis calculation
                    environment.events.request.fire(
                        request_type=request_type,
                        name=topic,
                        response=msg.value(),
                        response_time=diff_millis,
                    )
        except Exception:
            logging.getLogger().error("Error during polling message for consumer: %s",
                                      str(consumer), exc_info=True)
        finally:
            logging.getLogger().info("Kafka consumer closed: %s", str(consumer))

    # here. I'm starting a separate thread
    t = threading.Thread(target=poll)
    t.start()
    logging.getLogger().info("Thread started")


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    register_kafka_consumer(raw_topic, environment)

But according to logs the execution is stuck in while True loop.

[2022-05-11 17:20:53,309] INFO/locust.main: Starting web interface at http://localhost:8089
[2022-05-11 17:20:53,340] INFO/root: Consumer subscribed to topic 'some_topic': <cimpl.Consumer object at 0x10c28c510>

Because there is no Thread started log entry.

What am I doing wrong? Is there any approach to overcome this issue?

EDIT 1

I found this page in Locust documentation with an example of greenlets usage. Here is what I got now.

def register_kafka_consumer(topic, environment):
    def poll():
        request_type = 'KAFKA_MESSAGE_CONSUMED'
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id
        })
        consumer.subscribe([topic])
        logging.getLogger().info("Consumer subscribed to topic '%s': %s", topic, consumer)
        try:
            while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
                time.sleep(1)
                msg = consumer.poll()
                if msg is not None and not msg.error():
                    # diff_millis calculation
                    environment.events.request.fire(
                        request_type=request_type,
                        name=topic,
                        response=msg.value(),
                        response_time=diff_millis,
                    )
        except Exception:
            logging.getLogger().error("Error during polling message for consumer: %s",
                                      str(consumer), exc_info=True)
        finally:
            logging.getLogger().info("Kafka consumer closed: %s", str(consumer))

    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(poll)
        logging.getLogger().info("Greenlet started")


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    register_kafka_consumer(raw_topic, environment)

According to logs, the Kafka consumer is registered now.

[2022-05-12 12:39:03,310] INFO/locust.main: Starting web interface at http://localhost:8089
[2022-05-12 12:39:03,323] NFO/root: Greenlet started
[2022-05-12 12:39:03,323] INFO/locust.main: Starting Locust 2.8.2
[2022-05-12 12:39:03,379] INFO/root: Consumer subscribed to topic 'kafka_topic': <cimpl.Consumer object at 0x111ef4880>

Though I cannot open the Web UI now. There is only white screen and infinite loading. Perhaps this grenleet task somehow overoccupied the process. Any ideas how to solve this problem?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

半世蒼涼 2025-02-03 22:47:31

好吧,我明白了。问题是由此陈述引起的:

msg = consumer.poll()

没有超时。因此,Python口译员永远等待,并且永远不会给其他机会执行。但是,如果我直接设置超时,一切都像魅力一样工作。这是最终版本。

def register_kafka_consumer(topic, environment):
    def poll():
        request_type = 'KAFKA_MESSAGE_CONSUMED'
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id
        })
        consumer.subscribe([topic])
        logging.getLogger().info("Consumer subscribed to topic '%s': %s", topic, consumer)
        try:
            while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
                time.sleep(1)
                # timeout in seconds
                msg = consumer.poll(timeout=0)
                if msg is not None and not msg.error():
                    # diff_millis calculation
                    environment.events.request.fire(
                        request_type=request_type,
                        name=topic,
                        response=msg.value(),
                        response_time=diff_millis,
                    )
        except Exception:
            logging.getLogger().error("Error during polling message for consumer: %s",
                                      str(consumer), exc_info=True)
        finally:
            logging.getLogger().info("Kafka consumer closed: %s", str(consumer))

    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(poll)
        logging.getLogger().info("Greenlet started")


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    register_kafka_consumer(raw_topic, environment)

consumer.poll()到达超时时,它只是返回none,循环再次启动。无论如何,我仍然不明白为什么python无法在没有超时的情况下切换poll的上下文。但是,问题现在已经解决。

Ok, I got it. The problem was caused by this statement:

msg = consumer.poll()

There is no timeout. So, Python interpreter waits forever and never gives a chance another thread for execution. But if I set the timeout directly, everything works like a charm. Here is the final version.

def register_kafka_consumer(topic, environment):
    def poll():
        request_type = 'KAFKA_MESSAGE_CONSUMED'
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id
        })
        consumer.subscribe([topic])
        logging.getLogger().info("Consumer subscribed to topic '%s': %s", topic, consumer)
        try:
            while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
                time.sleep(1)
                # timeout in seconds
                msg = consumer.poll(timeout=0)
                if msg is not None and not msg.error():
                    # diff_millis calculation
                    environment.events.request.fire(
                        request_type=request_type,
                        name=topic,
                        response=msg.value(),
                        response_time=diff_millis,
                    )
        except Exception:
            logging.getLogger().error("Error during polling message for consumer: %s",
                                      str(consumer), exc_info=True)
        finally:
            logging.getLogger().info("Kafka consumer closed: %s", str(consumer))

    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(poll)
        logging.getLogger().info("Greenlet started")


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    register_kafka_consumer(raw_topic, environment)

When consumer.poll() reaches timeout, it just returns None and the loop starts again. Anyway, I still don't understand why Python cannot switch context on poll with no timeout. However, the problem is solved now.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文