pykafka
The first client we will examine is pykafka. It is the client we use most often and have used successfully. It makes less of an effort than other clients to replicate the existing Java client API, and it includes partition-balancing code for Kafka broker version 0.8.2.
https://github.com/Parsely/pykafka
import time

from pykafka import KafkaClient

def pykafka_producer_performance(use_rdkafka=False):
    # Setup client
    client = KafkaClient(hosts=bootstrap_servers)
    topic = client.topics[b'pykafka-test-topic']
    producer = topic.get_producer(use_rdkafka=use_rdkafka)
    msgs_produced = 0
    produce_start = time.time()

    for i in range(msg_count):
        # Start producing
        producer.produce(msg_payload)

    producer.stop()  # Will flush background queue

    return time.time() - produce_start
producer_timings['pykafka_producer'] = pykafka_producer_performance()
calculate_thoughput(producer_timings['pykafka_producer'])
Processed 1000000 messsages in 57.32 seconds
1.66 MB/s
17446.37 Msgs/s
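The calculate_thoughput helper is not defined in this excerpt. Here is a minimal sketch consistent with the printed output, assuming a msg_count of 1,000,000 and a 100-byte msg_payload (both assumptions); the spellings "thoughput" and "messsages" are kept as-is to match the calls and output above:

```python
def calculate_thoughput(timing, n_messages=1000000, msg_size=100):
    # Derive throughput figures from the elapsed wall-clock time
    print('Processed {0} messsages in {1:.2f} seconds'.format(n_messages, timing))
    print('{0:.2f} MB/s'.format((msg_size * n_messages) / timing / (1024 * 1024)))
    print('{0:.2f} Msgs/s'.format(n_messages / timing))
```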
If you are monitoring this function, you will notice that the produce loop completes and then the function stalls at producer.stop(). This is because the producer is asynchronous and batches produce calls to Kafka.
Kafka's speed comes from its ability to batch many messages together. To take advantage of this, the client keeps a buffer of messages in the background and batches them. So when you call producer.produce, you are performing no external I/O: the message is queued in an in-memory buffer and the method returns immediately. We are therefore able to load the in-memory buffer faster than pykafka can send its contents to Kafka. producer.stop() will block until all messages are sent.
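This buffering pattern can be sketched without a broker. The toy producer below is not pykafka's implementation, just an illustration of the pattern: produce() only enqueues to an in-memory buffer, a background thread drains it, and only stop() blocks until the flush completes.

```python
import queue
import threading
import time

class ToyAsyncProducer:
    """Illustrative async producer: produce() only enqueues; a background
    thread drains the buffer; stop() blocks until the buffer is flushed."""

    def __init__(self):
        self._buffer = queue.Queue()
        self.sent = 0
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def _drain(self):
        while True:
            msg = self._buffer.get()
            if msg is None:          # sentinel: buffer flushed, shut down
                return
            time.sleep(0.001)        # simulate network I/O to the broker
            self.sent += 1

    def produce(self, msg):
        self._buffer.put(msg)        # no external I/O; returns immediately

    def stop(self):
        self._buffer.put(None)
        self._worker.join()          # blocks until every queued message is "sent"

producer = ToyAsyncProducer()
t0 = time.time()
for i in range(100):
    producer.produce(b'payload')
enqueue_time = time.time() - t0      # near-instant: only queue puts
producer.stop()
total_time = time.time() - t0        # almost all wall-clock time is in stop()
```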
So when producing messages, make sure you allow the producer to flush the remaining messages before you exit.
Another way to ensure that the messages were produced is to check the topic offsets.
client = KafkaClient(hosts=bootstrap_servers)
topic = client.topics[b'pykafka-test-topic']
print(topic.earliest_available_offsets())
print(topic.latest_available_offsets())
{0: OffsetPartitionResponse(offset=[0], err=0)}
{0: OffsetPartitionResponse(offset=[1000000], err=0)}
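Since each partition reports its earliest and latest offsets, the message count on the topic is the per-partition difference, summed. A small helper, using a stand-in namedtuple shaped like the OffsetPartitionResponse output above:

```python
from collections import namedtuple

# Stand-in mirroring the shape of pykafka's printed response above
OffsetPartitionResponse = namedtuple('OffsetPartitionResponse', ['offset', 'err'])

def message_count(earliest, latest):
    # latest - earliest per partition, summed across partitions
    return sum(latest[p].offset[0] - earliest[p].offset[0] for p in latest)

earliest = {0: OffsetPartitionResponse(offset=[0], err=0)}
latest = {0: OffsetPartitionResponse(offset=[1000000], err=0)}
```

With the offsets shown above, message_count confirms all 1,000,000 messages landed on the topic.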
Pykafka has an optional producer backend that wraps the librdkafka package. librdkafka is a pure-C Kafka client that holds very impressive benchmarks. Let's rerun our pykafka producer test with rdkafka enabled.
producer_timings['pykafka_producer_rdkafka'] = pykafka_producer_performance(use_rdkafka=True)
calculate_thoughput(producer_timings['pykafka_producer_rdkafka'])
Processed 1000000 messsages in 15.72 seconds
6.06 MB/s
63595.38 Msgs/s
def pykafka_consumer_performance(use_rdkafka=False):
    # Setup client
    client = KafkaClient(hosts=bootstrap_servers)
    topic = client.topics[b'pykafka-test-topic']
    msg_consumed_count = 0

    consumer_start = time.time()
    # Consumer starts polling messages in background thread, need to start timer here
    consumer = topic.get_simple_consumer(use_rdkafka=use_rdkafka)

    while True:
        msg = consumer.consume()
        if msg:
            msg_consumed_count += 1
        if msg_consumed_count >= msg_count:
            break

    consumer_timing = time.time() - consumer_start
    consumer.stop()
    return consumer_timing
# run it once through to warm the cache
_ = pykafka_consumer_performance(use_rdkafka=False)
consumer_timings['pykafka_consumer'] = pykafka_consumer_performance(use_rdkafka=False)
calculate_thoughput(consumer_timings['pykafka_consumer'])
Processed 1000000 messsages in 29.43 seconds
3.24 MB/s
33976.94 Msgs/s
# run it once through to warm the cache
_ = pykafka_consumer_performance(use_rdkafka=True)
consumer_timings['pykafka_consumer_rdkafka'] = pykafka_consumer_performance(use_rdkafka=True)
calculate_thoughput(consumer_timings['pykafka_consumer_rdkafka'])
Processed 1000000 messsages in 6.09 seconds
15.67 MB/s
164311.50 Msgs/s
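From the timings collected above, the relative speedup of the rdkafka backend is simply a ratio of wall-clock times (numbers copied from the runs above):

```python
producer_speedup = 57.32 / 15.72   # pykafka_producer / pykafka_producer_rdkafka
consumer_speedup = 29.43 / 6.09    # pykafka_consumer / pykafka_consumer_rdkafka
# roughly 3.6x faster producing and 4.8x faster consuming with rdkafka enabled
```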