Spring Cloud Data Flow scaling with Apache Kafka
I've followed many Spring tutorials on scaling (here, and here), but still haven't found the secret knock that keeps all consumers busy. The intent is to have 5 (example) sink processes constantly busy. The documentation implies that for a simple stream of source | processor | sink, I would use the following stream properties:
deployer.sink.count=5
app.processor.producer.partitionKeyExpression=headers['X-PartitionKey']?:''
app.sink.spring.cloud.stream.kafka.binder.autoAddPartitions=true
app.sink.spring.cloud.stream.kafka.binder.minPartitionCount=5
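These are stream deployment properties, so they would typically be supplied at deploy time through the SCDF shell, along these lines (a sketch assuming the stream is named mystream):

stream deploy mystream --properties "deployer.sink.count=5,app.processor.producer.partitionKeyExpression=headers['X-PartitionKey']?:'',app.sink.spring.cloud.stream.kafka.binder.autoAddPartitions=true,app.sink.spring.cloud.stream.kafka.binder.minPartitionCount=5"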
Naturally, the processor adds a header field called X-PartitionKey that ends up being unique enough that it should balance adequately.
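For context, the processor sets that header roughly like this (a minimal functional-style sketch; the String payload and the UUID-based key are placeholders, not the real app's logic):

import java.util.UUID;
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Configuration
public class PartitionKeyProcessorConfig {

    // Passes the payload through unchanged and attaches an X-PartitionKey
    // header that the producer-side partitionKeyExpression can pick up.
    @Bean
    public Function<Message<String>, Message<String>> process() {
        return in -> MessageBuilder.withPayload(in.getPayload())
                .copyHeaders(in.getHeaders())
                .setHeader("X-PartitionKey", UUID.randomUUID().toString())
                .build();
    }
}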
What I find is that only 2 or 3 ever receive messages, and the remainder sit idle. It feels like the first few watch multiple partitions, and the others just sit in standby saying Found no committed offset for partition. I can see by using kowl that the messages are getting unique partitions, but the load never gets balanced to reflect that.
Am I missing configuration somewhere? Is it a Kafka binder issue?
Update #1
I've noticed that each instance isn't getting a unique clientId. Not sure if this is relevant.
Instance 1 - [Consumer clientId=consumer-mystream-2, groupId=mystream]
Instance 2 - [Consumer clientId=consumer-mystream-3, groupId=mystream]
Instance 3 - [Consumer clientId=consumer-mystream-3, groupId=mystream]
Instance 4 - [Consumer clientId=consumer-mystream-3, groupId=mystream]
Instance 5 - [Consumer clientId=consumer-mystream-2, groupId=mystream]
Update #2
Using the three properties below, all 5 instances are busy. This method seems to bypass SCSt partitioning and use a Kafka message key instead of a partition key. Not perfect yet, but better.
deployer.sink.count=5
app.processor.spring.cloud.stream.kafka.binder.messageKeyExpression=headers['X-PartitionKey']?:''
app.sink.spring.cloud.stream.kafka.binder.autoAddPartitions=true
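This makes sense insofar as messageKeyExpression sets the actual Kafka record key, so partition selection falls to Kafka's DefaultPartitioner rather than to the binder. For non-null keys its behavior is roughly the following (a simplified sketch, not the actual Kafka source):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class KafkaKeyPartitioning {

    // Approximates DefaultPartitioner for records with a non-null key:
    // murmur2-hash the serialized key, force it positive, then take it
    // modulo the topic's real partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // With 10 partitions, distinct keys spread across all of them.
        for (String key : new String[] {"a", "b", "c", "d", "e"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 10));
        }
    }
}

Because the modulus here is the topic's real partition count rather than the binder's configured partitionCount, every partition can receive traffic, which would explain all 5 instances staying busy.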
Update #3
Adding spring.cloud.stream.bindings.output.producer.partition-count equal to the actual number of Kafka partitions seems to have resolved the issue. If your deployer.*.count property is less than the actual number of partitions, the excess partitions will not be assigned messages, but consumers will still be assigned to them and may sit idle.
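Putting the updates together, the working property set looks roughly like this (a sketch; 10 stands in for the topic's actual partition count, and the partition-count key is assumed to be scoped to the processor with the same app.processor prefix as the other properties):

deployer.sink.count=5
app.processor.producer.partitionKeyExpression=headers['X-PartitionKey']?:''
app.processor.spring.cloud.stream.bindings.output.producer.partition-count=10
app.sink.spring.cloud.stream.kafka.binder.autoAddPartitions=true
app.sink.spring.cloud.stream.kafka.binder.minPartitionCount=5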
Answer
You can clearly see the 10 partitions evenly assigned across the 5 instances.
The Found no committed offset for partition messages (in all 5 instances) simply mean this groupId has never committed an offset for that partition.
These logs are normal for that condition. The main consumer is consumer-mystream-2; the repeated logs further down are for Consumer clientId=consumer-mystream-3 - I think that's the consumer used for the health check - it has the same group but never subscribes to the topic, so maybe this chatter is because it's in the same group. The chatter goes away as soon as the main consumer commits the offset of a record.
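You can confirm this from the broker side with the standard Kafka tooling (assuming a broker at localhost:9092); partitions that show a dash for CURRENT-OFFSET simply have no committed offset yet:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mystream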
In any case, if you send a record to each of the 10 partitions, each instance will definitely get 2.
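One way to verify that is to address each of the 10 partitions explicitly with the plain Java client, bypassing any key-based partitioning (a sketch; the topic name mystream.processor and the broker address are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendToEveryPartition {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Pin each record to an explicit partition (0..9) so every
            // consumer instance in the group should receive exactly 2.
            for (int p = 0; p < 10; p++) {
                producer.send(new ProducerRecord<>("mystream.processor", p, "key-" + p, "payload-" + p));
            }
        }
    }
}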
EDIT
It works as expected for me, using either native or binder partitioning. The partitionCount is automatically adjusted up to match the actual number of partitions, and that is used as the modulus when calculating the partition.
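For binder-side partitioning, that modulus calculation amounts to something like the following (a simplified sketch of Spring Cloud Stream's default key-hashing behavior, not the actual source):

public class BinderPartitioning {

    // Default binder behavior, approximately: hash the extracted partition
    // key and take it modulo partitionCount, where partitionCount has been
    // adjusted up to the topic's real number of partitions.
    static int selectPartition(Object partitionKey, int partitionCount) {
        return Math.abs(partitionKey.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        // With partitionCount adjusted to 10, keys map across partitions 0..9.
        System.out.println(selectPartition("some-X-PartitionKey-value", 10));
    }
}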