Can I watch a single field in Couchbase using Kafka Connect (CDC)?
We are trying to migrate our database from MySQL to Couchbase and implement some CDC (change data capture) logic to copy the data into our new DB.
Everything is set up and running: MySQL, Debezium, Kafka, Couchbase, Kubernetes, the pipeline, etc. We have also set up Debezium's Kafka source connector. Here it is:
- name: "our-connector"
  config:
    connector.class: "io.debezium.connector.mysql.MySqlConnector"
    tasks.max: "1"
    group.id: "our-connector"
    database.server.name: "our-api"
    database.hostname: "******"
    database.user: "******"
    database.password: "******"
    database.port: "3306"
    database.include.list: "our_db"
    column.include.list: "our_db.our_table.our_field"
    table.include.list: "our_db.our_table"
    database.history.kafka.topic: "inf.our_table.our_db.schema-changes"
    database.history.kafka.bootstrap.servers: "kafka-cluster-kafka-bootstrap.kafka:9092"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    key.converter: "org.apache.kafka.connect.json.JsonConverter"
    key.converter.schemas.enable: "false"
    snapshot.locking.mode: "none"
    tombstones.on.delete: "false"
    event.deserialization.failure.handling.mode: "ignore"
    database.history.skip.unparseable.ddl: "true"
    include.schema.changes: "false"
    snapshot.mode: "initial"
    transforms: "extract,filter,unwrap"
    predicates: "isOurTableChangeOurField"
    predicates.isOurTableChangeOurField.type: "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"
    predicates.isOurTableChangeOurField.pattern: "our-api.our_db.our_table"
    transforms.filter.type: "com.redhat.insights.kafka.connect.transforms.Filter"
    transforms.filter.if: "!!record.value() && record.value().get('op') == 'u' && record.value().get('before').get('our_field') != record.value().get('after').get('our_field')"
    transforms.filter.predicate: "isOurTableChangeOurField"
    transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"
    transforms.unwrap.drop.tombstones: "false"
    transforms.unwrap.delete.handling.mode: "drop"
    transforms.extract.type: "org.apache.kafka.connect.transforms.ExtractField$Key"
    transforms.extract.field: "id"
This configuration publishes the following message to Kafka (captured from Kowl). As you can see, we have the original record's ID and the new value of the changed field.
So far so good. Well, actually we do have a problem :) Our field is a DATETIME type in MySQL, but Debezium publishes it as Unix time.
First question: how can we publish it as a formatted datetime string instead (e.g. yyyy-MM-dd HH:mm:ss)?
Let's move on.
Here is the actual question. We have searched a lot, but every example writes the whole record to Couchbase. We have already created this record in Couchbase and only want to apply the latest data; we also manipulate the data on the way in.
Here is the document from Couchbase.
We only want to change the bill.dateAccepted field in the Couchbase document. We tried some YAML configurations but had no success on the sink side.
Here is the sink config:
- name: "our-sink-connector-1"
  config:
    connector.class: "com.couchbase.connect.kafka.CouchbaseSinkConnector"
    tasks.max: "2"
    topics: "our-api.our_db.our_table"
    couchbase.seed.nodes: "dev-couchbase-couchbase-cluster.couchbase.svc.cluster.local"
    couchbase.bootstrap.timeout: "10s"
    couchbase.bucket: "our_bucket"
    couchbase.topic.to.collection: "our-api.our_db.our_table=our_bucket._default.ourCollection"
    couchbase.username: "*******"
    couchbase.password: "*******"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter.schemas.enable: "false"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    connection.bucket: "our_bucket"
    connection.cluster_address: "couchbase://couchbase-srv.couchbase"
    couchbase.document.id: "${/id}"
2 Answers
Partial answer to your first question: one approach is to use an SPI converter to convert the Unix datetime to a string. If you want to convert all the datetimes and your input message contains many datetime fields, you can just look at the JDBCType and do the conversion:
https://debezium.io/documentation/reference/stable/development/converters.html
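A lighter-weight alternative to a custom SPI converter, for a single known field, is Kafka Connect's built-in TimestampConverter SMT, which can turn an epoch-millis value into a formatted string. Whether it applies cleanly depends on how Debezium encodes the column (a MySQL DATETIME is emitted as epoch millis by default); the sketch below assumes it is appended to the source connector above, and the `formatTs` alias is made up here:

```yaml
# Sketch only: extend the existing transforms chain of "our-connector".
# "formatTs" is a hypothetical alias; "our_field" is the column from
# column.include.list above, assumed to hold epoch milliseconds.
transforms: "extract,filter,unwrap,formatTs"
transforms.formatTs.type: "org.apache.kafka.connect.transforms.TimestampConverter$Value"
transforms.formatTs.field: "our_field"
transforms.formatTs.target.type: "string"
transforms.formatTs.format: "yyyy-MM-dd HH:mm:ss"
```

Note that the SMT order matters: `formatTs` must run after `unwrap`, since it operates on the flattened record value.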
As for extracting I/U events, you can write a custom SMT (Single Message Transform) that receives the before and after records along with the operation type (I/U/D) and extracts the delta by comparing the before and after fields. When I tried something like this in the past, I came across the following project, which was quite handy as a reference. This way you end up with a delta field and a key, so the sink can apply a partial update instead of rewriting the full document (though the sink has to support that):
https://github.com/michelin/kafka-connect-transforms-qlik-replicate
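If you do go the custom-SMT route, wiring it in is just more connector configuration; the class name below is a placeholder for whatever transform you build and package onto the Connect worker's plugin.path:

```yaml
# Sketch only: com.example.DeltaTransform is a hypothetical class name
# standing in for your custom delta-extracting SMT.
transforms: "extract,filter,unwrap,delta"
transforms.delta.type: "com.example.DeltaTransform"
```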
The Couchbase source connector does not support watching individual fields. In general, the Couchbase source connector is better suited for replication than for change data capture. See the caveats mentioned in the Delivery Guarantees documentation.
The Couchbase Kafka sink connector supports partial document updates via the built-in SubDocumentSinkHandler or N1qlSinkHandler. You can select the sink handler by configuring the couchbase.sink.handler connector config property, and customize its behavior with the Sub Document Sink Handler config options. Here's a config snippet that tells the connector to update the bill.dateAccepted property with the entire value of the Kafka record. (You'd also need to use a Single Message Transform to extract just this field from the source record.) If the built-in sink handlers are not flexible enough, you can write your own custom sink handler using the CustomSinkHandler.java example as a template.
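The snippet referred to above did not survive in this copy of the page; a hedged reconstruction, assuming the Sub Document Sink Handler options of the Couchbase sink connector and the document shape from the question, would look roughly like this (added to the sink connector's config):

```yaml
# Sketch only: selects the sub-document handler and points it at the
# bill.dateAccepted property; the exact path value is an assumption
# based on the document shown in the question.
couchbase.sink.handler: "com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler"
couchbase.subdocument.path: "/bill/dateAccepted"
```

With this in place, each Kafka record's value is written only to that path inside the existing document instead of replacing the whole document.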