confluent-kafka-python
With the latest release of the Confluent platform, there is a new Python client on the scene. confluent-kafka-python is a Python wrapper around librdkafka and is largely built by the same author. The underlying library is the basis for most non-JVM clients out there. We already mentioned it earlier when looking at pykafka.
https://github.com/confluentinc/confluent-kafka-python
import confluent_kafka

topic = 'confluent-kafka-topic'

def confluent_kafka_producer_performance():
    topic = 'confluent-kafka-topic'
    conf = {'bootstrap.servers': bootstrap_servers}
    producer = confluent_kafka.Producer(**conf)
    messages_to_retry = 0

    producer_start = time.time()
    for i in range(msg_count):
        try:
            producer.produce(topic, value=msg_payload)
        except BufferError as e:
            messages_to_retry += 1

    # Hacky retry for messages that overfilled the local buffer
    for i in range(messages_to_retry):
        producer.poll(0)
        try:
            producer.produce(topic, value=msg_payload)
        except BufferError as e:
            producer.poll(0)
            producer.produce(topic, value=msg_payload)

    producer.flush()

    return time.time() - producer_start
producer_timings['confluent_kafka_producer'] = confluent_kafka_producer_performance()
calculate_thoughput(producer_timings['confluent_kafka_producer'])
Processed 1000000 messages in 5.45 seconds
17.50 MB/s
183456.28 Msgs/s
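The BufferError handling above is admittedly hacky. A cleaner pattern is to serve delivery reports with poll() and retry in place whenever the local queue fills. A minimal sketch under that idea — `produce_with_backpressure` is a hypothetical helper of ours, not part of the confluent_kafka API:

```python
def produce_with_backpressure(producer, topic, value, max_retries=3):
    """Produce one message, draining the delivery queue when the local buffer is full.

    `producer` is assumed to expose confluent_kafka's Producer.produce/poll
    interface; the helper itself is illustrative, not a library function.
    """
    for _ in range(max_retries + 1):
        try:
            producer.produce(topic, value=value)
            return True
        except BufferError:
            # Local queue is full: serve delivery reports so librdkafka
            # can free slots, then try again.
            producer.poll(1)
    return False
```

In the benchmark loop this would replace both the counting pass and the second retry pass with a single call per message: `produce_with_backpressure(producer, topic, msg_payload)`.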
client = KafkaClient(hosts=bootstrap_servers)
topic = client.topics[b'confluent-kafka-topic']
print(topic.earliest_available_offsets())
print(topic.latest_available_offsets())
{0: OffsetPartitionResponse(offset=[0], err=0)}
{0: OffsetPartitionResponse(offset=[1000000], err=0)}
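The same watermark check can also be done with confluent_kafka itself via `Consumer.get_watermark_offsets`, which returns the (low, high) offsets for a partition. A sketch, assuming a reachable broker at `bootstrap_servers`; the two helper names are ours:

```python
def partition_message_count(low, high):
    # Messages stored in a partition = high watermark minus low watermark.
    return high - low

def topic_message_count(bootstrap_servers, topic, partition=0):
    # Requires a running broker; group.id is mandatory but unused here.
    from confluent_kafka import Consumer, TopicPartition
    consumer = Consumer({'bootstrap.servers': bootstrap_servers,
                         'group.id': 'watermark-check'})
    low, high = consumer.get_watermark_offsets(TopicPartition(topic, partition))
    consumer.close()
    return partition_message_count(low, high)
```

For the topic above this should report 1000000, matching pykafka's OffsetPartitionResponse output.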
import confluent_kafka
import uuid

def confluent_kafka_consumer_performance():
    topic = 'confluent-kafka-topic'
    msg_consumed_count = 0
    conf = {'bootstrap.servers': bootstrap_servers,
            'group.id': uuid.uuid1(),
            'session.timeout.ms': 6000,
            'default.topic.config': {
                'auto.offset.reset': 'earliest'
            }
    }

    consumer = confluent_kafka.Consumer(**conf)

    consumer_start = time.time()
    # As with pykafka, subscribing to a topic starts a background thread
    consumer.subscribe([topic])

    while True:
        msg = consumer.poll(1)
        if msg:
            msg_consumed_count += 1
        if msg_consumed_count >= msg_count:
            break

    consumer_timing = time.time() - consumer_start
    consumer.close()
    return consumer_timing
_ = confluent_kafka_consumer_performance() # Warm cache
consumer_timings['confluent_kafka_consumer'] = confluent_kafka_consumer_performance()
calculate_thoughput(consumer_timings['confluent_kafka_consumer'])
Processed 1000000 messages in 3.83 seconds
24.93 MB/s
261407.91 Msgs/s
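One caveat before reading too much into the loop above: a message returned by poll() can also be an error event (partition EOF, for example), and production code should check msg.error() before using the payload. A minimal sketch of that check — `handle_message` is our name, not a library function:

```python
def handle_message(msg):
    """Return the payload of a normal message, or None for no message / error events.

    `msg` is assumed to follow confluent_kafka's Message interface,
    where error() is truthy for error events.
    """
    if msg is None or msg.error():
        return None
    return msg.value()
```

In the benchmark loop, `if handle_message(msg) is not None:` would replace the bare `if msg:` truthiness test.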
The confluent_kafka client is crushingly fast. It can consume over 250K messages a second from a single broker. Note that the raw C client has been benchmarked at over 3 million messages/sec, so you can see how much overhead Python adds. But on the side of developer speed, you don't have to code in C!