Kafka consumers on different partitions in the same group still intermittently consume the same message

Posted 2025-01-09 19:36:49


I have 1 consumer group with 5 consumers. There are also 5 partitions, so each consumer gets 1 partition.

The CLI also shows this:

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic Topic-1
Topic: Topic-1  TopicId: kJqfk1FoRSWtkkjfsgw9FSg    PartitionCount: 5   ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: Topic-1  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 3    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 4    Leader: 0   Replicas: 0 Isr: 0

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic-1 --from-beginning --partition {n} correctly shows different messages for each partition.

However, I often see 2 or more consumers working on the same message, and being new to Kafka, I'm not really able to figure out the problem.

I am using pykafka to consume messages:

import json
import pprint
import traceback

from pykafka import KafkaClient

class CB_Kafka_Consumer:
    def __init__(self):
        self._connect_kafka_consumer()
        module_logger.info(f"Kafka Consumer at {kafka_broker_url}:{kafka_broker_port}")
        # Get DB session object
        self.session = get_db_session(SQL_USERNAME, SQL_PASSWORD, SQL_SERVER_IP, SQL_DATABASE)
        module_logger.info(f"Connected to MySQL at {SQL_SERVER_IP}")

    def _connect_kafka_consumer(self):
        self._consumer = None
        try:
            self._client = KafkaClient(f"{kafka_broker_url}:{kafka_broker_port}")
            topic = self._client.topics[kafka_topic]
            self._consumer = topic.get_simple_consumer(consumer_group=CONSUMER_GROUP_NAME)

            module_logger.info("Created a Kafka Consumer")
        except Exception:
            module_logger.error('Exception while connecting to Kafka')
            traceback.print_exc()

    def start_consuming(self):
        module_logger.info("*"*10 + " Starting to Consume Messages " + "*"*10)
        while True:
            for msg in self._consumer:
                # Offsets are committed before processing begins
                self._consumer.commit_offsets()
                message = json.loads(msg.value.decode('utf-8'))
                module_logger.debug("\n----- RECEIVED MESSAGE ----------")
                module_logger.debug(pprint.pformat(message))
                self.process_message(message)  # Logic for processing messages (takes anywhere between 10 min and 4 hours per task)
        self._consumer.close()


Comments (1)

很糊涂小朋友 2025-01-16 19:36:49


Print the partition and offset of the messages. You should see that they are, in fact, unique events you're processing.
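
With pykafka, consumed message objects carry partition_id and offset attributes, so the loop from the question can log them. A minimal sketch:

for msg in self._consumer:
    # Log where each message came from, so duplicates can be told
    # apart from genuinely distinct events on other partitions.
    module_logger.debug(f"partition={msg.partition_id} offset={msg.offset}")
    message = json.loads(msg.value.decode('utf-8'))
    self.process_message(message)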

If those are the same, the "10 min to 4 hr" processing time is very likely causing consumer group rebalances: a consumer that goes too long without polling is considered dead and its partitions get reassigned (in the Java consumer, max.poll.interval.ms defaults to 5 minutes). You're experiencing at-least-once processing semantics, and therefore need to handle duplicates on your own.
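
Handling duplicates typically means making processing idempotent. A minimal sketch, assuming the asker's SQLAlchemy-style session and a hypothetical ProcessedOffset model (a table with a composite primary key on topic, partition and offset; not part of the original code):

def process_once(self, msg):
    # Hypothetical helper, not in the original code: skip any message
    # whose (topic, partition, offset) was already recorded as processed.
    key = (kafka_topic, msg.partition_id, msg.offset)
    if self.session.query(ProcessedOffset).get(key):
        module_logger.debug(f"Skipping duplicate {key}")
        return
    message = json.loads(msg.value.decode('utf-8'))
    self.process_message(message)
    # Record the offset only after processing succeeds, so a crash
    # mid-task causes a reprocess rather than a silently lost message.
    self.session.add(ProcessedOffset(topic=kafka_topic,
                                     partition_id=msg.partition_id,
                                     offset=msg.offset))
    self.session.commit()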

I see you're using a database client in your code, so the recommendation would be to use the Kafka Connect framework rather than writing your own consumer.
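
For reference, a Confluent JDBC sink connector can land topic records in MySQL through configuration alone. A minimal sketch; the connection details are placeholders, and the exact properties should be checked against the connector's documentation:

{
  "name": "topic-1-mysql-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "Topic-1",
    "connection.url": "jdbc:mysql://<sql-server-ip>:3306/<database>",
    "connection.user": "<user>",
    "connection.password": "<password>",
    "insert.mode": "insert",
    "auto.create": "true"
  }
}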
