I have a Kafka consumer running on a Spring application.
I am trying to configure the consumer with fetch.max.wait.ms and fetch.min.bytes.
I would like the consumer to wait until there are 15000000 bytes of messages or 1 minute has passed.
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 60000);
consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 15000000);
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
I know this configuration does have an effect, because once it was set I started getting org.apache.kafka.common.errors.DisconnectException.
To resolve it, I increased request.timeout.ms:
consumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);
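Why the DisconnectException appeared: the broker may hold a fetch request for up to fetch.max.wait.ms before answering, so the client's request.timeout.ms (30000 ms by default for the consumer) has to be larger than that. Otherwise the client treats the in-flight fetch as timed out and drops the connection. The snippet below is a minimal, stdlib-only sketch assuming plain string config keys (the same names the ConsumerConfig constants resolve to); the class `FetchTimeoutCheck` and the 10 s safety margin are hypothetical choices for illustration, not Kafka APIs.

```java
import java.util.Properties;

// Hypothetical helper: builds the consumer settings from the question with plain
// string keys and checks that the client waits longer than the broker may hold a fetch.
public class FetchTimeoutCheck {

    static Properties buildConsumerProps() {
        Properties props = new Properties();
        props.put("fetch.min.bytes", "15000000");   // broker answers once ~15 MB is available...
        props.put("fetch.max.wait.ms", "60000");    // ...or after at most 60 s
        props.put("request.timeout.ms", "120000");  // client-side limit for any request
        return props;
    }

    // True if request.timeout.ms exceeds fetch.max.wait.ms by at least marginMs.
    // Falls back to the documented defaults (500 ms and 30000 ms) when a key is absent.
    // The margin is an assumption to leave room for network and processing time.
    static boolean requestTimeoutCoversFetchWait(Properties props, long marginMs) {
        long fetchWait = Long.parseLong(props.getProperty("fetch.max.wait.ms", "500"));
        long requestTimeout = Long.parseLong(props.getProperty("request.timeout.ms", "30000"));
        return requestTimeout >= fetchWait + marginMs;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps();
        System.out.println(requestTimeoutCoversFetchWait(props, 10_000)); // true

        Properties defaults = new Properties();
        defaults.put("fetch.max.wait.ms", "60000"); // request.timeout.ms left at its default
        System.out.println(requestTimeoutCoversFetchWait(defaults, 10_000)); // false
    }
}
```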
This resolved the errors, but the behavior is not as expected:
The consumer picks up messages very often, in small amounts nowhere near fetch.min.bytes.
It sometimes performs several fetches within a single minute.
It works OK in my local dev environment when I test it with Spring EmbeddedKafka, but it doesn't work in production (MSK).
What could explain this? Is it possible that it doesn't work well on MSK?
Are there other properties that play a role here or could get in the way?
Is it correct to say that, assuming I am always under fetch.min.bytes, I won't see more than one fetch per minute?
If new records are written while a poll is in progress, what is the expected behavior? Does it affect the current poll or the next one?
(other properties defined for this consumer: session.timeout.ms, max.poll.records, max.partition.fetch.bytes)
====== EDIT =====
After some investigation, I discovered something:
The configuration works as expected when the consumer is working against a topic with a single partition.
When working against a topic with multiple partitions the fetch time becomes unexpected.
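A plausible explanation for the multi-partition result, hedged because it rests on the Java client's general fetch behavior rather than on logs from this setup: fetch.min.bytes and fetch.max.wait.ms are applied by each broker to each fetch request, and the consumer sends a separate fetch request to every broker it fetches from (normally the leaders of its assigned partitions). poll() can return as soon as any one of those responses arrives, so with partitions led by several MSK brokers you may see multiple small fetches per minute, while a single-partition topic, or an EmbeddedKafka setup with its default single broker, involves only one such request. The sketch below models only the grouping step; `FetchRequestPlanner`, the topic name and the broker ids are made-up example data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: groups assigned partitions by the broker that leads them.
// Each group stands for one fetch request; each broker applies fetch.min.bytes and
// fetch.max.wait.ms to its own request independently of the others.
public class FetchRequestPlanner {

    record Partition(String topic, int id, int leaderBrokerId) {}

    static Map<Integer, List<String>> fetchRequestsByBroker(List<Partition> assigned) {
        Map<Integer, List<String>> byBroker = new TreeMap<>();
        for (Partition p : assigned) {
            byBroker.computeIfAbsent(p.leaderBrokerId(), k -> new ArrayList<>())
                    .add(p.topic() + "-" + p.id());
        }
        return byBroker;
    }

    public static void main(String[] args) {
        // Made-up example: one partition vs. six partitions spread over three brokers.
        List<Partition> single = List.of(new Partition("events", 0, 1));
        List<Partition> multi = List.of(
                new Partition("events", 0, 1), new Partition("events", 1, 2),
                new Partition("events", 2, 3), new Partition("events", 3, 1),
                new Partition("events", 4, 2), new Partition("events", 5, 3));

        System.out.println(fetchRequestsByBroker(single));
        // {1=[events-0]}
        System.out.println(fetchRequestsByBroker(multi));
        // {1=[events-0, events-3], 2=[events-1, events-4], 3=[events-2, events-5]}
    }
}
```

Under this model, "at most one fetch per minute" would hold per broker rather than per consumer.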
I have not used the Spring consumer myself, but after doing some research it seems it is not possible to achieve what you are trying to do. As per this thread, it is not possible to configure the poll duration in the listener implementation.
However, you can write your own poll logic and achieve the desired behaviour using the poll duration and max.poll.records. You can use this code as a reference and configure:
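A rough sketch of such poll logic, under stated assumptions: instead of relying on the broker-side fetch.min.bytes, the application keeps calling poll() with a short timeout and buffers what it receives until roughly 15,000,000 bytes have accumulated or 60 seconds have passed, and only then processes the batch. To stay runnable with the standard library alone, the Kafka calls are left out; `SizeOrTimeBatcher` and its methods are hypothetical. In a real consumer you would call `add(...)` for each `ConsumerRecord` returned by `KafkaConsumer.poll(Duration)` (for example sizing it with `serializedValueSize()`) and call `drainIfReady()` after every poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical application-side batcher: releases a batch once the buffered size
// reaches minBytes or maxWaitMs has elapsed since the first buffered record.
// The clock is injected so the logic can be exercised without a broker.
public class SizeOrTimeBatcher<T> {
    private final long minBytes;
    private final long maxWaitMs;
    private final LongSupplier clockMs;
    private final List<T> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private long firstRecordAtMs = -1;

    public SizeOrTimeBatcher(long minBytes, long maxWaitMs, LongSupplier clockMs) {
        this.minBytes = minBytes;
        this.maxWaitMs = maxWaitMs;
        this.clockMs = clockMs;
    }

    // Buffers one record; negative sizes (e.g. -1 for a null value) count as 0.
    public void add(T record, long sizeBytes) {
        if (buffer.isEmpty()) {
            firstRecordAtMs = clockMs.getAsLong();
        }
        buffer.add(record);
        bufferedBytes += Math.max(0, sizeBytes);
    }

    // Meant to be called after every poll(); returns the batch to process, or an empty list.
    public List<T> drainIfReady() {
        if (buffer.isEmpty()) {
            return List.of();
        }
        boolean sizeReached = bufferedBytes >= minBytes;
        boolean timeReached = clockMs.getAsLong() - firstRecordAtMs >= maxWaitMs;
        if (!sizeReached && !timeReached) {
            return List.of();
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        bufferedBytes = 0;
        firstRecordAtMs = -1;
        return batch;
    }

    public static void main(String[] args) {
        long[] now = {0}; // fake clock in milliseconds
        SizeOrTimeBatcher<String> batcher =
                new SizeOrTimeBatcher<>(15_000_000, 60_000, () -> now[0]);

        batcher.add("small-1", 1_000);
        now[0] = 30_000;
        batcher.add("small-2", 1_000);
        System.out.println(batcher.drainIfReady()); // [] (neither threshold reached)

        now[0] = 60_000;
        System.out.println(batcher.drainIfReady()); // [small-1, small-2] (60 s elapsed)

        batcher.add("big", 15_000_000);
        System.out.println(batcher.drainIfReady()); // [big] (size threshold reached)
    }
}
```

The timer here starts at the first buffered record, so an idle topic produces no empty batches; starting it at the previous flush would be an equally reasonable choice. Because records sit in the application's buffer, it is probably safer to disable enable.auto.commit and commit offsets only after a released batch has been processed, since auto-commit may commit the position of records that are still buffered.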