Kafka Connect 'ExtractField$Key' SMT results in 'Unknown field' error
I have a setup of a Debezium connector (running on ksqlDB-server) that streams values from SQL Server CDC tables to Kafka topics. I'm trying to transform the key of my message from JSON to an integer value. The example key I'm receiving looks like this: {"InternalID":11117}, and I want to represent it as just the number 11117.
According to the Kafka Connect documentation, this should be fairly easy with the ExtractField SMT. However, when I configure my connector to use this transform, I receive an error: Caused by: java.lang.IllegalArgumentException: Unknown field: InternalID.
Connector config:
CREATE SOURCE CONNECTOR properties_sql_connector WITH (
'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname'= 'propertiessql',
'database.port'= '1433',
'database.user'= 'XXX',
'database.password'= 'XXX',
'database.dbname'= 'Properties',
'database.server.name'= 'properties',
'table.exclude.list'= 'dbo.__EFMigrationsHistory',
'database.history.kafka.bootstrap.servers'= 'kafka:9091',
'database.history.kafka.topic'= 'dbhistory.properties',
'key.converter.schemas.enable'= 'false',
'transforms'= 'unwrap,extractField',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.unwrap.delete.handling.mode'= 'none',
'transforms.extractField.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractField.field'= 'InternalID',
'key.converter'= 'org.apache.kafka.connect.json.JsonConverter');
Error details:
--------------------------------------------------------------------------------------------------------------------------------------
0 | FAILED | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:258)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unknown field: InternalID
at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:65)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
... 11 more
Any ideas for why this transform is failing? Am I missing some configuration?
When the extractField transform is removed, the key of my message looks like the above: {"InternalID":11117}
3 Answers
By default, when you configure SMTs for any connector, including Debezium, transformation is applied to every record that the connector emits. That includes change event messages that might not have the retrieved data and might not have the necessary fields.
To solve this, you need to apply your SMTs selectively to only a specific subset of change event messages generated by Debezium using SMT predicates.
The official documentation is located here.
In your specific case, you could apply the SMT only to the output topic for that specific database table; it would look something like this:
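A minimal sketch in the same `CREATE SOURCE CONNECTOR ... WITH (...)` style as the question's config; the predicate alias `isPropertiesTopic` and the topic pattern are assumptions based on the `database.server.name` above:

```sql
-- Added to the connector's WITH (...) clause; alias and pattern are illustrative.
'transforms.extractField.predicate' = 'isPropertiesTopic',
'predicates' = 'isPropertiesTopic',
'predicates.isPropertiesTopic.type' = 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches',
'predicates.isPropertiesTopic.pattern' = 'properties\.dbo\..*',
```

With this in place, ExtractField$Key runs only on records whose topic matches the pattern, so internal messages such as schema-change events are skipped.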
There are other predicates located in the documentation listed above if the topic name matching doesn't work for you.
In order to extract a named field from JSON, you'll need schemas.enable = 'true' for that converter. For any data that's not sourced from Debezium, that requires the JSON to include a schema as part of the event.
Or, if you're using the Schema Registry, switch to a different converter that uses it, and it should work.
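For illustration, with 'key.converter.schemas.enable' = 'true' the JsonConverter expects each key to carry its schema alongside the payload, roughly like this (the field type and optionality shown here are assumptions):

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "InternalID", "type": "int32", "optional": false }
    ],
    "optional": false
  },
  "payload": { "InternalID": 11117 }
}
```

A schemaless key such as the plain {"InternalID":11117} in the question deserializes to a map rather than a struct, which is why ExtractField cannot find the named field.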
ExtractNewRecordState extracts payload.after from the SMT envelope and places this data as the value. When you need to copy InternalID from the value to the key, use the ValueToKey transform. ExtractField$Key extracts a field from the key struct; by default this struct holds only the table's primary key, which is probably not InternalID.
If InternalID is your primary key, then read @dynamitem's answer.
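The suggested chain could be sketched like this in the connector's WITH (...) clause; the transform alias copyKey is illustrative, not required:

```sql
-- Copy InternalID from the value into the key, then flatten the key struct.
'transforms' = 'unwrap,copyKey,extractField',
'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
'transforms.copyKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
'transforms.copyKey.fields' = 'InternalID',
'transforms.extractField.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractField.field' = 'InternalID',
```

ValueToKey replaces the record key with a struct built from the listed value fields, after which ExtractField$Key can reduce that struct to the bare integer.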