Anyone facing this "Timestamp should always be non-negative or null" error?

Posted 2025-01-21 16:49:13

Getting this error while publishing messages to a Kafka topic from a Flink app.
The same code works in our test environment with a similar producer configuration, but it fails in the production environment. I couldn't find the cause of this issue.

2022-04-15 16:51:37,228 thread="Sink: spend-limit-publisher-sink (2/2)#15" level=WARN  logger=o.a.f.r.t.Task - Sink: spend-limit-publisher-sink (2/2)#15 (d7b4646c840c3882bb784125393b484a) switched from RUNNING to FAILED with failure cause: java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null.
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:74)
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:97)
at com.ex******.streamplatform.sdk.kafka.flink.ProducerFunction$Tuple2ProducerFunction.apply(ProducerFunction.java:80)
at com.ex******.streamplatform.sdk.kafka.flink.ProducerFunction$Tuple2ProducerFunction.apply(ProducerFunction.java:72)
at com.ex******.streamplatform.sdk.kafka.flink.KafkaSerializationSchemaAdapter.serialize(KafkaSerializationSchemaAdapter.java:37)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:907)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
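
For context, Kafka's ProducerRecord constructor rejects any negative, non-null timestamp at construction time, and -9223372036854775808 is Long.MIN_VALUE, the placeholder Flink uses for records that carry no timestamp. The ProducerFunction in the trace isn't shown here, but the constraint itself can be reproduced with a tiny standalone snippet (the topic name, key, and value below are made up purely for illustration):

import org.apache.kafka.clients.producer.ProducerRecord;

// Standalone illustration of the check that fails above: ProducerRecord
// accepts a null timestamp but throws for any negative one.
public class TimestampCheckDemo {
    public static void main(String[] args) {
        // Accepted: a null timestamp means "let the producer/broker fill it in".
        ProducerRecord<String, String> ok =
                new ProducerRecord<>("demo-topic", null, (Long) null, "key", "value");
        System.out.println("null timestamp accepted: " + ok.timestamp());

        try {
            // Rejected at construction time, exactly as in the stack trace above:
            // Long.MIN_VALUE is negative, so the constructor throws.
            new ProducerRecord<>("demo-topic", null, Long.MIN_VALUE, "key", "value");
        } catch (IllegalArgumentException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}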

Comments (1)

始终不够爱げ你 2025-01-28 16:49:13

I figured out the issue was caused by the lack of a watermark/timestamp assignment before the sink, so the record timestamp was defaulting to Long.MIN_VALUE (-9223372036854775808).

Fixed it by assigning timestamps and watermarks:

// Assign a timestamp to every record before the Kafka sink; here the current
// wall-clock time is used, so each record reaches the sink with a valid
// (non-negative) timestamp instead of Flink's Long.MIN_VALUE placeholder.
limitedStream1.union(limitedStream2)
        .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps()
                .withTimestampAssigner((id, streamRecordTimestamp) -> Instant.now().toEpochMilli()))
        .map(new DataMapper())
        .filter(new NotNull<>())
        .addSink(connectorFactory.createProducer(String.class, EventRecord.class));
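
The WatermarkStrategy above stamps each record with the current wall-clock time, which is always non-negative, so the serialization schema never sees Long.MIN_VALUE again. A complementary, defensive option, if event time isn't needed at all, is to have the serialization schema forward null instead of a missing or negative timestamp and let Kafka assign the time itself. The asker's ProducerFunction isn't shown, so the class below (SafeTimestampSchema, the topic field, and the String payload) is only a hypothetical sketch of that guard, not the actual adapter from the stack trace:

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical schema (not the asker's ProducerFunction): it never forwards a
// negative timestamp to Kafka, mapping Flink's "no timestamp" marker to null.
public class SafeTimestampSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public SafeTimestampSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // timestamp may be null, or Long.MIN_VALUE when the record carries no
        // event-time timestamp; Kafka only accepts null or non-negative values.
        Long safeTimestamp = (timestamp == null || timestamp < 0) ? null : timestamp;
        return new ProducerRecord<>(
                topic, null, safeTimestamp, null,
                element.getBytes(StandardCharsets.UTF_8));
    }
}

With the legacy FlinkKafkaProducer shown in the trace, such a schema can be passed to the producer constructor that takes a KafkaSerializationSchema.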