即使在Pojos和Avro概念上,flink默认为Kryo序列化

发布于 2025-02-12 05:27:55 字数 6665 浏览 0 评论 0原文

我正在尝试进行弗林克状态模式进化的POC。我正在使用Flink 1.15.0和Java 11。 我尝试创建3个数据类 - 每种序列化类型之一:

  1. io.peleg.kryo.user - 使用java.time.instant我知道不支持的类Pojo序列化。
  2. io.peleg.pojo.user - 仅使用经典包装原始图 - 整数longstring。使用Lombok生成geters,setter和构造函数。
  3. io.peleg.avro.user - 使用Avro Maven插件从Avro架构生成。

对于每个班级,我写了一个流动作业,该流量使用一个时间窗口来缓冲元素并将其变成列表。

对于每个类,我都尝试执行以下操作:

  1. 运行一个工作
  2. 停止使用SavePoint
  3. 将字段添加到数据类中
  4. ,使用所有数据类提交给

数据类提交的字段,并在此例外情况下提交和SavePoint失败:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
    ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:143)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:74)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:140)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
Serialization trace:
favoriteColor (io.peleg.avro.User)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKVStateData(HeapSavepointRestoreOperation.java:219)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKeyGroupStateData(HeapSavepointRestoreOperation.java:149)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:125)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:169)
    ... 20 more
Caused by: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
    ... 31 more

我希望此例外将被抛出> io.peleg.kryo.user由于Flink不支持Kryo序列化类的状态架构进化。

但是在我看来,对于所有课程,它最终都使用Kryo Serialializer而不是Pojo或Avro Serializer。

我在弗林克集群上运行了工作,我使用docker撰写:

version: "2.2"
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

我的整个代码在github上在这里

我想实现的目标是从较旧版本的POJO的保存点上运行一份作业。

I am trying to do a POC of Flink State Schema Evolution. I am using Flink 1.15.0 and Java 11.
I tried to create 3 data classes - one for each serialization type:

  1. io.peleg.kryo.User - Uses java.time.Instant class which I know is not supported for POJO serialization in Flink.
  2. io.peleg.pojo.User - Uses only classic wrapped primitives - Integer, Long, String. The getters, setters and constructors are generated using Lombok.
  3. io.peleg.avro.User - Generated from Avro schema using Avro Maven Plugin.

For each class I wrote a stream job that uses a time window to buffer elements and turn them into a list.

For each class I tried to do the following:

  1. Run a job
  2. Stop with savepoint
  3. Add a field to the data class
  4. Submit using savepoint

For all data classes the submit with savepoint failed with this exception:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
    ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:143)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:74)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:140)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
Serialization trace:
favoriteColor (io.peleg.avro.User)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKVStateData(HeapSavepointRestoreOperation.java:219)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKeyGroupStateData(HeapSavepointRestoreOperation.java:149)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:125)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:169)
    ... 20 more
Caused by: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
    ... 31 more

I expected this exception would be thrown for io.peleg.kryo.User since Flink does not support state schema evolution for Kryo serialized classes.

But it seems to me like for all classes it ended up using the Kryo serializer instead of the POJO or Avro serializers.

I ran the job on a Flink cluster I spun up using docker compose:

version: "2.2"
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

My entire code is public on GitHub here.

What I would like to achieve is succusfuly running a job from a savepoint of an older version of a POJO with less/more fields.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

戴着白色围巾的女孩 2025-02-19 05:27:55

我对这个Pojo类进行了实验:

public class Event {

    public long id;
    public String data;
    public Instant timestamp;

    public Event() {}

    public Event(final long id, final String data, final Instant timestamp ) {
        this.id = id;
        this.data = data;
        this.timestamp = timestamp;
    }

    ...
}

下图来自Intellij调试器。如您所见,Flink提供了InstantSerializer,并为此pojoserializer用于此event> event类。

我不确定您在哪里出错,但是您可以用来

env.getConfig().disableGenericTypes();

禁用Kyro后卫,这应该有所帮助通过这些实验。

I did an experiment with this POJO class:

public class Event {

    public long id;
    public String data;
    public Instant timestamp;

    public Event() {}

    public Event(final long id, final String data, final Instant timestamp ) {
        this.id = id;
        this.data = data;
        this.timestamp = timestamp;
    }

    ...
}

The image below is from the IntelliJ debugger. As you can see, Flink supplied an InstantSerializer, and is using its POJOSerializer for this Event class.

enter image description here

I'm not sure where you went wrong, but you can use

env.getConfig().disableGenericTypes();

to disable the Kyro fallback, which should help with these experiments.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文