Confluent JDBC接收器连接器可以识别Debezium Connector捕获的记录
我尝试使用Kafka Echo系统来cdc Postgresql。 Debezium Connector捕获PostgreSQL 13数据,然后生产到Kafka。而且我将Confluent的JDBC接收器连接器连接到Kafka的另一侧,但它无法正确识别JSON消息。
经过测试的环境:
- PostgreSQL 13,WAL逻辑(Docker)
- Debezium 1.9 PostgreSQL连接器(预构建的JAR)
- Confluentinc/kafka-connector-jdbc in Github。
- Kafka 2.8
- Java 17
测试表。
创建表人员(ID整数而非null,名称Varchar(50)NON NULL,Nickname Varchar(50),主键(ID));
Debezium在更新事件上产生了JSON消息。 (由Console-Consumer捕获)
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"string",
"optional":true,
"field":"nickname"
}
],
"optional":true,
"name":"localdb_postgres.public.persons.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"string",
"optional":true,
"field":"nickname"
}
],
"optional":true,
"name":"localdb_postgres.public.persons.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false,incremental"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"sequence"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"localdb_postgres.public.persons.Envelope"
},
"payload":{
"before":null,
"after":{
"id":3,
"name":"Ko Youngrok",
"nickname":"SuperMen"
},
"source":{
"version":"1.9.2.Final",
"connector":"postgresql",
"name":"localdb_postgres",
"ts_ms":1653878655898,
"snapshot":"false",
"db":"postgres",
"sequence":"[\"23092016\",\"23092304\"]",
"schema":"public",
"table":"persons",
"txId":516,
"lsn":23092304,
"xmin":null
},
"op":"u",
"ts_ms":1653878656372,
"transaction":null
}
来自Confluent JDBC接收器连接器的错误日志。
[2022-05-30 15:45:26,750]信息无法找到字段[sinkRecordfield {schema = schema = schema = io.debezium.connector.postgresql.source:struct}:struct},name ='source ='source ='isprimaryKey = false}} ,SinkRecordField {schema = schema {int64},name ='ts_ms',isprimarykey = false},sinkRecordfield {schema {schema = schema = schema = localdb_postgres.public.persons.valson.value:struct},struct},struct},name ='after'',isprimarykeykey =',isprimarykey = falleffield},sinkRecord},sinkRecordfield {schema = schema {string},name ='op',isprimarykey = false},sinkRecordfield {schema = schema = schema = localdb_postgres.public.public.persons.value:struct},name ='eft fefor' =模式{struct},name ='trassaction',isprimarykey = false}]列名称[昵称,id,name](io.confluent.connect.jdbc.sink.dbstructure:276)
>
[2022-05-30 15:45:26,752]错误workersInkTask {id = Conflue-jdbc-sink-connector-0}任务抛出了一个未被师且无法恢复的例外。在手动重新启动之前,任务被杀死,不会恢复。错误:无法更改表“ localdb_postgres”。“ public”。“ persons”要添加缺少字段sinkRecordfield {schema = schema = schema {io.debezium.connector.postgresql.source:struct}:struct},name ='source ='source ='source'由于该字段不是可选的,并且没有默认值(org.apache.kafka.connect.runtime.workersinktask:608) io.confluent.connect.jdbc.sink.sink.tablealterorcreateexception:无法更改表“ localdb_postgres”。“ public”。 ='source',isprimarykey = false},因为该字段不是可选的,并且没有默认值
='source',isprimarykey = false},因为该字段不是可选的,并且我想
- , debezium Connector打算使用顶级“ schema”列的目的是其他顶级列的类型递减, ”。每个匹配列都存在于模式和有效载荷中。有效载荷列是值,而模式列是有效载荷列的类型说明。
- 但是Confluet连接器将“架构”视为表格架,因此请尝试更改表。当然,没有enth outh信息可以创建数据库列,并且错误被提高了。
解决这个问题有什么想法吗?
这是汇合连接器配置。
name=conflue-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=blahblah
connection.user=blahblah
connection.password=blahblah
dialect.name=PostgreSqlDatabaseDialect
insert.mode=insert
pk.mode=none
pk.fields=none
batch.size=3000
delete.enabled=false
topics=localdb_postgres.public.persons
I try to cdc postgresql with kafka echo system. The debezium connector captures postgresql 13 data and produce then to kafka. And I attach JDBC sink connector of confluent to other side of kafka, but it does not recognize json message properly.
Tested environment:
- Postgresql 13, WAL logical (docker)
- Debezium 1.9 postgresql connector (pre-built jar)
- confluentinc/kafka-connector-jdbc in github.
- Kafka 2.8
- java 17
Tested table.
create table PERSONS (id integer not null, name varchar(50) not null, nickname varchar(50), primary key(id));
Debezium produced json message on update event. (captured by console-consumer)
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"string",
"optional":true,
"field":"nickname"
}
],
"optional":true,
"name":"localdb_postgres.public.persons.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"string",
"optional":true,
"field":"nickname"
}
],
"optional":true,
"name":"localdb_postgres.public.persons.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false,incremental"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"sequence"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"localdb_postgres.public.persons.Envelope"
},
"payload":{
"before":null,
"after":{
"id":3,
"name":"Ko Youngrok",
"nickname":"SuperMen"
},
"source":{
"version":"1.9.2.Final",
"connector":"postgresql",
"name":"localdb_postgres",
"ts_ms":1653878655898,
"snapshot":"false",
"db":"postgres",
"sequence":"[\"23092016\",\"23092304\"]",
"schema":"public",
"table":"persons",
"txId":516,
"lsn":23092304,
"xmin":null
},
"op":"u",
"ts_ms":1653878656372,
"transaction":null
}
The error log from Confluent jdbc sink connector.
[2022-05-30 15:45:26,750] INFO Unable to find fields [SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, SinkRecordField{schema=Schema{INT64}, name='ts_ms', isPrimaryKey=false}, SinkRecordField{schema=Schema{localdb_postgres.public.persons.Value:STRUCT}, name='after', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='op', isPrimaryKey=false}, SinkRecordField{schema=Schema{localdb_postgres.public.persons.Value:STRUCT}, name='before', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRUCT}, name='transaction', isPrimaryKey=false}] among column names [nickname, id, name] (io.confluent.connect.jdbc.sink.DbStructure:276)
[2022-05-30 15:45:26,752] ERROR WorkerSinkTask{id=conflue-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "localdb_postgres"."public"."persons" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask:608)
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Cannot ALTER TABLE "localdb_postgres"."public"."persons" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value
I suppose,
- Debezium connector intended that purpose of top-level "schema" column is type decription of other top-level column "payload". Every matching columns exist in both schema and payload. Payload column is value and schema column is type description of payload column.
- But confluet connector regards "schema" as table schema, so try to alter table. Of course there's no enouth information to create database column and error was raised.
Any any idea to solve this problem?
This is confluent connector configuration.
name=conflue-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=blahblah
connection.user=blahblah
connection.password=blahblah
dialect.name=PostgreSqlDatabaseDialect
insert.mode=insert
pk.mode=none
pk.fields=none
batch.size=3000
delete.enabled=false
topics=localdb_postgres.public.persons
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
它消耗的记录的JDBC接收器连接顶级字段。
如果您只想在数据有效载荷后仅编写
,则需要提取。如果您继续使用JSONConverter,则最终仍会获得
,但是您只会看到与该数据相关的内容。schema
和有效载荷
例如,这将是转换后的完整记录。
The JDBC Sink Connect infers only top level fields of the records it consumes.
If you want to write only the
after
data payload, for example, you need to extract it. If you continue to use the JSONConverter, then you will still end up withschema
andpayload
, but you will only see what's relevant to that data.E.g. This would be the full record after the transform.