Spring Cloud Data Flow scaling with Apache Kafka

Posted on 2025-01-26 13:26:39 · 1913 words · 4 views · 0 comments

I've followed many Spring tutorials on scaling (here, and here), but still haven't found the secret knock that keeps all consumers busy. The intent is to have 5 (example) sink processes constantly busy. The documentation implies that for a simple stream of source | processor | sink, I would use the following stream properties:

deployer.sink.count=5
app.processor.producer.partitionKeyExpression=headers['X-PartitionKey']?:''
app.sink.spring.cloud.stream.kafka.binder.autoAddPartitions=true
app.sink.spring.cloud.stream.kafka.binder.minPartitionCount=5

Naturally, the processor adds a header field called X-PartitionKey that ends up being unique enough that it should balance adequately.

What I find is that only 2 or 3 instances ever receive messages, and the remainder sit idle. It feels like the first few watch multiple partitions, while the others just sit in stand-by logging "Found no committed offset for partition". Using Kowl, I can see that the messages are spread across distinct partitions, but the load on the consumers never reflects that.

Am I missing configuration somewhere? Is it a Kafka binder issue?

Update #1

I've noticed that each instance isn't getting a unique clientId. Not sure if this is relevant.

Instance 1 - [Consumer clientId=consumer-mystream-2, groupId=mystream]
Instance 2 - [Consumer clientId=consumer-mystream-3, groupId=mystream]
Instance 3 - [Consumer clientId=consumer-mystream-3, groupId=mystream]
Instance 4 - [Consumer clientId=consumer-mystream-3, groupId=mystream]
Instance 5 - [Consumer clientId=consumer-mystream-2, groupId=mystream]

Update #2

Using the three properties below, all 5 instances are busy. This method seems to bypass SCSt partitioning and uses a message key instead of a partition key. Not perfect yet, but better.

deployer.sink.count=5
app.processor.spring.cloud.stream.kafka.binder.messageKeyExpression=headers['X-PartitionKey']?:''
app.sink.spring.cloud.stream.kafka.binder.autoAddPartitions=true

Update #3

Setting spring.cloud.stream.bindings.output.producer.partition-count equal to the actual number of Kafka partitions seems to have resolved the issue. If your deployer.*.count property is less than the actual number of partitions, the excess partitions will not receive any messages, but consumers will still be assigned to them and may sit idle.
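The effect described in Update #3 can be sketched in isolation. This is a minimal illustration, not the binder's actual code: it assumes the partition is chosen as a non-negative remainder of the key's hash divided by the configured partition count, and the names PartitionModulusSketch, selectPartition and distribute are hypothetical. It also assumes the configured count is not larger than the actual number of partitions.

```java
import java.util.Arrays;

class PartitionModulusSketch {

    // Simplified stand-in for key-based partition selection (an assumption):
    // non-negative remainder of the key's hash modulo the configured count.
    static int selectPartition(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Counts how many records land on each of the topic's actual partitions.
    // Assumes partitionCount <= actualPartitions.
    static int[] distribute(int records, int partitionCount, int actualPartitions) {
        int[] counts = new int[actualPartitions];
        for (int i = 0; i < records; i++) {
            counts[selectPartition("key" + i, partitionCount)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Configured count 5, topic has 10 partitions: partitions 5-9 stay empty.
        System.out.println(Arrays.toString(distribute(100, 5, 10)));
        // Configured count matches the topic: all 10 partitions are eligible.
        System.out.println(Arrays.toString(distribute(100, 10, 10)));
    }
}
```

With a modulus of 5, no key can ever map to partitions 5 through 9, so any consumer that was assigned only those partitions has nothing to read.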


Comments (1)

土豪我们做朋友吧 2025-02-02 13:26:39

You can clearly see the 10 partitions evenly assigned across the 5 instances, e.g.

partitions assigned: [mystream.processor-7, mystream.processor-6]
1: 3,2
2: 0,1
3: 9,8
4: 5,4
5: 7,6

The messages

Found no committed offset for partition mystream.processor-8

(in all 5 instances) simply mean that this groupId has never committed an offset for that partition.

These logs are normal for that condition:

2022-05-04 17:42:44.067 INFO [sink,,] 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-mystream-2, groupId=mystream] Found no committed offset for partition mystream.processor-7

2022-05-04 17:42:44.067 INFO [sink,,] 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-mystream-2, groupId=mystream] Found no committed offset for partition mystream.processor-6

2022-05-04 17:42:44.163 INFO [sink,,] 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-mystream-2, groupId=mystream] Resetting offset for partition mystream.processor-7 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[dataflow-kafka-0.dataflow-kafka-headless.default.svc.cluster.local:9092 (id: 0 rack: null)], epoch=12}}.

2022-05-04 17:42:44.167 INFO [sink,,] 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-mystream-2, groupId=mystream] Resetting offset for partition mystream.processor-6 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[dataflow-kafka-1.dataflow-kafka-headless.default.svc.cluster.local:9092 (id: 1 rack: null)], epoch=10}}.

The main consumer is consumer-mystream-2. The repeated logs further down are for Consumer clientId=consumer-mystream-3, which I think is the consumer used for the health check. It has the same group but never subscribes to the topic; maybe this chatter appears because it is in the same group.

The chatter goes away as soon as the main consumer commits the offset of a record.

In any case, if you send a record to each of the 10 partitions, each instance will definitely get 2.
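The arithmetic behind that statement can be checked with a small sketch. It copies the instance-to-partition assignment from the listing earlier in this answer; the class and method names are hypothetical, and the second scenario (records only on partitions 0-4) is an illustrative assumption rather than something measured on the cluster.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AssignmentSketch {

    // Assignment taken from the listing in this answer (instance -> partitions).
    static Map<Integer, List<Integer>> assignment() {
        Map<Integer, List<Integer>> m = new LinkedHashMap<>();
        m.put(1, List.of(3, 2));
        m.put(2, List.of(0, 1));
        m.put(3, List.of(9, 8));
        m.put(4, List.of(5, 4));
        m.put(5, List.of(7, 6));
        return m;
    }

    // Sums, per instance, the records sitting on the partitions it owns.
    static Map<Integer, Integer> recordsPerInstance(int[] recordsPerPartition) {
        Map<Integer, Integer> result = new LinkedHashMap<>();
        assignment().forEach((instance, partitions) -> result.put(instance,
                partitions.stream().mapToInt(p -> recordsPerPartition[p]).sum()));
        return result;
    }

    public static void main(String[] args) {
        // One record on each of the 10 partitions.
        System.out.println(recordsPerInstance(new int[] {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}));
        // prints {1=2, 2=2, 3=2, 4=2, 5=2}

        // Hypothetical case: records reach only partitions 0-4.
        System.out.println(recordsPerInstance(new int[] {1, 1, 1, 1, 1, 0, 0, 0, 0, 0}));
        // prints {1=2, 2=2, 3=0, 4=1, 5=0}
    }
}
```

The second case shows how an even partition assignment can still leave some instances idle when the producer only writes to a subset of the partitions.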

EDIT

It works as expected for me, using either native or binder partitioning. The partitionCount is automatically adjusted up to match the actual number of partitions and that is used as the modulus when calculating the partition.

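import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
public class So72115288Application {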

    public static void main(String[] args) {
        SpringApplication.run(So72115288Application.class, args).close();
    }

    List<AtomicInteger> receivedPartitions = new ArrayList<>();

    CountDownLatch latch = new CountDownLatch(100);

    @Bean
    public Consumer<Message<String>> input() {
        return msg -> {
            this.receivedPartitions.get(msg.getHeaders()
                    .get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class))
            .incrementAndGet();
            this.latch.countDown();
        };
    }

    @Bean
    ApplicationRunner runner(StreamBridge bridge) {
        for (int i = 0; i < 10; i++) {
            this.receivedPartitions.add(new AtomicInteger());
        }
        return args -> {
            // native partitioning
//          IntStream.range(0, 100).forEach(i -> bridge.send("output", new GenericMessage<>("foo",
//                  Collections.singletonMap(KafkaHeaders.MESSAGE_KEY, ("key" + i).getBytes()))));
            // binder partitioning
            IntStream.range(0, 100).forEach(i -> bridge.send("output", new GenericMessage<>("foo",
                    Collections.singletonMap("custom", ("key" + i).getBytes()))));
            this.latch.await(60, TimeUnit.SECONDS);
            System.out.println(this.receivedPartitions.stream()
                    .map(ai -> ai.get())
                    .collect(Collectors.toList()));
        };
    }

}

spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.output.producer.partition-count=5
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['custom']
spring.cloud.stream.bindings.output.destination=input-in-0

spring.cloud.stream.kafka.binder.min-partition-count=5
spring.cloud.stream.kafka.binder.auto-add-partitions=true

[10, 13, 8, 10, 13, 6, 9, 16, 12, 3]