Stateful Kafka Streams process loses state when tasks move from one pod to another during rebalancing

Posted on 2025-01-09 23:32:00

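A stateful Kafka Streams process loses its state when tasks are moved from one pod to another during rebalancing.

When a pod is killed, it restarts, the task stays assigned to the same pod, and the process resumes correctly (no issues in this scenario). When we scale down and the task is forced to move to another pod, we can see that Kafka Streams restores the changelog, but it does not help: the process creates a new state with version 1 instead of 3992 and with CounterSize 1 instead of 2964 (this can be checked in the logs).

After checking the logs, we saw that the same key goes to different partitions in the changelog topic when the task is assigned to a new pod, as shown in the screenshots below (not sure whether this is an issue).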

[Three screenshots: the same key appearing in different partitions of the changelog topic]
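
One thing that may be worth ruling out here: a producer's default partitioner derives the partition of a keyed record from the serialized key bytes, not from the logical key. The same customer id written through two different serializers can therefore end up in two different partitions. The sketch below illustrates the idea with plain JDK code and is not a diagnosis of this application. `partitionFor` is a hypothetical helper, `Arrays.hashCode` is a stand-in for the murmur2 hash that Kafka applies, the two byte encodings are simplified assumptions (no Avro framing), and the partition count of 4 is assumed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyPartitionSketch {

    // Hypothetical helper: maps serialized key bytes to a partition.
    // Arrays.hashCode is a stand-in for the murmur2 hash used by Kafka's default partitioner.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // The customer id encoded as UTF-8 text (assumed encoding, for illustration only).
    static byte[] asStringBytes(long customerId) {
        return Long.toString(customerId).getBytes(StandardCharsets.UTF_8);
    }

    // The same customer id encoded as an 8-byte big-endian long (assumed encoding).
    static byte[] asLongBytes(long customerId) {
        return ByteBuffer.allocate(Long.BYTES).putLong(customerId).array();
    }

    public static void main(String[] args) {
        long customerId = 1234567890L;
        int numPartitions = 4; // assumption: the real partition count is not stated in the post
        System.out.println("string key -> partition "
            + partitionFor(asStringBytes(customerId), numPartitions));
        System.out.println("long key   -> partition "
            + partitionFor(asLongBytes(customerId), numPartitions));
    }
}
```

If the two printed partitions differ, the same logical key would be routed to two different tasks, each with its own state store. Comparing the raw key bytes of the input records with those in the changelog is one way to check whether this applies.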

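Details of what we are using:

  • The application name is customer-state.
  • We are using AWS MSK (Apache Kafka version 2.8.0) with 6 brokers.
  • The application is deployed as a StatefulSet into EKS/Kubernetes with 3 replicas.
  • Java 11
  • Spring Cloud 2020.0.3
  • Kafka Streams 2.8.0

We applied a few configuration changes, but we cannot find out why it does not pick up the latest state from the changelog.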

spring.cloud.stream.kafka.streams.binder.configuration:
  max.request.size: 5242892
  max.partition.fetch.bytes: 5242892
  max.fetch.bytes: 15728676
  acceptable.recovery.lag: 0
  num.standby.replicas: 1
  num.stream.threads: 2
spring.kafka.streams.binder:
  functions:
    process.applicationId: customer-state-process
  configuration:
    # A Helm chart populates POD_NAME with the pod name. Because this is a StatefulSet,
    # the names are always customer-state-0, customer-state-1, customer-state-2.
    group.instance.id: ${POD_NAME}
    session.timeout.ms: 30000
    acceptable.recovery.lag: 0
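
For readers who configure Kafka Streams directly rather than through the binder, the Streams-level settings above roughly correspond to the properties below. This is a sketch that uses plain `java.util.Properties` with string keys so that it runs without Kafka on the classpath. `buildStreamsProperties` is a hypothetical helper, and passing the pod name as a parameter is an assumption standing in for the `${POD_NAME}` placeholder.

```java
import java.util.Properties;

final class StreamsPropertiesSketch {

    // Hypothetical helper mirroring the Streams-level settings from the binder configuration.
    static Properties buildStreamsProperties(String podName) {
        Properties props = new Properties();
        props.setProperty("application.id", "customer-state-process");
        // Static membership: a stable id per pod, taken from the StatefulSet pod name.
        props.setProperty("group.instance.id", podName);
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("num.stream.threads", "2");
        // Keep one standby copy of each state store on another instance.
        props.setProperty("num.standby.replicas", "1");
        // Maximum changelog lag at which an instance still counts as caught up (value from the post).
        props.setProperty("acceptable.recovery.lag", "0");
        return props;
    }

    public static void main(String[] args) {
        // Fall back to a sample StatefulSet pod name when POD_NAME is not set.
        String podName = System.getenv().getOrDefault("POD_NAME", "customer-state-0");
        buildStreamsProperties(podName).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```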

What are the settings for the state store?

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        try {
            final StreamsBuilder streamsBuilder = factoryBean.getObject();
            if (isNull(streamsBuilder)) {
                throw new NullPointerException("streamsBuilder is null");
            }

            // Customer's State initialization
            final PrimitiveAvroSerde<String> customerKeySerde = createKeySerde(factoryBean, true);
            final SpecificAvroSerde<StateAvro> valueSerde = createValueSerde(factoryBean);

            final StoreBuilder<KeyValueStore<String, StateAvro>> storeBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STATE_STORE_NAME),
                customerKeySerde,
                valueSerde);
            streamsBuilder.addStateStore(storeBuilder);

        } catch (Exception e) {
            // Keep the original exception as the cause so the real failure shows up in the stack trace
            throw new RuntimeException("Can't create State Store", e);
        }
    };
}
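
The store registered above is backed by a changelog topic. Judging by the log lines further down, the topic name follows the pattern `<applicationId>-<storeName>-changelog`, and a task id such as `1_2` ends with the partition number that the task reads. The sketch below encodes that reading. `changelogTopic` and `partitionOfTask` are hypothetical helpers, and the store name `customer-state-store` is inferred from the logs because the value of `STATE_STORE_NAME` is not shown.

```java
final class ChangelogNamingSketch {

    // Builds the changelog topic name following the pattern seen in the logs.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Extracts the partition from a task id of the form "<subtopology>_<partition>".
    static int partitionOfTask(String taskId) {
        int sep = taskId.lastIndexOf('_');
        if (sep < 0) {
            throw new IllegalArgumentException("Not a task id: " + taskId);
        }
        return Integer.parseInt(taskId.substring(sep + 1));
    }

    public static void main(String[] args) {
        String topic = changelogTopic("customer-state-process", "customer-state-store");
        // Prints the topic-partition that task 1_0 would restore from under this reading.
        System.out.println(topic + "-" + partitionOfTask("1_0"));
    }
}
```

This makes it easier to line up a task in the logs with the changelog partition it restores, for example task `1_0` with `customer-state-process-customer-state-store-changelog-0`.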

How is CounterSize implemented and saved to Kafka?

It is a Java Map serialized to Avro and then published to Kafka.

The CounterSize printed in the logs is just the number of entries in that Map.

{
"type" : "record",
"name" : "StateAvro",
"namespace" : "com.dpml.avro.state",
"fields" : [ {
    "name" : "version",
    "type" : "long"
}, {
   "name" : "DepositCounter",
   "type" : [
       "null",
       "MetaCounterAvro"
   ],
   "default": null
} ...

{
"type" : "record",
"name" : "MetaCounterAvro",
"namespace" : "com.dpml.avro.state",
"fields" : [ {
    "name" : "entries",
    "type" : {
        "type" : "array",
        "items" : "PairLongString",
        "java-class" : "java.util.Map"
    }
}, {
    "name" : "hours",
    "type" : "long"
} ]

}


This is the code that prints the log line:

log.debug("Successfully updated state for customerId={}, {}", getCustomerId(), createLogInfo(avro));

private String createLogInfo(StateAvro stateAvro) {
    // Number of entries in the deposit counter; 0 when the counter is absent
    final var depositSize = Optional.ofNullable(stateAvro.getDepositCounter())
        .map(MetaCounterAvro::getEntries)
        .map(List::size)
        .orElse(0);

    return String.format("version=%d, eventTime=%d, triggerEvent=%s, updatedByEventId=%s, CounterSize=%s",
        stateAvro.getVersion(),
        stateAvro.getEventTime(),
        stateAvro.getTriggerEvent().name(),
        stateAvro.getUpdatedByEventId(),
        depositSize);
}

LOGS:

=> offset from incoming event partition=2 offset=74800529

2022-02-24T10:52:58.867Z DEBUG c.k.d.k.t.EventToStateTransformer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Technical info -> context=topic-input-name, offset=74800529 task=1_2 - Received event {"header": {"timestamp": 1645699978378, "eventId": "8f10dc2e-fcf2-4fcb-a312-0bba7170e16d", "customerId": 1234567890}
2022-02-24T10:52:58.868Z DEBUG c.k.d.k.t.DomainEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - State already exists for customerId=1234567890 version=3990, eventTime=1645699976221, updatedByEventId=21844581-5fd9-41da-b05d-e1cb719349a5, CounterSize=2962
2022-02-24T10:52:58.868Z INFO c.k.d.k.t.CustomerNewEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - CustomerEvent -> adding customerId=1234567890 timeEvent=1645699978378 eventId=8f10dc2e-fcf2-4fcb-a312-0bba7170e16d Technical info -> partition=2 offset=74800529 task=1_2
2022-02-24T10:52:58.869Z DEBUG c.k.d.k.t.DomainEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Successfully updated state for customerId=1234567890 version=3991, eventTime=1645699978378, triggerEvent=CustomerEvent, updatedByEventId=8f10dc2e-fcf2-4fcb-a312-0bba7170e16d, CounterSize=2963

=> scaled down from 3 pods to 2 pods to force the rebalance

2022-02-24T10:52:59.011Z INFO o.a.k.c.c.i.AbstractCoordinator [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer instanceId=customer-state-0-1, clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-consumer, groupId=customer-state-process] Attempt to heartbeat failed since group is rebalancing
2022-02-24T10:52:59.012Z INFO o.a.k.c.c.i.AbstractCoordinator [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer instanceId=customer-state-0-1, clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-consumer, groupId=customer-state-process] (Re-)joining group
2022-02-24T10:52:59.090Z INFO o.a.k.c.c.i.AbstractCoordinator [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer instanceId=customer-state-0-1, clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-consumer, groupId=customer-state-process] Successfully joined group with generation Generation{generationId=192444, memberId='customer-state-0-1-6693263a-0054-46bb-b775-697741beb01a', protocol='stream'}
2022-02-24T10:52:59.101Z INFO o.a.k.c.c.i.AbstractCoordinator [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer instanceId=customer-state-0-1, clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-consumer, groupId=customer-state-process] Successfully synced group in generation Generation{generationId=192444, memberId='customer-state-0-1-6693263a-0054-46bb-b775-697741beb01a', protocol='stream'}
2022-02-24T10:52:59.101Z INFO o.a.k.c.c.i.ConsumerCoordinator [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer instanceId=customer-state-0-1, clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-consumer, groupId=customer-state-process] Updating assignment with Assigned partitions: [topic-input-name-2, topic-input-name-0] Current owned partitions: [topic-input-name-2] Added partitions (assigned - owned): [topic-input-name-0] Revoked partitions (owned - assigned): []
2022-02-24T10:52:59.101Z INFO o.a.k.c.c.i.ConsumerCoordinator [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer instanceId=customer-state-0-1, clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-consumer, groupId=customer-state-process] Notifying assignor about the new Assignment(partitions=[topic-input-name-0, topic-input-name-2], userDataSize=98)
2022-02-24T10:52:59.102Z INFO o.a.k.s.p.i.StreamsPartitionAssignor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule.
2022-02-24T10:52:59.102Z INFO o.a.k.s.p.i.TaskManager [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] Handle new assignment with: New active tasks: [1_0, 1_2] New standby tasks: [1_3] Existing active tasks: [1_2] Existing standby tasks: [1_1]
2022-02-24T10:52:59.102Z INFO o.a.k.s.p.i.StandbyTask [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] standby-task [1_1] Suspended running
2022-02-24T10:52:59.150Z INFO o.a.k.c.c.KafkaConsumer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-restore-consumer, groupId=null] Subscribed to partition(s): customer-state-process-customer-state-store-changelog-2
2022-02-24T10:52:59.154Z INFO o.a.k.s.p.i.StandbyTask [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] standby-task [1_1] Closed clean
2022-02-24T10:52:59.155Z INFO i.c.k.s.KafkaAvroSerializerConfig [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - KafkaAvroSerializerConfig values: bearer.auth.token = [hidden] schema.registry.url = [https://cp-schema-registry.cp-schema-registry.svc.cluster.local:443] basic.auth.user.info = [hidden] auto.register.schemas = true max.schemas.per.subject = 1000 basic.auth.credentials.source = URL schema.registry.basic.auth.user.info = [hidden] bearer.auth.credentials.source = STATIC_TOKEN value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicRecordNameStrategy key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
2022-02-24T10:52:59.155Z INFO i.c.k.s.KafkaAvroDeserializerConfig [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - KafkaAvroDeserializerConfig values: bearer.auth.token = [hidden] schema.registry.url = [https://cp-schema-registry.cp-schema-registry.svc.cluster.local:443] basic.auth.user.info = [hidden] auto.register.schemas = true max.schemas.per.subject = 1000 basic.auth.credentials.source = URL schema.registry.basic.auth.user.info = [hidden] bearer.auth.credentials.source = STATIC_TOKEN specific.avro.reader = true value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicRecordNameStrategy key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
2022-02-24T10:52:59.157Z INFO o.a.k.c.c.i.ConsumerCoordinator [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer instanceId=customer-state-0-1, clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-consumer, groupId=customer-state-process] Adding newly assigned partitions: topic-input-name-0
2022-02-24T10:52:59.157Z INFO o.a.k.s.p.i.StreamThread [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED
2022-02-24T10:52:59.157Z INFO o.a.k.s.KafkaStreams [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-client [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf] State transition from RUNNING to REBALANCING
2022-02-24T10:52:59.158Z INFO o.a.k.c.c.i.ConsumerCoordinator [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer instanceId=customer-state-0-1, clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-consumer, groupId=customer-state-process] Setting offset for partition topic-input-name-0 to the committed offset FetchPosition{offset=73628671, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-2.d2-msk-cluster-pt.c2.kafka.eu-central-1.amazonaws.com:9094 (id: 2 rack: euc1-az3)], epoch=141}}
2022-02-24T10:52:59.199Z INFO o.a.k.s.p.i.ProcessorStateManager [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] task [1_0] State store customer-state-store did not find checkpoint offset, hence would default to the starting offset at changelog customer-state-process-customer-state-store-changelog-0
2022-02-24T10:52:59.199Z INFO o.a.k.s.p.i.StreamTask [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] task [1_0] Initialized
2022-02-24T10:52:59.240Z INFO o.a.k.s.p.i.ProcessorStateManager [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] standby-task [1_3] State store customer-state-store did not find checkpoint offset, hence would default to the starting offset at changelog customer-state-process-customer-state-store-changelog-3
2022-02-24T10:52:59.240Z INFO o.a.k.s.p.i.StandbyTask [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] standby-task [1_3] Initialized
2022-02-24T10:52:59.286Z INFO o.a.k.c.c.KafkaConsumer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-restore-consumer, groupId=null] Subscribed to partition(s): customer-state-process-customer-state-store-changelog-0, customer-state-process-customer-state-store-changelog-3, customer-state-process-customer-state-store-changelog-2
2022-02-24T10:52:59.286Z INFO o.a.k.c.c.i.SubscriptionState [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-restore-consumer, groupId=null] Seeking to EARLIEST offset of partition customer-state-process-customer-state-store-changelog-0
2022-02-24T10:52:59.287Z INFO o.a.k.c.c.i.SubscriptionState [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-restore-consumer, groupId=null] Seeking to EARLIEST offset of partition customer-state-process-customer-state-store-changelog-3
2022-02-24T10:52:59.471Z INFO o.a.k.c.c.i.SubscriptionState [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-restore-consumer, groupId=null] Resetting offset for partition customer-state-process-customer-state-store-changelog-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-2.d2-msk-cluster-pt.c2.kafka.eu-central-1.amazonaws.com:9094 (id: 2 rack: euc1-az3)], epoch=0}}.
2022-02-24T10:52:59.471Z INFO o.a.k.c.c.i.SubscriptionState [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-restore-consumer, groupId=null] Resetting offset for partition customer-state-process-customer-state-store-changelog-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-4.d2-msk-cluster-pt.c2.kafka.eu-central-1.amazonaws.com:9094 (id: 4 rack: euc1-az3)], epoch=0}}.
2022-02-24T10:53:09.480Z INFO o.a.k.s.p.i.StoreChangelogReader [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] Restoration in progress for 1 partitions. {customer-state-process-customer-state-store-changelog-0: position=19011, end=42297, totalRestored=19011}
2022-02-24T10:53:11.769Z INFO o.a.k.s.p.i.StreamThread [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] Processed 939 total records, ran 2 punctuators, and committed 4 total tasks since the last update
2022-02-24T10:53:14.105Z INFO o.a.k.s.p.i.StoreChangelogReader [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] Finished restoring changelog customer-state-process-customer-state-store-changelog-0 to store customer-state-store with a total number of 42297 records

2022-02-24T10:53:14.106Z INFO c.k.d.k.t.EventToStateTransformer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Fetching state store in transformer
2022-02-24T10:53:14.152Z INFO o.a.k.s.p.i.StreamTask [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] task [1_0] Restored and ready to run
2022-02-24T10:53:14.152Z INFO o.a.k.s.p.i.StreamThread [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] Restoration took 14995 ms for all tasks [1_0, 1_2, 1_3]
2022-02-24T10:53:14.152Z INFO o.a.k.s.p.i.StreamThread [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-02-24T10:53:14.152Z INFO o.a.k.s.KafkaStreams [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-client [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf] State transition from REBALANCING to RUNNING
2022-02-24T10:53:14.162Z DEBUG c.k.d.k.t.EventToStateTransformer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Technical info -> context=null, offset=-1 task=1_0 - Received event {"header": {"timestamp": 1645699978060, "eventId": "96d3e5cc-f667-4735-91b8-529374c00d82", "customerId": 1234567890 }

=> At this point the framework has finished restoring the changelog, so it should find the state from 2022-02-24T10:52:58.868Z, but it does not.
=> The offset of the incoming event is now -1.

2022-02-24T10:53:14.162Z DEBUG c.k.d.k.t.DomainEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Creating new state for customerId=1234567890
2022-02-24T10:53:14.514Z INFO c.k.d.k.t.CustomerNewEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - CustomerEvent -> adding customerId=1234567890 timeEvent=1645699978060 eventId=96d3e5cc-f667-4735-91b8-529374c00d82 Technical info -> partition=-1 offset=-1 task=1_0
2022-02-24T10:53:14.515Z DEBUG c.k.d.k.t.DomainEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Successfully updated state for customerId=1234567890, version=1, eventTime=1645699978060, triggerEvent=CustomerEvent, updatedByEventId=96d3e5cc-f667-4735-91b8-529374c00d82, CounterSize=1
2022-02-24T10:53:14.515Z DEBUG c.k.d.k.t.EventToStateTransformer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Technical info -> context=null, offset=-1 task=1_0 - Received event {"header": {"timestamp": 1645699977697, "eventId": "42a9535c-ab90-41da-965c-ee4c5d871626", "customerId": 1234567890 }
2022-02-24T10:53:14.515Z DEBUG c.k.d.k.t.DomainEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - State already exists for customerId=1234567890, version=1, eventTime=1645699978060, triggerEvent=CustomerEvent, updatedByEventId=96d3e5cc-f667-4735-91b8-529374c00d82, CounterSize=1
2022-02-24T10:53:14.515Z INFO c.k.d.k.t.CustomerNewEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - CustomerEvent -> adding customerId=1234567890 timeEvent=1645699977697 eventId=42a9535c-ab90-41da-965c-ee4c5d871626 Technical info -> partition=-1 offset=-1 task=1_0
2022-02-24T10:53:14.516Z DEBUG c.k.d.k.t.DomainEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Successfully updated state for customerId=1234567890, version=2, eventTime=1645699977697, triggerEvent=CustomerEvent, updatedByEventId=42a9535c-ab90-41da-965c-ee4c5d871626, CounterSize=2
2022-02-24T10:53:14.516Z DEBUG c.k.d.k.t.EventToStateTransformer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Technical info -> context=null, offset=-1 task=1_0 - Received event {"header": {"timestamp": 1645699975892, "eventId": "459dc2ec-7a69-47f1-8c3e-53a5e5f4303b", "customerId": 1234567890 }
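
The version drop is easy to miss in a long log. A small check like the one below can flag it by tracking the `version` reported for each customer in the `Successfully updated state` lines and reporting any decrease. It is only a sketch: `VersionRegressionCheck` and `findRegressions` are hypothetical names, and the regular expression is an assumption fitted to the two line shapes seen above (with and without a comma after the customer id).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class VersionRegressionCheck {

    // Matches both "customerId=123 version=7" and "customerId=123, version=7".
    private static final Pattern UPDATED = Pattern.compile(
        "Successfully updated state for customerId=(\\d+),? version=(\\d+)");

    // Returns one message per line whose version is lower than the last one seen for that customer.
    static List<String> findRegressions(List<String> logLines) {
        Map<String, Long> lastVersion = new HashMap<>();
        List<String> regressions = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = UPDATED.matcher(line);
            if (!m.find()) {
                continue; // not a state-update line
            }
            String customerId = m.group(1);
            long version = Long.parseLong(m.group(2));
            Long previous = lastVersion.put(customerId, version);
            if (previous != null && version < previous) {
                regressions.add("customerId=" + customerId
                    + " dropped from version " + previous + " to " + version);
            }
        }
        return regressions;
    }

    public static void main(String[] args) {
        // Two lines abbreviated from the log above.
        List<String> lines = List.of(
            "... Successfully updated state for customerId=1234567890 version=3991, ...",
            "... Successfully updated state for customerId=1234567890, version=1, ...");
        findRegressions(lines).forEach(System.out::println);
        // prints "customerId=1234567890 dropped from version 3991 to 1"
    }
}
```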

2022-02-24T10:52:59.157Z INFO o.a.k.s.KafkaStreams [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-client [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf] State transition from RUNNING to REBALANCING
2022-02-24T10:52:59.158Z INFO o.a.k.c.c.i.ConsumerCoordinator [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer instanceId=customer-state-0-1, clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-consumer, groupId=customer-state-process] Setting offset for partition topic-input-name-0 to the committed offset FetchPosition{offset=73628671, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-2.d2-msk-cluster-pt.c2.kafka.eu-central-1.amazonaws.com:9094 (id: 2 rack: euc1-az3)], epoch=141}}
2022-02-24T10:52:59.199Z INFO o.a.k.s.p.i.ProcessorStateManager [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] task [1_0] State store customer-state-store did not find checkpoint offset, hence would default to the starting offset at changelog customer-state-process-customer-state-store-changelog-0
2022-02-24T10:52:59.199Z INFO o.a.k.s.p.i.StreamTask [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] task [1_0] Initialized
2022-02-24T10:52:59.240Z INFO o.a.k.s.p.i.ProcessorStateManager [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] standby-task [1_3] State store customer-state-store did not find checkpoint offset, hence would default to the starting offset at changelog customer-state-process-customer-state-store-changelog-3
2022-02-24T10:52:59.240Z INFO o.a.k.s.p.i.StandbyTask [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] standby-task [1_3] Initialized
2022-02-24T10:52:59.286Z INFO o.a.k.c.c.KafkaConsumer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-restore-consumer, groupId=null] Subscribed to partition(s): customer-state-process-customer-state-store-changelog-0, customer-state-process-customer-state-store-changelog-3, customer-state-process-customer-state-store-changelog-2
2022-02-24T10:52:59.286Z INFO o.a.k.c.c.i.SubscriptionState [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-restore-consumer, groupId=null] Seeking to EARLIEST offset of partition customer-state-process-customer-state-store-changelog-0
2022-02-24T10:52:59.287Z INFO o.a.k.c.c.i.SubscriptionState [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-restore-consumer, groupId=null] Seeking to EARLIEST offset of partition customer-state-process-customer-state-store-changelog-3
2022-02-24T10:52:59.471Z INFO o.a.k.c.c.i.SubscriptionState [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-restore-consumer, groupId=null] Resetting offset for partition customer-state-process-customer-state-store-changelog-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-2.d2-msk-cluster-pt.c2.kafka.eu-central-1.amazonaws.com:9094 (id: 2 rack: euc1-az3)], epoch=0}}.
2022-02-24T10:52:59.471Z INFO o.a.k.c.c.i.SubscriptionState [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - [Consumer clientId=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1-restore-consumer, groupId=null] Resetting offset for partition customer-state-process-customer-state-store-changelog-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-4.d2-msk-cluster-pt.c2.kafka.eu-central-1.amazonaws.com:9094 (id: 4 rack: euc1-az3)], epoch=0}}.
2022-02-24T10:53:09.480Z INFO o.a.k.s.p.i.StoreChangelogReader [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] Restoration in progress for 1 partitions. {customer-state-process-customer-state-store-changelog-0: position=19011, end=42297, totalRestored=19011}
2022-02-24T10:53:11.769Z INFO o.a.k.s.p.i.StreamThread [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] Processed 939 total records, ran 2 punctuators, and committed 4 total tasks since the last update
2022-02-24T10:53:14.105Z INFO o.a.k.s.p.i.StoreChangelogReader [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] Finished restoring changelog customer-state-process-customer-state-store-changelog-0 to store customer-state-store with a total number of 42297 records

2022-02-24T10:53:14.106Z INFO c.k.d.k.t.EventToStateTransformer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Fetching state store in transformer
2022-02-24T10:53:14.152Z INFO o.a.k.s.p.i.StreamTask [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] task [1_0] Restored and ready to run
2022-02-24T10:53:14.152Z INFO o.a.k.s.p.i.StreamThread [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] Restoration took 14995 ms for all tasks [1_0, 1_2, 1_3]
2022-02-24T10:53:14.152Z INFO o.a.k.s.p.i.StreamThread [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-thread [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-02-24T10:53:14.152Z INFO o.a.k.s.KafkaStreams [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - stream-client [customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf] State transition from REBALANCING to RUNNING
2022-02-24T10:53:14.162Z DEBUG c.k.d.k.t.EventToStateTransformer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Technical info -> context=null, offset=-1 task=1_0 - Received event {"header": {"timestamp": 1645699978060, "eventId": "96d3e5cc-f667-4735-91b8-529374c00d82", "customerId": 1234567890 }

=> At this point the framework has finished restoring the changelog, so it should find the state written at 2022-02-24T10:52:58.868Z, but it does not.
=> The offset reported for the incoming event becomes -1.

2022-02-24T10:53:14.162Z DEBUG c.k.d.k.t.DomainEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Creating new state for customerId=1234567890
2022-02-24T10:53:14.514Z INFO c.k.d.k.t.CustomerNewEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - CustomerEvent -> adding customerId=1234567890 timeEvent=1645699978060 eventId=96d3e5cc-f667-4735-91b8-529374c00d82 Technical info -> partition=-1 offset=-1 task=1_0
2022-02-24T10:53:14.515Z DEBUG c.k.d.k.t.DomainEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Successfully updated state for customerId=1234567890, version=1, eventTime=1645699978060, triggerEvent=CustomerEvent, updatedByEventId=96d3e5cc-f667-4735-91b8-529374c00d82, CounterSize=1
2022-02-24T10:53:14.515Z DEBUG c.k.d.k.t.EventToStateTransformer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Technical info -> context=null, offset=-1 task=1_0 - Received event {"header": {"timestamp": 1645699977697, "eventId": "42a9535c-ab90-41da-965c-ee4c5d871626", "customerId": 1234567890 }
2022-02-24T10:53:14.515Z DEBUG c.k.d.k.t.DomainEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - State already exists for customerId=1234567890, version=1, eventTime=1645699978060, triggerEvent=CustomerEvent, updatedByEventId=96d3e5cc-f667-4735-91b8-529374c00d82, CounterSize=1
2022-02-24T10:53:14.515Z INFO c.k.d.k.t.CustomerNewEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - CustomerEvent -> adding customerId=1234567890 timeEvent=1645699977697 eventId=42a9535c-ab90-41da-965c-ee4c5d871626 Technical info -> partition=-1 offset=-1 task=1_0
2022-02-24T10:53:14.516Z DEBUG c.k.d.k.t.DomainEventProcessor [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Successfully updated state for customerId=1234567890, version=2, eventTime=1645699977697, triggerEvent=CustomerEvent, updatedByEventId=42a9535c-ab90-41da-965c-ee4c5d871626, CounterSize=2
2022-02-24T10:53:14.516Z DEBUG c.k.d.k.t.EventToStateTransformer [thread=customer-state-process-2dff5b91-2c07-4a22-b7a8-7fbdbd413eaf-StreamThread-1] [] - Technical info -> context=null, offset=-1 task=1_0 - Received event {"header": {"timestamp": 1645699975892, "eventId": "459dc2ec-7a69-47f1-8c3e-53a5e5f4303b", "customerId": 1234567890 }
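
The StoreChangelogReader line in the log above reports `position` and `end` for the changelog partition being restored. When comparing a pod restart with a scale-down, it can help to pull those numbers out of the logs and confirm how far each restore actually got. The following is only a minimal sketch, assuming the log format shown here (Kafka Streams 2.8.0); `RestoreProgress` and its methods are hypothetical helper names, not part of Kafka.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Kafka class): parses the progress entry that
// StoreChangelogReader prints, e.g. "<store>-changelog-0: position=19011, end=42297".
final class RestoreProgress {
    private static final Pattern PROGRESS =
            Pattern.compile("([\\w.-]+-changelog-\\d+): position=(\\d+), end=(\\d+)");

    final String changelogPartition;
    final long position;
    final long end;

    private RestoreProgress(String changelogPartition, long position, long end) {
        this.changelogPartition = changelogPartition;
        this.position = position;
        this.end = end;
    }

    // Fraction of the changelog already replayed; an empty changelog counts as fully restored.
    double fraction() {
        return end == 0 ? 1.0 : (double) position / end;
    }

    // Returns the first progress entry found in the line, or empty if the line has none.
    static Optional<RestoreProgress> parse(String logLine) {
        Matcher m = PROGRESS.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new RestoreProgress(
                m.group(1), Long.parseLong(m.group(2)), Long.parseLong(m.group(3))));
    }

    public static void main(String[] args) {
        // Message text copied from the StoreChangelogReader line in the log above.
        String line = "Restoration in progress for 1 partitions. "
                + "{customer-state-process-customer-state-store-changelog-0: "
                + "position=19011, end=42297, totalRestored=19011}";
        parse(line).ifPresent(p -> System.out.printf("%s: %d of %d records (%.1f%%)%n",
                p.changelogPartition, p.position, p.end, p.fraction() * 100));
    }
}
```

A restore that reaches `end` while the expected key is still missing from the store suggests that the key's records sit in a different changelog partition, rather than that the restore was cut short.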
