Kafka consumers on different partitions in the same group are still intermittently consuming the same messages
I have 1 consumer group and 5 consumers. There are also 5 partitions, so each consumer gets 1 partition.
The CLI also shows this:
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic Topic-1
Topic: Topic-1 TopicId: kJqfk1FoRSWtkkjfsgw9FSg PartitionCount: 5 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: Topic-1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: Topic-1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: Topic-1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: Topic-1 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: Topic-1 Partition: 4 Leader: 0 Replicas: 0 Isr: 0
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic-1 --from-beginning --partition {n} correctly shows different messages for each partition.
However, I often see 2 or more consumers working on the same message, and being new to Kafka, I'm not really able to figure out the problem.
I am using pykafka to consume messages:
import json
import pprint
import traceback

from pykafka import KafkaClient

class CB_Kafka_Consumer:
    def __init__(self):
        self._connect_kafka_consumer()
        module_logger.info(f"Kafka Consumer at {kafka_broker_url}:{kafka_broker_port}")
        # Get DB session object
        self.session = get_db_session(SQL_USERNAME, SQL_PASSWORD, SQL_SERVER_IP, SQL_DATABASE)
        module_logger.info(f"Connected to MySQL at {SQL_SERVER_IP}")

    def _connect_kafka_consumer(self):
        self._consumer = None
        try:
            self._client = KafkaClient(f"{kafka_broker_url}:{kafka_broker_port}")
            topic = self._client.topics[kafka_topic]
            self._consumer = topic.get_simple_consumer(consumer_group=CONSUMER_GROUP_NAME)
            module_logger.info("Created a Kafka Consumer")
        except Exception:
            module_logger.error('Exception while connecting Kafka')
            traceback.print_exc()

    def start_consuming(self):
        module_logger.info("*" * 10 + " Starting to Consume Messages " + "*" * 10)
        while True:
            for msg in self._consumer:
                self._consumer.commit_offsets()
                message = json.loads(msg.value.decode('utf-8'))
                module_logger.debug("\n----- RECEIVED MESSAGE ----------")
                module_logger.debug(pprint.pformat(message))
                # Logic for processing messages (takes anywhere between
                # 10 min and 4 hours for the task to complete)
                self.process_message(message)
        self._consumer.close()
Answer:
Print the partition and offset of the messages. You should see that they are, in fact, unique events you're processing.
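For example, a minimal sketch of such a log line (assuming pykafka's Message objects expose partition_id and offset attributes; verify the names against your pykafka version):

```python
from types import SimpleNamespace

def describe_message(msg):
    # pykafka Message objects carry their coordinates as .partition_id and
    # .offset (assumption: check these attribute names against your pykafka
    # version). Logging them shows whether two consumers really saw the same
    # record, or merely two records with similar payloads.
    return f"partition={msg.partition_id} offset={msg.offset}"

# Quick self-check with a stand-in object (no broker needed):
fake = SimpleNamespace(partition_id=3, offset=42)
print(describe_message(fake))  # prints "partition=3 offset=42"
```

Add the equivalent line to the consumer's debug logging and compare the output across the five consumer processes.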
If those are the same, the "10 min to 4 hr" processing step is very likely causing a consumer group rebalance (by default, Kafka evicts a consumer that does not poll within max.poll.interval.ms, which defaults to 5 minutes), and you're experiencing at-least-once processing semantics, so you need to handle duplicates on your own.
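One way to handle such duplicates is to track, per partition, the highest offset already processed and skip anything at or below it. A minimal sketch (DedupProcessor is a made-up helper for illustration, not part of pykafka; a production version would persist this state, e.g. in the same MySQL database, so it survives restarts):

```python
class DedupProcessor:
    """Skips records that were already processed, keyed by (partition, offset).

    Kafka offsets are monotonically increasing within a partition, so
    remembering the highest processed offset per partition is enough to
    detect a replay caused by a rebalance.
    """

    def __init__(self):
        self._last_seen = {}  # partition_id -> highest offset processed

    def should_process(self, partition_id, offset):
        last = self._last_seen.get(partition_id)
        if last is not None and offset <= last:
            return False  # already handled; a rebalance re-delivered it
        self._last_seen[partition_id] = offset
        return True
```

In the consumer loop this would guard the expensive processing step: only call process_message when should_process returns True.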
I see you're using a database client in your code, so the recommendation would be to use the Kafka Connect framework rather than writing your own consumer.
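For reference, a Connect-based pipeline would be declared as configuration rather than code. A sketch using Confluent's JDBC sink connector (the connector class and property names come from that connector's documentation; the connection URL, credentials, topic, and key fields below are placeholders to adapt):

```json
{
  "name": "topic-1-mysql-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "Topic-1",
    "connection.url": "jdbc:mysql://localhost:3306/mydb",
    "connection.user": "user",
    "connection.password": "password",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "id"
  }
}
```

Note, however, that Connect suits pure load-into-database pipelines; if the multi-hour process_message logic must run per record, a consumer with proper poll-interval and duplicate handling remains necessary.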