创建弹性水槽连接器时序列化错误

发布于 2025-02-03 01:53:59 字数 3241 浏览 4 评论 0原文

在创建弹性水槽连接器时,我遇到了以下错误。

CREATE SOURCE CONNECTOR `testconnector` WITH(
    "type.name"= '_doc',
    "input.data.format"= 'AVRO',
    "connector.class"= 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    "tasks.max"= '1',
    "transforms"= 'Dealership',
    "topics"= 'es.contact.model',
    "transforms.Dealership.type"= 'io.confluent.connect.transforms.ExtractTopic$Value',
    "transforms.Dealership.field"= 'indexTopicName',
    "transforms.Dealership.skip.missing.or.null"= 'true',
    "connection.url"= 'http://192.168.1.5:19200',
    "key.ignore"= 'true',
    "schema.ignore"= 'true'
    );

错误是,

 | FAILED | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:519)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:494)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:519)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing message to JSON in topic es.contact.model
        at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)

我使用了以下版本, OS- Linux Cent OS 7 Confluent Kafka版本-7.1.1 弹性接收器连接器-11.1.10

请帮助我解决此错误。

I got the below error while creating the elastic sink connector.

CREATE SOURCE CONNECTOR `testconnector` WITH(
    "type.name"= '_doc',
    "input.data.format"= 'AVRO',
    "connector.class"= 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    "tasks.max"= '1',
    "transforms"= 'Dealership',
    "topics"= 'es.contact.model',
    "transforms.Dealership.type"= 'io.confluent.connect.transforms.ExtractTopic$Value',
    "transforms.Dealership.field"= 'indexTopicName',
    "transforms.Dealership.skip.missing.or.null"= 'true',
    "connection.url"= 'http://192.168.1.5:19200',
    "key.ignore"= 'true',
    "schema.ignore"= 'true'
    );

The error is,

 | FAILED | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:519)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:494)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:519)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing message to JSON in topic es.contact.model
        at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)

I used the below versions,
OS - Linux Cent OS 7
Confluent kafka version - 7.1.1
Elastic Sink Connector - 11.1.10

Kindly help me to fix this error.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文