pykafka

Published 2024-10-10 00:31:04

The first client we will examine is pykafka. It is the client we have used the most, and used successfully. It tries less hard to replicate the existing Java client API. It also includes partition balancing code for Kafka broker version 0.8.2.

https://github.com/Parsely/pykafka

import time

from pykafka import KafkaClient

def pykafka_producer_performance(use_rdkafka=False):
    # Set up the client, topic and producer
    client = KafkaClient(hosts=bootstrap_servers)
    topic = client.topics[b'pykafka-test-topic']
    producer = topic.get_producer(use_rdkafka=use_rdkafka)

    produce_start = time.time()
    for i in range(msg_count):
        # Queue the message; the actual send happens in the background
        producer.produce(msg_payload)

    producer.stop()  # Will flush the background queue

    return time.time() - produce_start

producer_timings['pykafka_producer'] = pykafka_producer_performance()
calculate_thoughput(producer_timings['pykafka_producer'])
Processed 1000000 messsages in 57.32 seconds
1.66 MB/s
17446.37 Msgs/s
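
The helper calculate_thoughput is defined outside this section. The figures it prints are consistent with a calculation along the following lines. This is only a sketch: the function name `throughput` is hypothetical, and the 100-byte message size and the 1 MB = 1024 * 1024 bytes convention are assumptions inferred from the printed numbers, not stated here.

```python
def throughput(elapsed_s, msg_count=1_000_000, msg_size=100):
    """Return (MB/s, msgs/s) for a run that took elapsed_s seconds.

    msg_size=100 bytes is an assumption inferred from the printed output.
    """
    msgs_per_s = msg_count / elapsed_s
    mb_per_s = msgs_per_s * msg_size / (1024 * 1024)
    return mb_per_s, msgs_per_s


mb_per_s, msgs_per_s = throughput(57.32)
print("{0:.2f} MB/s".format(mb_per_s))
print("{0:.2f} Msgs/s".format(msgs_per_s))
```

The messages-per-second figure differs slightly from the printed one in the last digits, because 57.32 is itself a rounded time.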

If you monitor this function, you will notice that the produce loop completes and then the function stalls at producer.stop(). This is because the producer is asynchronous and batches produce calls to Kafka.

Kafka's speed comes from its ability to batch many messages together. To take advantage of this, the client keeps a buffer of messages in the background and sends them in batches. So, when you call producer.produce you are performing no external I/O: the message is queued in an in-memory buffer and the method returns immediately. This means we can fill the in-memory buffer faster than pykafka can send the messages to Kafka. producer.stop() will block until all messages are sent.
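
To make the buffering behaviour concrete, here is a toy model of an asynchronous, batching producer built only on the standard library. It is not pykafka's implementation: the class name `ToyAsyncProducer`, the batch size and the simulated send delay are all assumptions chosen for illustration.

```python
import queue
import threading
import time


class ToyAsyncProducer:
    """Toy model of an asynchronous, batching producer (not pykafka's code)."""

    def __init__(self, batch_size=100, send_delay_s=0.001):
        self._queue = queue.Queue()
        self._batch_size = batch_size
        self._send_delay_s = send_delay_s  # stands in for network I/O
        self.sent = 0
        self.batches = 0
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def produce(self, message):
        # No I/O here: the message is only appended to the in-memory buffer.
        self._queue.put(message)

    def _run(self):
        done = False
        while not done:
            batch = [self._queue.get()]  # block until something arrives
            # Drain whatever else is already buffered, up to batch_size.
            while len(batch) < self._batch_size:
                try:
                    batch.append(self._queue.get_nowait())
                except queue.Empty:
                    break
            if batch[-1] is None:  # sentinel queued by stop()
                batch.pop()
                done = True
            if batch:
                time.sleep(self._send_delay_s)  # simulate one request per batch
                self.sent += len(batch)
                self.batches += 1

    def stop(self):
        # Queue a sentinel and block until the worker has sent everything.
        self._queue.put(None)
        self._worker.join()


producer = ToyAsyncProducer()
start = time.time()
for i in range(10_000):
    producer.produce(b"x" * 100)
loop_time = time.time() - start
producer.stop()  # blocks while the remaining batches are flushed
total_time = time.time() - start
print("loop: {0:.3f}s, loop + stop: {1:.3f}s".format(loop_time, total_time))
print("sent {0} messages in {1} batches".format(producer.sent, producer.batches))
```

In this model the loop finishes quickly because produce() only touches the queue, and most of the elapsed time is spent inside stop(), which mirrors the stall described above.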

So when producing messages, make sure you let the producer flush its remaining messages, for example by calling producer.stop(), before your program exits.
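
One way to make the flush hard to forget is to wrap the produce loop in try/finally, so that stop() runs even if the loop raises. The helper name `produce_all` and the `RecordingProducer` stand-in are hypothetical; the helper relies only on the produce() and stop() calls used above, so a producer obtained from topic.get_producer() could be passed in instead of the stand-in.

```python
def produce_all(producer, payloads):
    """Queue every payload, then always flush by calling stop()."""
    queued = 0
    try:
        for payload in payloads:
            producer.produce(payload)
            queued += 1
    finally:
        # Runs on normal exit and on error, so buffered messages are flushed.
        producer.stop()
    return queued


class RecordingProducer:
    """Hypothetical stand-in so the sketch runs without a broker."""

    def __init__(self):
        self.buffer = []
        self.stopped = False

    def produce(self, payload):
        self.buffer.append(payload)

    def stop(self):
        self.stopped = True


fake = RecordingProducer()
queued = produce_all(fake, [b"a", b"b", b"c"])
print(queued, fake.stopped)
```

The pykafka README also shows producers being used in a `with` block; it is worth checking that against the version you run.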

Another way to ensure that the messages were produced is to check the topic offsets.

client = KafkaClient(hosts=bootstrap_servers)
topic = client.topics[b'pykafka-test-topic']
print(topic.earliest_available_offsets())
print(topic.latest_available_offsets())
{0: OffsetPartitionResponse(offset=[0], err=0)}
{0: OffsetPartitionResponse(offset=[1000000], err=0)}
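
The two dictionaries can be turned into a message count by subtracting the earliest offset from the latest one for each partition. The sketch below uses a namedtuple stand-in with the same fields as the printed responses so that it runs without a broker; the function name `messages_available` is hypothetical, and it assumes the topic is not compacted.

```python
from collections import namedtuple

# Stand-in with the same fields as the printed responses above.
OffsetPartitionResponse = namedtuple("OffsetPartitionResponse", ["offset", "err"])


def messages_available(earliest, latest):
    """Sum (latest - earliest) over all partitions."""
    total = 0
    for partition_id, latest_resp in latest.items():
        total += latest_resp.offset[0] - earliest[partition_id].offset[0]
    return total


earliest = {0: OffsetPartitionResponse(offset=[0], err=0)}
latest = {0: OffsetPartitionResponse(offset=[1000000], err=0)}
print(messages_available(earliest, latest))
```

With the values printed above, the result matches the 1000000 messages produced by the test.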

Pykafka has an optional producer backend that wraps the librdkafka package. librdkafka is a pure C Kafka client with very impressive benchmarks. Let's rerun our pykafka producer test with rdkafka enabled.

producer_timings['pykafka_producer_rdkafka'] = pykafka_producer_performance(use_rdkafka=True)
calculate_thoughput(producer_timings['pykafka_producer_rdkafka'])
Processed 1000000 messsages in 15.72 seconds
6.06 MB/s
63595.38 Msgs/s

def pykafka_consumer_performance(use_rdkafka=False):
    # Set up the client and topic
    client = KafkaClient(hosts=bootstrap_servers)
    topic = client.topics[b'pykafka-test-topic']

    msg_consumed_count = 0

    # The consumer starts polling messages in a background thread,
    # so the timer has to start before the consumer is created
    consumer_start = time.time()
    consumer = topic.get_simple_consumer(use_rdkafka=use_rdkafka)

    while True:
        msg = consumer.consume()
        if msg:
            msg_consumed_count += 1

        if msg_consumed_count >= msg_count:
            break

    consumer_timing = time.time() - consumer_start
    consumer.stop()
    return consumer_timing

# run it once through to warm the cache
_ = pykafka_consumer_performance(use_rdkafka=False)
consumer_timings['pykafka_consumer'] = pykafka_consumer_performance(use_rdkafka=False)
calculate_thoughput(consumer_timings['pykafka_consumer'])
Processed 1000000 messsages in 29.43 seconds
3.24 MB/s
33976.94 Msgs/s
# run it once through to warm the cache
_ = pykafka_consumer_performance(use_rdkafka=True)
consumer_timings['pykafka_consumer_rdkafka'] = pykafka_consumer_performance(use_rdkafka=True)
calculate_thoughput(consumer_timings['pykafka_consumer_rdkafka'])
Processed 1000000 messsages in 6.09 seconds
15.67 MB/s
164311.50 Msgs/s
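
Putting the four timings reported above side by side gives a rough idea of what the rdkafka backend buys. The snippet is plain arithmetic on the printed times; the dictionary layout is only an illustration, and the ratios apply to this particular test run.

```python
# Elapsed seconds copied from the output above.
timings = {
    "producer": {"pykafka": 57.32, "pykafka + rdkafka": 15.72},
    "consumer": {"pykafka": 29.43, "pykafka + rdkafka": 6.09},
}

speedups = {}
for role, runs in timings.items():
    speedups[role] = runs["pykafka"] / runs["pykafka + rdkafka"]
    print("{0}: rdkafka backend is {1:.1f}x faster".format(role, speedups[role]))
```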
