Locust Python: Kafka consumer in a separate thread
I'm using Locust for load testing. I want to register a Kafka consumer in a separate thread to measure how long message processing takes. Here is what I have so far:
```python
import logging
import threading

from confluent_kafka import Consumer
from locust import events

# bootstrap_servers, group_id and raw_topic are defined elsewhere in the locustfile.


def register_kafka_consumer(topic, environment):
    def poll():
        request_type = 'KAFKA_MESSAGE_CONSUMED'
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id
        })
        consumer.subscribe([topic])
        logging.getLogger().info("Consumer subscribed to topic '%s': %s", topic, consumer)
        try:
            while True:
                msg = consumer.poll()
                if msg is not None and not msg.error():
                    # diff_millis calculation
                    environment.events.request.fire(
                        request_type=request_type,
                        name=topic,
                        response=msg.value(),
                        response_time=diff_millis,
                        # Locust's built-in stats listener reads response_length
                        response_length=len(msg.value() or b''),
                    )
        except Exception:
            logging.getLogger().error("Error during polling message for consumer: %s",
                                      str(consumer), exc_info=True)
        finally:
            consumer.close()
            logging.getLogger().info("Kafka consumer closed: %s", str(consumer))

    # Start the consumer loop in a separate thread
    t = threading.Thread(target=poll)
    t.start()
    logging.getLogger().info("Thread started")


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    register_kafka_consumer(raw_topic, environment)
```
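The `# diff_millis calculation` step is not shown in the question. One possible way to fill it is sketched below, assuming the reference point is the message's own timestamp: in confluent_kafka, `msg.timestamp()` returns a `(timestamp_type, epoch_millis)` tuple, where the type says whether the value is the producer's CreateTime, the broker's LogAppendTime, or not available. `elapsed_millis` is a hypothetical helper, and the constant below is assumed to mirror `confluent_kafka.TIMESTAMP_NOT_AVAILABLE` so the sketch runs without the library.

```python
import time
from typing import Optional, Tuple

# Assumed to mirror confluent_kafka.TIMESTAMP_NOT_AVAILABLE; defined locally so
# this sketch runs without confluent_kafka installed.
TIMESTAMP_NOT_AVAILABLE = 0


def elapsed_millis(timestamp: Tuple[int, int], now_ms: Optional[float] = None) -> Optional[float]:
    """Milliseconds between a message timestamp and now (hypothetical helper).

    `timestamp` is the (timestamp_type, epoch_millis) tuple that
    confluent_kafka's Message.timestamp() returns. Returns None when the
    message carries no timestamp.
    """
    ts_type, ts_ms = timestamp
    if ts_type == TIMESTAMP_NOT_AVAILABLE:
        return None
    if now_ms is None:
        now_ms = time.time() * 1000.0
    # Producer/broker and consumer clocks may differ, so the result is only as
    # accurate as the clock synchronization between the hosts.
    return now_ms - ts_ms
```

Inside the polling loop this could be used as `diff_millis = elapsed_millis(msg.timestamp())`, skipping the event when it returns `None`. If the topic is configured for LogAppendTime, the measurement starts when the broker stored the message rather than when the producer created it.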
But according to the logs, execution gets stuck in the `while True` loop:
```
[2022-05-11 17:20:53,309] INFO/locust.main: Starting web interface at http://localhost:8089
[2022-05-11 17:20:53,340] INFO/root: Consumer subscribed to topic 'some_topic': <cimpl.Consumer object at 0x10c28c510>
```
I can tell because there is no `Thread started` log entry.
What am I doing wrong? Is there any way to overcome this issue?
EDIT 1
I found this page in the Locust documentation with an example of greenlet usage. Here is what I have now:
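```python
import logging
import time

import gevent
from confluent_kafka import Consumer
from locust import events
from locust.runners import STATE_CLEANUP, STATE_STOPPED, STATE_STOPPING, WorkerRunner

# bootstrap_servers, group_id and raw_topic are defined elsewhere in the locustfile.


def register_kafka_consumer(topic, environment):
    def poll():
        request_type = 'KAFKA_MESSAGE_CONSUMED'
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id
        })
        consumer.subscribe([topic])
        logging.getLogger().info("Consumer subscribed to topic '%s': %s", topic, consumer)
        try:
            # Keep polling until the test run is stopping or stopped
            while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
                # time.sleep is gevent-patched under Locust, so this yields to other greenlets
                time.sleep(1)
                msg = consumer.poll()
                if msg is not None and not msg.error():
                    # diff_millis calculation
                    environment.events.request.fire(
                        request_type=request_type,
                        name=topic,
                        response=msg.value(),
                        response_time=diff_millis,
                        # Locust's built-in stats listener reads response_length
                        response_length=len(msg.value() or b''),
                    )
        except Exception:
            logging.getLogger().error("Error during polling message for consumer: %s",
                                      str(consumer), exc_info=True)
        finally:
            consumer.close()
            logging.getLogger().info("Kafka consumer closed: %s", str(consumer))

    # Run the loop in a background greenlet on the master/standalone runner only
    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(poll)
        logging.getLogger().info("Greenlet started")


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    register_kafka_consumer(raw_topic, environment)
```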
According to the logs, the Kafka consumer is now registered:
```
[2022-05-12 12:39:03,310] INFO/locust.main: Starting web interface at http://localhost:8089
[2022-05-12 12:39:03,323] INFO/root: Greenlet started
[2022-05-12 12:39:03,323] INFO/locust.main: Starting Locust 2.8.2
[2022-05-12 12:39:03,379] INFO/root: Consumer subscribed to topic 'kafka_topic': <cimpl.Consumer object at 0x111ef4880>
```
However, now I cannot open the web UI: there is only a white screen that keeps loading forever. Perhaps this greenlet task somehow monopolizes the process. Any ideas how to solve this?
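The suspicion above matches how cooperative scheduling works. Locust runs on gevent and applies gevent's monkey patching when it is imported, so greenlets (including a `threading.Thread` started inside a locustfile) only switch when the running one yields, for example inside a patched `time.sleep()` or socket call. A call that blocks inside C code without yielding, such as confluent_kafka's `consumer.poll()` with no timeout, keeps every other greenlet from running, including the one serving the web UI. The sketch below illustrates the same effect with the standard library's `asyncio` as a stand-in for gevent (a different library with the same cooperative model); `heartbeat` plays the role of the web UI, and all function names are hypothetical.

```python
import asyncio
import time
from typing import Awaitable, Callable, List


async def heartbeat(ticks: List[float], stop: asyncio.Event) -> None:
    """Stand-in for the web UI: records a tick each time it gets to run."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def blocking_poller(duration: float) -> None:
    """Like consumer.poll() with no timeout: one long call that never yields."""
    time.sleep(duration)  # synchronous sleep; the event loop is frozen meanwhile


async def cooperative_poller(duration: float, step: float = 0.05) -> None:
    """Like poll(timeout=step) followed by a yield: bounded blocking, then give up control."""
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        time.sleep(step)        # bounded blocking call
        await asyncio.sleep(0)  # yield point, comparable to gevent.sleep(0)


async def count_ticks(poller: Callable[[float], Awaitable[None]], duration: float = 0.5) -> int:
    """Run the heartbeat next to a poller and count how often the heartbeat ran."""
    ticks: List[float] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat take its first tick
    await poller(duration)
    stop.set()
    await hb
    return len(ticks)
```

`asyncio.run(count_ticks(blocking_poller))` records only the heartbeat's initial tick, because the loop never regains control during the half-second call, while `asyncio.run(count_ticks(cooperative_poller))` lets the heartbeat run repeatedly.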
Comments (1)
Ok, I got it. The problem was caused by this statement: `msg = consumer.poll()`
This call has no timeout, so the Python interpreter waits forever and never gives another thread a chance to run. But if I set the timeout explicitly, everything works like a charm. Here is the final version.
When `consumer.poll()` reaches the timeout, it simply returns `None` and the loop starts again. Anyway, I still don't understand why Python cannot switch context on `poll` with no timeout. However, the problem is solved now.
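A minimal sketch of a polling loop in the spirit of this fix: poll with a bounded timeout, re-check a stop condition, and yield between iterations. It is not the author's final version, whose timeout value is not stated; the 1.0 second default below is an arbitrary choice. `poll_loop`, `should_stop` and `on_message` are hypothetical names, and the consumer is assumed to expose a confluent_kafka-style `poll(timeout=...)` that returns `None` on timeout, so the loop can be exercised with a stub.

```python
import time
from typing import Any, Callable


def poll_loop(
    consumer: Any,
    should_stop: Callable[[], bool],
    on_message: Callable[[Any], None],
    timeout: float = 1.0,
) -> int:
    """Poll until should_stop() returns True; return the number of messages handled.

    `consumer` is anything with a confluent_kafka-style poll(timeout=...) that
    returns None on timeout, or a message object with error() and value().
    """
    handled = 0
    while not should_stop():
        # A bounded timeout makes this call return None instead of blocking
        # forever, so the stop condition is re-checked regularly.
        msg = consumer.poll(timeout=timeout)
        if msg is not None and not msg.error():
            on_message(msg)
            handled += 1
        # Under Locust's gevent monkey patching, time.sleep(0) yields to other
        # greenlets (such as the web UI); in plain Python it returns almost at once.
        time.sleep(0)
    return handled
```

In a locustfile this loop could run inside the greenlet from EDIT 1, with `should_stop=lambda: environment.runner.state in (STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP)` and an `on_message` that fires `environment.events.request`. The timeout also bounds how long each C-level call can hold the whole process, so a shorter value keeps the web UI more responsive at the cost of more frequent wake-ups.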