Kafka Connect: OffsetStorageReader throws an exception only on the first poll(); fetching the source offset on the next poll() works without an exception

Posted 2025-02-09 05:37:50

The line

context.offsetStorageReader().offset(sourcePartition());

produces an exception on the first poll. On subsequent polls there is no exception. Is it possible to fix this without wrapping extra checks around getLatestSourceOffset(), such as adding a field to track whether it is the first poll? Or is there no way to avoid it, so that we should add the check?
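
On the "first poll" field: one common shape is to read the stored offset once and then keep the latest id in memory, so later polls never go back to the offset reader. The sketch below uses only the JDK. OffsetTracker, advanceTo and the Supplier are hypothetical names; in a real task the supplier would wrap context.offsetStorageReader().offset(sourcePartition()). It does not remove the deserialization error itself, it only limits the read to a single call.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper: loads the stored offset lazily, exactly once,
// then tracks the newest id in memory.
class OffsetTracker {
    private final Supplier<Map<String, Object>> storedOffsetReader;
    private boolean loaded;
    private Long latestId; // null until an id is known

    OffsetTracker(Supplier<Map<String, Object>> storedOffsetReader) {
        this.storedOffsetReader = storedOffsetReader;
    }

    // Returns the newest known id; consults the stored offset only on the first call.
    Optional<Long> latest() {
        if (!loaded) {
            loaded = true;
            Map<String, Object> stored = storedOffsetReader.get();
            Object id = (stored == null) ? null : stored.get("id");
            if (id != null) {
                try {
                    latestId = Long.parseLong(id.toString());
                } catch (NumberFormatException e) {
                    latestId = null; // unreadable stored value: behave as if nothing was stored
                }
            }
        }
        return Optional.ofNullable(latestId);
    }

    // Call after emitting a record so the next poll filters against the new id.
    void advanceTo(long id) {
        latest(); // make sure the stored value was consulted first
        if (latestId == null || id > latestId) {
            latestId = id;
        }
    }

    public static void main(String[] args) {
        int[] reads = {0};
        OffsetTracker tracker = new OffsetTracker(() -> {
            reads[0]++;
            return Collections.<String, Object>singletonMap("id", "10");
        });
        System.out.println(tracker.latest()); // Optional[10]
        tracker.advanceTo(12L);
        System.out.println(tracker.latest()); // Optional[12]
        System.out.println(reads[0]);         // 1
    }
}
```

With this shape, poll() would call latest() for filtering and advanceTo(...) for each record it returns, so no separate boolean has to be checked in the task itself.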

kafka-connect-api version: 0.10.2.0-cp1

2022-06-19 05:52:34,538 ERROR [pool-1-thread-1] (OffsetStorageReaderImpl.java:102) - CRITICAL: Failed to deserialize offset data when getting offsets for task with namespace CryptoPanicSourceConnector. No value for this data will be returned, which may break the task or cause it to skip some data. This could either be due to an error in the connector implementation or incompatible schema.
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)
        at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:96)
        at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:54)
        at com.delphian.bush.CryptoPanicSourceTask.getLatestSourceOffset(CryptoPanicSourceTask.java:97)
        at com.delphian.bush.CryptoPanicSourceTask.poll(CryptoPanicSourceTask.java:61)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

worker.properties

bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=true

rest.port=8086
rest.host.name=127.0.0.1
offset.storage.file.filename=offsets/standalone.offsets
offset.flush.interval.ms=10000

SourceTask

    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        Optional<Long> sourceOffset = getLatestSourceOffset();
        CryptoNewsResponse newsResponse = // getNewsFromApi

        // Filter which news add to records based on sourceOffset. Shortened for brevity 
        for (CryptoNews news : filteredNews) {
            records.add(generateRecordFromNews(news));
        }

        return records;
    }
    
    
    private Optional<Long> getLatestSourceOffset() {
        Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition());
        if (offset != null) {
            Object id = offset.get("id");
            if (id != null) {
                Long latestOffset = Long.valueOf((String) id);
                return Optional.of(latestOffset);
            }
        }
        return Optional.empty();
    }
    
    private SourceRecord generateRecordFromNews(CryptoNews cryptoNews) {
        return new SourceRecord(
                sourcePartition(),
                sourceOffset(cryptoNews),
                config.getString(TOPIC_CONFIG),
                null,
                CryptoNewsSchema.NEWS_KEY_SCHEMA,
                buildRecordKey(cryptoNews),
                CryptoNewsSchema.NEWS_SCHEMA,
                buildRecordValue(cryptoNews),
                Instant.now().toEpochMilli()
        );
    }
    
    

    private Map<String, String> sourceOffset(CryptoNews cryptoNews) {
        Map<String, String> map = new HashMap<>();
        map.put(CryptoNewsSchema.ID_FIELD, cryptoNews.getId());
        return map;
    }
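
Separately from the logged error, the cast in getLatestSourceOffset() assumes the stored id is always a String holding a number. A minimal sketch of a more defensive parse is shown here, using only the JDK. OffsetParsing and parseLatestOffset are hypothetical names, and the "id" key is taken from the question's offset.get("id") call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper that turns a stored offset map into an optional numeric id.
class OffsetParsing {
    // Key name as used by offset.get("id") in the question.
    static final String ID_FIELD = "id";

    static Optional<Long> parseLatestOffset(Map<String, ?> offset) {
        if (offset == null) {
            return Optional.empty(); // nothing stored yet, or the stored data could not be read
        }
        Object id = offset.get(ID_FIELD);
        if (id == null) {
            return Optional.empty();
        }
        if (id instanceof Number) {
            return Optional.of(((Number) id).longValue()); // tolerate numeric ids as well
        }
        try {
            return Optional.of(Long.parseLong(id.toString().trim()));
        } catch (NumberFormatException e) {
            return Optional.empty(); // treat an unparsable id as "no offset"
        }
    }

    public static void main(String[] args) {
        Map<String, Object> stored = new HashMap<>();
        stored.put(ID_FIELD, "15");
        System.out.println(parseLatestOffset(stored)); // Optional[15]
        System.out.println(parseLatestOffset(null));   // Optional.empty
    }
}
```

Whether an unparsable id should fall back to "no offset" or fail the task is a design choice; the fallback here simply mirrors the Optional.empty() path already present in the question's code.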

UPDATE

I don't use Avro or Protobuf.
My news schema:

public static final Schema NEWS_SCHEMA = SchemaBuilder.struct()
            .name(SCHEMA_NAME)
            .version(FIRST_VERSION)
            .field(NewsSourceSchema.SCHEMA_NAME,  SOURCE_SCHEMA)
            .field(CurrencySchema.SCHEMA_NAME, SchemaBuilder.array(CURRENCY_SCHEMA).optional())
            .field(KIND_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(DOMAIN_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(TITLE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(PUBLISHED_AT_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(SLUG_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(ID_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(URL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(CREATED_AT_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .build();
            
    public Struct toConnectData(CryptoNews cryptoNews) {
        Struct struct = new Struct(CryptoNewsSchema.NEWS_SCHEMA)
                .put(NewsSourceSchema.SCHEMA_NAME, NewsSourceConverter.INSTANCE.toConnectData(cryptoNews.getSource()))
                .put(CryptoNewsSchema.KIND_FIELD, cryptoNews.getKind())
                .put(CryptoNewsSchema.DOMAIN_FIELD, cryptoNews.getDomain())
                .put(CryptoNewsSchema.TITLE_FIELD, cryptoNews.getTitle())
                .put(CryptoNewsSchema.PUBLISHED_AT_FIELD, cryptoNews.getPublishedAt())
                .put(CryptoNewsSchema.SLUG_FIELD, cryptoNews.getSlug())
                .put(CryptoNewsSchema.ID_FIELD, cryptoNews.getId())
                .put(CryptoNewsSchema.URL_FIELD, cryptoNews.getUrl())
                .put(CryptoNewsSchema.CREATED_AT_FIELD, cryptoNews.getCreatedAt());

        List<Currency> currencies = Optional.ofNullable(cryptoNews.getCurrencies()).orElse(new ArrayList<>());
        final List<Struct> items = currencies.stream()
                .map(CONVERTER::toConnectData)
                .collect(Collectors.toList());
        struct.put(CurrencySchema.SCHEMA_NAME, items);
        return struct;
    }

UPDATE 2

connector.properties

name=CryptoPanicSourceConnector
tasks.max=1
connector.class=com.delphian.bush.CryptoPanicSourceConnector
topic=crypto-news

Startup command:

connect-standalone config/worker.properties config/custom-connector.properties

Comments (1)

忘东忘西忘不掉你 2025-02-16 05:37:50

When using plain JSON data with Connect, you may see this error message: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.

To read plain JSON that has no schema envelope, set the converter's schemas.enable parameters to false:

bootstrap.servers=localhost:29092

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
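
The stack trace in the question fails inside OffsetStorageReaderImpl, and as far as I can tell the worker reads and writes source offsets through the internal converters rather than key.converter / value.converter. Under that assumption, a sketch of the worker.properties change would target the internal.* settings instead, leaving the other lines of the question's file as they are:

```properties
# Sketch only: store connector offsets as plain JSON, without the schema/payload envelope.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
```

Offsets already written to offsets/standalone.offsets under a different setting may still fail to deserialize until they are overwritten. That could explain why only the first poll logs the error, but it is a guess and not something the question confirms.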