Kafka MicrosoftSqlServerSource connect - duplicate entries in the topic
We have a requirement to set up the Kafka MicrosoftSqlServerSource connector.
This is to capture all transactions (inserts/updates) performed on one of the sales tables in an Azure SQL database.
To support this source connector, we first enabled CDC at both the database and the table level.
We also created a view over the source table, which serves as the input for the source connector (table.types = VIEW in the connector configuration); a sketch of these steps is shown below.
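For context, here is a minimal T-SQL sketch of the CDC and view setup described above. The source table name dbo.item_status is a hypothetical placeholder; the view name matches table.whitelist in the configuration below.

-- Enable CDC at the database level (run once per database).
EXEC sys.sp_cdc_enable_db;

-- Enable CDC on the source table (dbo.item_status is an assumed name).
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'item_status',
    @role_name     = NULL;  -- no gating role
GO

-- View over the source table; table.whitelist in the connector points here.
-- It must expose ProcessedDateTime (the timestamp column) and
-- SalesandRefundItemStatusID (the incrementing column).
CREATE VIEW dbo.item_status_view AS
SELECT * FROM dbo.item_status;
GO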
Once we completed the setup at both the connector and the database level, we could see messages flowing to the topic that was created automatically along with the connector whenever new updates/inserts happened at the table level.
One strange behavior we observed while testing is that when we stopped the test activity, the last message received in the topic kept getting duplicated until a new message arrived.
Could you please help us understand whether this is expected system behavior, or whether we missed some configuration that results in these duplicate entries?
Please guide us on how we can tackle this duplication issue.
Attaching a snapshot of the connector configuration:
Connector Summary
Connector Class = MicrosoftSqlServerSource
Max Tasks = 1
kafka.auth.mode = SERVICE_ACCOUNT
kafka.service.account.id = **********
topic.prefix = ***********
connection.host = **************8
connection.port = 1433
connection.user = ***************
db.name = **************88
table.whitelist = item_status_view
timestamp.column.name = ProcessedDateTime
incrementing.column.name = SalesandRefundItemStatusID
table.types = VIEW
schema.pattern = dbo
db.timezone = Europe/London
mode = timestamp+incrementing
timestamp.initial = -1
poll.interval.ms = 10000
batch.max.rows = 1000
timestamp.delay.interval.ms = 30000
output.data.format = JSON
1 Answer
What you're describing is controlled by the connector's query mode (here, mode = timestamp+incrementing).
It should save the last timestamp, then query only for timestamps greater than the last one... If you are getting greater-than-or-equal behavior, then that is certainly a bug that should be reported.
Or you should read the docs.
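For reference, in timestamp+incrementing mode a JDBC-based source connector polls with a query along these lines (a sketch using the column names from the configuration above; @last_ts, @last_id, and @end_ts stand for the stored offset and the delayed upper bound, and are placeholders):

-- New rows: strictly newer timestamp, or the same timestamp with a
-- higher incrementing ID than the last stored offset. The upper bound
-- reflects timestamp.delay.interval.ms.
SELECT *
FROM dbo.item_status_view
WHERE (ProcessedDateTime > @last_ts
       OR (ProcessedDateTime = @last_ts
           AND SalesandRefundItemStatusID > @last_id))
  AND ProcessedDateTime < @end_ts
ORDER BY ProcessedDateTime, SalesandRefundItemStatusID ASC;

The strict greater-than comparison on the (timestamp, id) pair is what should prevent the last row from being re-read on every poll.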
As an alternative, you could use Debezium (running your own connector rather than using the Confluent Cloud offering) to truly stream all table operations.
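If you go that route, a minimal sketch of a self-managed Debezium SQL Server connector configuration might look like the following, shown in the same key = value style as the summary above. Property names assume Debezium 2.x; hostnames, credentials, and topic names are placeholders.

connector.class = io.debezium.connector.sqlserver.SqlServerConnector
database.hostname = <azure-sql-host>
database.port = 1433
database.user = <user>
database.password = <password>
database.names = <db-name>
topic.prefix = sales
table.include.list = dbo.item_status
schema.history.internal.kafka.bootstrap.servers = <broker>:9092
schema.history.internal.kafka.topic = schema-changes.sales

Debezium reads from the CDC change tables you have already enabled, so each insert/update is emitted once as a change event instead of being re-polled by timestamp.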