Has anyone faced this error: `Timestamp should always be non-negative or null`?
I get this error when publishing messages to a Kafka topic from a Flink application.
The same code works in our test environment with a similar producer configuration, but fails in production. I could not find the cause of this issue.
2022-04-15 16:51:37,228 thread="Sink: spend-limit-publisher-sink (2/2)#15" level=WARN logger=o.a.f.r.t.Task - Sink: spend-limit-publisher-sink (2/2)#15 (d7b4646c840c3882bb784125393b484a) switched from RUNNING to FAILED with failure cause: java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null.
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:74)
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:97)
at com.ex******.streamplatform.sdk.kafka.flink.ProducerFunction$Tuple2ProducerFunction.apply(ProducerFunction.java:80)
at com.ex******.streamplatform.sdk.kafka.flink.ProducerFunction$Tuple2ProducerFunction.apply(ProducerFunction.java:72)
at com.ex******.streamplatform.sdk.kafka.flink.KafkaSerializationSchemaAdapter.serialize(KafkaSerializationSchemaAdapter.java:37)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:907)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
Comments (1)
I figured out that the issue was caused by the lack of a watermark at the sink, so the record timestamp was defaulting to Long.MIN_VALUE (-9223372036854775808).
I fixed it by adding a watermark.
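As a defensive complement to that fix, here is a minimal sketch (not the poster's code) of guarding the timestamp before it reaches the ProducerRecord constructor. The error message itself says the timestamp may be null, and Kafka's ProducerRecord documents that a null timestamp lets the producer assign the current time. The names TimestampGuard and sanitize are hypothetical, and whether falling back to null is acceptable depends on whether consumers rely on event time.

```java
// Hypothetical helper: the class and method names are assumptions for illustration.
final class TimestampGuard {

    private TimestampGuard() {
    }

    /**
     * Returns the timestamp unchanged when Kafka would accept it (non-negative),
     * and null otherwise, so that an unset value such as Long.MIN_VALUE does not
     * trigger IllegalArgumentException in the ProducerRecord constructor.
     */
    public static Long sanitize(Long timestamp) {
        if (timestamp == null || timestamp < 0) {
            return null;
        }
        return timestamp;
    }

    public static void main(String[] args) {
        // The value from the stack trace: treated as "no timestamp".
        System.out.println(sanitize(Long.MIN_VALUE));      // prints "null"
        // An ordinary epoch-millisecond value passes through unchanged.
        System.out.println(sanitize(1_650_041_497_228L));  // prints "1650041497228"
    }
}
```

In a serialization schema the guarded value could then be passed along the lines of new ProducerRecord<>(topic, null, TimestampGuard.sanitize(timestamp), key, value), where topic, key and value are placeholders. This only masks a missing timestamp; assigning timestamps and watermarks upstream, as described above, remains the actual fix.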