How to make the Confluent SFTP connector reprocess a CSV file

Posted on 2025-01-14 11:52:10


Good day,

I am trying to follow this link for the Confluent sftp connector:
https://docs.confluent.io/kafka-connect-sftp/current/source-connector/index.html

The following is my sftp.json; it is basically the documented example with my local info filled in:

{
  "name": "CsvSFTP",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
    "cleanup.policy":"MOVE",
    "behavior.on.error":"IGNORE",
    "input.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1",
    "error.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1/error",
    "finished.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1/finished",
    "input.file.pattern": "csv-sftp-source.csv",
    "sftp.username":"meow",
    "sftp.password":"password",
    "sftp.host":"localhost",
    "sftp.port":"22",
    "kafka.topic": "sftp-testing-topic",
    "csv.first.row.as.header": "true",
    "schema.generation.enabled": "true"
  }
}

After this, I run the following command to load the connector:

confluent local services connect connector load CsvSFTP --config sftp.json
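
For reference, confluent local just fronts a regular Kafka Connect worker, so the same load can be done through the Connect REST API. A minimal sketch, assuming the worker listens on the default port 8083:

curl -s -X POST -H "Content-Type: application/json" \
    --data @sftp.json http://localhost:8083/connectors

# check the connector and task state afterwards
curl -s http://localhost:8083/connectors/CsvSFTP/status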

Next, I upload the csv file to the input folder. Yes, I see the file disappear and get moved to finished.path.

I use the following consumer command to check the data being pushed to the topic:

[meow@localhost bin]$ ./kafka-avro-console-consumer     --bootstrap-server localhost:9092     --property schema.registry.url=http://localhost:8081     --topic sftp-testing-topic2     --from-beginning
{"id":{"string":"1"},"first_name":{"string":"Salmon"},"last_name":{"string":"Baitman"},"email":{"string":"[email protected]"},"gender":{"string":"Male"},"ip_address":{"string":"120.181.75.98"},"last_login":{"string":"2015-03-01T06:01:15Z"},"account_balance":{"string":"17462.66"},"country":{"string":"IT"},"favorite_color":{"string":"#f09bc0"}}
{"id":{"string":"2"},"first_name":{"string":"Debby"},"last_name":{"string":"Brea"},"email":{"string":"[email protected]"},"gender":{"string":"Female"},"ip_address":{"string":"153.239.187.49"},"last_login":{"string":"2018-10-21T12:27:12Z"},"account_balance":{"string":"14693.49"},"country":{"string":"CZ"},"favorite_color":{"string":"#73893a"}}

So far so good; everything is working fine at this point.

After this, I take the same csv file and edit the first name from 'Salmon' to 'Salmon2'. I then upload the csv file again, but this time it is not processed. When I check the connector status, it's running, and even when I check connect.log, I only see it print that no records were produced:

[2022-03-16 17:14:22,129] INFO [CsvSFTP|task-0|offsets] WorkerSourceTask{id=CsvSFTP2-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
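
One way to confirm what is happening is to inspect the offsets the worker has stored for this connector. A minimal sketch, assuming confluent local runs Connect in distributed mode with the default connect-offsets storage topic (check offset.storage.topic in the worker config if yours differs):

./kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic connect-offsets --from-beginning \
    --property print.key=true

The keys are JSON arrays of the form ["<connector-name>", {<source partition>}]. If an entry for CsvSFTP referencing this file is present, the task considers the file already processed and skips it even after it reappears in input.path.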

Then I unload the connector and load it again, and I see the file disappear again and get moved to finished.path. I expected the consumer to print another 2 records, one of them containing my change to the first name, "Salmon2", but it didn't; the consumer output stayed the same.

May I know whether I made a mistake somewhere, or is this the expected result?


Comments (1)

染柒℉ 2025-01-21 11:52:10

This sounds like expected behavior. Source connectors (mostly) maintain state in an offsets topic. Once it has processed the file, it tracks that it shouldn't do so again, even if the connector is restarted or otherwise reloaded.

You'll need to modify the stored offsets, or change the name of the connector so it is unique, in order to "start over".
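
For example, a minimal sketch of the rename workaround, reusing the CLI from the question (CsvSFTP2 is just an arbitrary fresh name):

# Reload the connector under a name that has no stored offsets yet,
# so the file is treated as never seen.
confluent local services connect connector unload CsvSFTP
sed 's/"CsvSFTP"/"CsvSFTP2"/' sftp.json > sftp2.json
confluent local services connect connector load CsvSFTP2 --config sftp2.json

Alternatively, the stored offset can be cleared by producing a tombstone (null-value) record for that connector's key to the offsets topic, e.g. with kcat, but renaming is the simplest way to force a re-read.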
