Kafka Connect: OffsetStorageReader throws an exception only on the first poll(); getting the sourceOffset on the next poll() raises no exception
The line
context.offsetStorageReader().offset(sourcePartition());
throws the exception below on the first poll().
On subsequent polls there is no exception. Is it possible to fix this without wrapping extra checking around getLatestSourceOffset(), such as adding a field to determine whether it is the first poll (a sketch of that guard follows the SourceTask code below)? Or is there no way to avoid it, so the check has to be added?
kafka-connect-api version: 0.10.2.0-cp1
2022-06-19 05:52:34,538 ERROR [pool-1-thread-1] (OffsetStorageReaderImpl.java:102) - CRITICAL: Failed to deserialize offset data when getting offsets for task with namespace CryptoPanicSourceConnector. No value for this data will be returned, which may break the task or cause it to skip some data. This could either be due to an error in the connector implementation or incompatible schema.
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)
at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:96)
at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:54)
at com.delphian.bush.CryptoPanicSourceTask.getLatestSourceOffset(CryptoPanicSourceTask.java:97)
at com.delphian.bush.CryptoPanicSourceTask.poll(CryptoPanicSourceTask.java:61)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
worker.properties
bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=true
rest.port=8086
rest.host.name=127.0.0.1
offset.storage.file.filename=offsets/standalone.offsets
offset.flush.interval.ms=10000
SourceTask
public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> records = new ArrayList<>();
    Optional<Long> sourceOffset = getLatestSourceOffset();
    CryptoNewsResponse newsResponse = // getNewsFromApi
    // Filter which news to add to records based on sourceOffset. Shortened for brevity
    for (CryptoNews news : filteredNews) {
        records.add(generateRecordFromNews(news));
    }
    return records;
}
private Optional<Long> getLatestSourceOffset() {
    Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition());
    if (offset != null) {
        Object id = offset.get("id");
        if (id != null) {
            Long latestOffset = Long.valueOf((String) id);
            return Optional.of(latestOffset);
        }
    }
    return Optional.empty();
}
private SourceRecord generateRecordFromNews(CryptoNews cryptoNews) {
    return new SourceRecord(
            sourcePartition(),
            sourceOffset(cryptoNews),
            config.getString(TOPIC_CONFIG),
            null,
            CryptoNewsSchema.NEWS_KEY_SCHEMA,
            buildRecordKey(cryptoNews),
            CryptoNewsSchema.NEWS_SCHEMA,
            buildRecordValue(cryptoNews),
            Instant.now().toEpochMilli()
    );
}
private Map<String, String> sourceOffset(CryptoNews cryptoNews) {
    Map<String, String> map = new HashMap<>();
    map.put(CryptoNewsSchema.ID_FIELD, cryptoNews.getId());
    return map;
}
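For reference, the "first poll" guard mentioned above would look roughly like the sketch below; the firstPoll field is illustrative and not part of the original task. It only suppresses the offset lookup on the very first poll() of a task instance, so it would also skip the stored offset after a legitimate restart and does not address the deserialization error itself.
private boolean firstPoll = true; // illustrative flag, not in the original task

private Optional<Long> getLatestSourceOffset() {
    if (firstPoll) {
        // Skip the offset lookup only on the very first poll() of this task instance
        firstPoll = false;
        return Optional.empty();
    }
    Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition());
    if (offset != null) {
        Object id = offset.get("id");
        if (id != null) {
            return Optional.of(Long.valueOf((String) id));
        }
    }
    return Optional.empty();
}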
UPDATE
I don't use Avro or Protobuf.
My news schema:
public static final Schema NEWS_SCHEMA = SchemaBuilder.struct()
        .name(SCHEMA_NAME)
        .version(FIRST_VERSION)
        .field(NewsSourceSchema.SCHEMA_NAME, SOURCE_SCHEMA)
        .field(CurrencySchema.SCHEMA_NAME, SchemaBuilder.array(CURRENCY_SCHEMA).optional())
        .field(KIND_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(DOMAIN_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(TITLE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(PUBLISHED_AT_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(SLUG_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(ID_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(URL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(CREATED_AT_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .build();
public Struct toConnectData(CryptoNews cryptoNews) {
    Struct struct = new Struct(CryptoNewsSchema.NEWS_SCHEMA)
            .put(NewsSourceSchema.SCHEMA_NAME, NewsSourceConverter.INSTANCE.toConnectData(cryptoNews.getSource()))
            .put(CryptoNewsSchema.KIND_FIELD, cryptoNews.getKind())
            .put(CryptoNewsSchema.DOMAIN_FIELD, cryptoNews.getDomain())
            .put(CryptoNewsSchema.TITLE_FIELD, cryptoNews.getTitle())
            .put(CryptoNewsSchema.PUBLISHED_AT_FIELD, cryptoNews.getPublishedAt())
            .put(CryptoNewsSchema.SLUG_FIELD, cryptoNews.getSlug())
            .put(CryptoNewsSchema.ID_FIELD, cryptoNews.getId())
            .put(CryptoNewsSchema.URL_FIELD, cryptoNews.getUrl())
            .put(CryptoNewsSchema.CREATED_AT_FIELD, cryptoNews.getCreatedAt());
    List<Currency> currencies = Optional.ofNullable(cryptoNews.getCurrencies()).orElse(new ArrayList<>());
    final List<Struct> items = currencies.stream()
            .map(CONVERTER::toConnectData)
            .collect(Collectors.toList());
    struct.put(CurrencySchema.SCHEMA_NAME, items);
    return struct;
}
UPDATE 2
connector.properties
name=CryptoPanicSourceConnector
tasks.max=1
connector.class=com.delphian.bush.CryptoPanicSourceConnector
topic=crypto-news
Startup command:
connect-standalone config/worker.properties config/custom-connector.properties
Answer
When using plain JSON data with Connect, you may see this error message: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.
You will need to set the schemas.enable parameter for the converter to false for plain JSON with no schema.
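Applied to this setup, the failing call is the offset lookup (OffsetStorageReaderImpl in the stack trace), which goes through the worker's internal converters rather than the topic key/value converters. A minimal sketch of the worker.properties change, assuming the stored offsets are plain JSON without the "schema"/"payload" envelope:
# Disable the schema envelope for the converters used for internal data such as offsets
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
If the offsets/standalone.offsets file already contains data written in the old format, deleting it (or pointing offset.storage.file.filename at a fresh file) before restarting may also be needed; that part is an assumption, not something the answer states.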