Kafka Connect "ExtractField$Key" SMT results in "Unknown field" error

Posted on 2025-02-09 10:42:44

I have a Debezium connector (running on ksqlDB Server) that's streaming values from SQL Server CDC tables to Kafka topics. I'm trying to transform the key of my message from JSON to an integer value. An example key I'm receiving looks like this: {"InternalID":11117}, and I want to represent it as just the number 11117. According to the Kafka Connect documentation, this should be fairly easy with the ExtractField SMT. However, when I configure my connector to use this transform, I receive an error: Caused by: java.lang.IllegalArgumentException: Unknown field: InternalID.

Connector config:

CREATE SOURCE CONNECTOR properties_sql_connector WITH (
'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector', 
'database.hostname'= 'propertiessql', 
'database.port'= '1433', 
'database.user'= 'XXX', 
'database.password'= 'XXX', 
'database.dbname'= 'Properties', 
'database.server.name'= 'properties', 
'table.exclude.list'= 'dbo.__EFMigrationsHistory', 
'database.history.kafka.bootstrap.servers'= 'kafka:9091', 
'database.history.kafka.topic'= 'dbhistory.properties',
'key.converter.schemas.enable'= 'false',
'transforms'= 'unwrap,extractField',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.unwrap.delete.handling.mode'= 'none',
'transforms.extractField.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractField.field'= 'InternalID',
'key.converter'= 'org.apache.kafka.connect.json.JsonConverter');

Error details:

--------------------------------------------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223) 
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:258)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unknown field: InternalID
        at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:65)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)       
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207) 
        ... 11 more

Any ideas why this transform is failing? Am I missing some configuration?
When the extractField transform is removed, the key of my message looks like the above: {"InternalID":11117}

Comments (3)

贱贱哒 2025-02-16 10:42:44

By default, when you configure SMTs for any connector, including Debezium, the transformation is applied to every record the connector emits. That includes change event messages that might not carry the data you retrieved and therefore might not have the necessary fields.
To solve this, use SMT predicates to apply your SMTs selectively, only to the specific subset of change event messages that actually has the field.

The official documentation is located here.

In your specific case, you could apply the SMT only to the output topic for that specific database table; it would look something like this:

# Create a predicate that matches your output 
predicates: topicNameMatch
predicates.topicNameMatch.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.topicNameMatch.pattern: *output topic name goes here*

# Your logic to extract the field from the key
transforms.extractField.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractField.field: InternalID

# This references the predicate above
transforms.extractField.predicate: topicNameMatch

If topic-name matching doesn't work for you, there are other predicates in the documentation linked above.
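
Translated into the ksqlDB CREATE SOURCE CONNECTOR syntax used in the question, the extra settings would look roughly like this. This is a sketch: the pattern below is a placeholder that assumes Debezium's default server.schema.table topic naming, so replace it with your table's actual output topic.

'predicates'= 'topicNameMatch',
'predicates.topicNameMatch.type'= 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches',
-- placeholder pattern: match your table's output topic (assumes default server.schema.table naming)
'predicates.topicNameMatch.pattern'= 'properties\.dbo\..*',
'transforms.extractField.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractField.field'= 'InternalID',
-- references the predicate above, so the SMT only runs for matching topics
'transforms.extractField.predicate'= 'topicNameMatch',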

少钕鈤記 2025-02-16 10:42:44

In order to extract a named field from JSON, you'll need schemas.enable = 'true' for that converter (here, the key converter).

For any data that's not sourced from Debezium, that'll require that the JSON includes a schema as part of each event.

Or, if you're using the Schema Registry, switch to a different converter that uses it, and it should work.
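
For the connector in the question, that would mean changing these lines (a sketch of the relevant settings only; note that with schemas enabled, the JSON key is serialized with an embedded schema alongside the payload):

'key.converter'= 'org.apache.kafka.connect.json.JsonConverter',
-- enables the embedded {"schema": ..., "payload": ...} envelope for keys
'key.converter.schemas.enable'= 'true',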

你在看孤独的风景 2025-02-16 10:42:44

ExtractNewRecordState extracts payload.after from the Debezium envelope and uses it as the record value. If you need InternalID in the key, copy it from the value to the key using the ValueToKey transform. ExtractField$Key extracts a field from the key struct; by default that struct contains only the table's primary key, which is probably not InternalID.

If InternalID is your primary key, then read @dynamitem's answer.
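
A minimal sketch of that transform chain in the question's connector syntax, assuming InternalID is a field of the record value after unwrapping (the copyKey alias is just an illustrative name):

'transforms'= 'unwrap,copyKey,extractField',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
-- copy InternalID from the value into the key, producing a key struct like {"InternalID": 11117}
'transforms.copyKey.type'= 'org.apache.kafka.connect.transforms.ValueToKey',
'transforms.copyKey.fields'= 'InternalID',
-- then pull the single field out of the key struct, leaving just the integer
'transforms.extractField.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractField.field'= 'InternalID',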
