Debezium connector task in UNASSIGNED state

Posted 2025-01-11 17:31:51

Today, one of the three nodes went out of sync and was brought back up.
Now, when I check the status of the connector task, it shows as UNASSIGNED even though the connector itself is in a RUNNING state.
The workers are running in distributed mode.

I tried restarting the connector, but the task is still UNASSIGNED and points to the same worker node that was brought back into the cluster.
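
For context, the status check and the restart were done through the Connect REST API, roughly as in the sketch below (the worker address localhost:8083 is a placeholder for the actual REST listener):

import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # placeholder for the worker's REST listener
CONNECTOR = "shipment-log-connector-20220304200010"

# Check connector and task state: the connector reports RUNNING, the task reports UNASSIGNED.
with urllib.request.urlopen(f"{CONNECT_URL}/connectors/{CONNECTOR}/status") as resp:
    status = json.load(resp)
print(status["connector"]["state"])           # RUNNING
print([t["state"] for t in status["tasks"]])  # ['UNASSIGNED']

# Restart the connector; by default this restarts only the connector instance, not its tasks.
restart = urllib.request.Request(f"{CONNECT_URL}/connectors/{CONNECTOR}/restart", method="POST")
urllib.request.urlopen(restart)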

Following is the properties file for one of the workers; it is the same across all workers:

bootstrap.servers=something:9092
group.id=ffb-supply-kafka-connect
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=supply-kafka-connect-offsets
config.storage.topic=supply-kafka-connect-configs
status.storage.topic=supply-kafka-connect-status
plugin.path=/var/lib/3p-kafka/connect-plugins/,/usr/share/3p-kafka/libs

Connector config:

  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.locking.mode": "minimal",
  "transforms.insertKey.fields": "external_shipment_id",
  "tasks.max": "1",
  "database.history.kafka.topic": "seller_oms_log_history_20220304200010",
  "transforms": "unwrap,insertKey,extractKey,alterKey",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "include.schema.changes": "false",
  "table.whitelist": "seller_oms.shipment_log",
  "database.history.kafka.recovery.poll.interval.ms": "5000",
  "transforms.unwrap.drop.tombstones": "false",
  "database.history.skip.unparseable.ddl": "true",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "database.whitelist": "seller_oms",
  "transforms.alterKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "database.user": "cdc_user",
  "transforms.extractKey.field": "external_shipment_id",
  "database.server.id": "20220304200010",
  "transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "database.history.kafka.bootstrap.servers": "XX:9092,YY:9092,ZZ:9092",
  "transforms.alterKey.renames": "__source_ts_ms:updated_by_debezium_at",
  "database.server.name": "seller_oms_log",
  "heartbeat.interval.ms": "5000",
  "database.port": "3306",
  "key.converter.schemas.enable": "false",
  "database.hostname": "master.in",
  "database.password": "XXYYSS",
  "value.converter.schemas.enable": "false",
  "name": "shipment-log-connector-20220304200010",
  "errors.tolerance": "all",
  "transforms.unwrap.add.fields": "source.ts_ms",
  "snapshot.mode": "schema_only"

I created a new connector with the same config, and it works fine on the same worker node where the existing connector was failing.
I am not sure why the old pipeline didn't come back up and get its task into the RUNNING state.
Is there something that needs to be done when a node gets disconnected from the cluster? How do I resume that connector once it is back?
As per my understanding, if one worker goes down, the task should automatically be assigned to another worker.
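
For reference, the replacement connector was created by submitting the same config through the REST API; a minimal sketch of that call (the worker address and the new connector name are placeholders, and the config dict is abbreviated):

import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # placeholder for the worker's REST listener

# Abbreviated copy of the config listed above. PUT /connectors/<name>/config creates the
# connector if it does not exist, or updates it if it does.
config = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "snapshot.mode": "schema_only",
    # ... remaining keys identical to the connector config above ...
}
req = urllib.request.Request(
    f"{CONNECT_URL}/connectors/shipment-log-connector-new/config",  # placeholder name
    data=json.dumps(config).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="PUT",
)
with urllib.request.urlopen(req) as resp:
    print(resp.status)  # 201 when created, 200 when an existing config was updated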

Comments (1)

忘年祭陌 2025-01-18 17:31:51

"resume that connector once it is back?"

After you run the connect-distributed script, it should rebalance the tasks.
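
One way to verify where the tasks ended up after the rebalance is to read the worker_id from the status API; a sketch (assumes a Connect version recent enough to support the expand parameter, otherwise query each connector's /status individually):

import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # whichever worker's REST listener you can reach

# List every connector with its status and print which worker owns each task.
with urllib.request.urlopen(f"{CONNECT_URL}/connectors?expand=status") as resp:
    connectors = json.load(resp)
for name, info in connectors.items():
    for task in info["status"]["tasks"]:
        print(name, task["id"], task["state"], task["worker_id"])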

Otherwise, there's a /restart API endpoint.
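
A rough sketch of hitting it for the stuck task (the worker address is a placeholder; the task-level endpoint is the important one here, because the plain connector restart does not restart tasks):

import urllib.request

CONNECT_URL = "http://localhost:8083"  # whichever worker's REST listener you can reach
CONNECTOR = "shipment-log-connector-20220304200010"

# POST /connectors/<name>/tasks/<id>/restart restarts a single task;
# POST /connectors/<name>/restart alone only restarts the connector instance.
req = urllib.request.Request(
    f"{CONNECT_URL}/connectors/{CONNECTOR}/tasks/0/restart", method="POST")
urllib.request.urlopen(req)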

"if one worker goes down, it should automatically assign the task to another worker"

In my experience, when tasks actually fail they remain failed, and the restart endpoint needs to be hit if it was a temporary failure and the logs don't show anything useful. However, your errors.tolerance setting may help isolate the problem somewhat.
