Kafka Connect: how to handle a list of custom objects, specify the schema, and build the SourceRecord value

Posted 2025-02-06 05:04:17

I have a DTO, CryptoNews, which contains

List<Currencies> currencies

I would like to save the "currencies" field to the SourceRecord when constructing it, but I can't figure out how to:

  1. Declare it in schema.
  2. Pass it to Struct object when building value.

My attempts end in this exception:
Invalid Java object for schema type STRUCT: class com.dto.Currencies

Kafka Connect doesn't provide an explicit example of how to handle the case where an object in a List requires its own Schema.
I also tried an approach similar to the one in the Kafka test cases, but it doesn't work: https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98

How to do this?

kafka-connect-api version: 0.10.2.0-cp1
value and key converter: org.apache.kafka.connect.json.JsonConverter
no avro used


class CryptoNews implements Serializable {

    // omitted fields

    private List<Currencies> currencies;

}

class Currencies {

    private String code;
    private String title;
    private String slug;
    private String url;

}

SchemaConfiguration

public static final Integer FIRST_VERSION = 1;
public static final String CURRENCIES_SCHEMA_NAME = "currencies";
 
 
 
public static final Schema NEWS_SCHEMA = SchemaBuilder.struct().name("News")
            .version(FIRST_VERSION)
            .field(CURRENCIES_SCHEMA_NAME, CURRENCIES_SCHEMA)
            // simple fields omitted for brevity.
            .build();
 
    
 
public static final Schema CURRENCIES_SCHEMA = SchemaBuilder.array(
            SchemaBuilder.struct()
            .field(CODE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(TITLE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(SLUG_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(URL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .optional()
            .build()
    )
            .optional()
            .name(CURRENCIES_SCHEMA_NAME)
            .version(FIRST_VERSION)
            .build();

SourceTask

return new SourceRecord(
                sourcePartition(),
                sourceOffset(cryptoNews),
                config.getString(TOPIC_CONFIG),
                null,
                CryptoNewsSchema.NEWS_KEY_SCHEMA,
                buildRecordKey(cryptoNews),
                CryptoNewsSchema.NEWS_SCHEMA,
                buildRecordValue(cryptoNews),
                Instant.now().toEpochMilli()
        );

 
 
public Struct buildRecordValue(CryptoNews cryptoNews){
        Struct valueStruct = new Struct(CryptoNewsSchema.NEWS_SCHEMA);
 
        // Produces Invalid Java object for schema type STRUCT: class com.dto.Currencies
        List<Currencies> currencies = cryptoNews.getCurrencies();
        if (currencies != null) {
            valueStruct.put(CurrenciesSchema.CURRENCIES_SCHEMA_NAME, currencies);
        }
 
        return valueStruct;
}

UPDATE:

worker.properties

bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=true

rest.port=8086
rest.host.name=127.0.0.1
offset.storage.file.filename=offsets/standalone.offsets
offset.flush.interval.ms=10000

Answer by 万劫不复, 2025-02-13 05:04:17

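You need to provide a List<Struct>, not a list of your own DTOs. Struct.put validates the value against the field's schema, so every element of an array field whose element schema is a STRUCT must itself be a Struct; passing Currencies objects is what triggers "Invalid Java object for schema type STRUCT".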
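The rule behind the error can be illustrated with a toy model. The classes below (ToyType, ToySchema, ToyStruct, ToyValidator) are hypothetical stand-ins, not Kafka's Schema or Struct; they only mimic the idea that a value for a STRUCT schema must be a struct object and that every element of an ARRAY value is checked against the array's element schema. A List<Currencies> therefore fails on its first element, even though the outer List itself is acceptable.

```java
import java.util.List;
import java.util.Map;

// Toy stand-ins, NOT Kafka classes: they only mirror the shape of the check
// so it can run without the connect-api jar.
enum ToyType { STRING, STRUCT, ARRAY }

final class ToySchema {
    final ToyType type;
    final ToySchema elementSchema; // only meaningful when type == ARRAY

    ToySchema(ToyType type, ToySchema elementSchema) {
        this.type = type;
        this.elementSchema = elementSchema;
    }
}

final class ToyStruct {
    final ToySchema schema;
    final Map<String, Object> values;

    ToyStruct(ToySchema schema, Map<String, Object> values) {
        this.schema = schema;
        this.values = values;
    }
}

final class ToyValidator {
    // The value must be the Java type the schema type expects;
    // for arrays, each element is validated recursively against the element schema.
    static void validate(ToySchema schema, Object value) {
        Class<?> expected;
        switch (schema.type) {
            case STRING: expected = String.class; break;
            case STRUCT: expected = ToyStruct.class; break;
            default:     expected = List.class; break;
        }
        if (!expected.isInstance(value)) {
            throw new IllegalArgumentException("Invalid Java object for schema type "
                + schema.type + ": " + (value == null ? "null" : value.getClass()));
        }
        if (schema.type == ToyType.ARRAY) {
            for (Object element : (List<?>) value) {
                validate(schema.elementSchema, element);
            }
        }
    }
}
```

In the toy, a list of arbitrary POJOs is rejected at the first element, while a list of ToyStruct values passes; converting each DTO to a Struct before calling put plays the same role in real Connect code.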

Here's a full unit test example.

First, a helper interface for converting between a POJO and a Connect Struct:

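import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

// Converts a POJO of type T to and from a Connect Struct matching getSchema()
public interface ConnectPOJOConverter<T> {
  Schema getSchema();
  T fromConnectData(Struct s);
  Struct toConnectData(T t);
}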

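import static org.junit.jupiter.api.Assertions.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.jupiter.api.Test;

// Assumes a Currency POJO (the question's Currencies DTO) with a
// (code, title, slug, url) constructor and matching getters.
class ArrayStructTest {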

  public static final Schema CURRENCY_ITEM_SCHEMA = SchemaBuilder.struct()
      .version(1)
      .name(Currency.class.getName())
      .doc("A currency item")
      .field("code", Schema.OPTIONAL_STRING_SCHEMA)
      .field("title", Schema.OPTIONAL_STRING_SCHEMA)
      .field("slug", Schema.OPTIONAL_STRING_SCHEMA)
      .field("url", Schema.OPTIONAL_STRING_SCHEMA)
      .build();

  static final ConnectPOJOConverter<Currency> CONVERTER = new CurrencyConverter();

  @Test
  void myTest() {
    // Given
    List<Currency> currencies = new ArrayList<>();
    // TODO: Get from external source
    currencies.add(new Currency("200", "Hello", "/slug", "http://localhost"));
    currencies.add(new Currency("200", "World", "/slug", "http://localhost"));

    // When: build Connect Struct data
    Schema valueSchema = SchemaBuilder.struct()
        .name("CryptoNews")
        .doc("A record holding a list of currency items")
        .version(1)
        .field("currencies", SchemaBuilder.array(CURRENCY_ITEM_SCHEMA).required().build())
        .build();
    final List<Struct> items = currencies.stream()
        .map(CONVERTER::toConnectData)
        .collect(Collectors.toList());
    // In the SourceTask, this is what goes into the SourceRecord along with the valueSchema
    Struct value = new Struct(valueSchema);
    value.put("currencies", items);

    // Then
    assertDoesNotThrow(value::validate);
    Object itemsFromStruct = value.get("currencies");
    assertInstanceOf(List.class, itemsFromStruct);
    //noinspection unchecked
    List<Object> data = (List<Object>) itemsFromStruct; // could also use List<Struct>
    assertEquals(2, data.size(), "same size");
    assertInstanceOf(Struct.class, data.get(0), "Object list still has type information");
    Struct firstStruct = (Struct) data.get(0);
    assertEquals("Hello", firstStruct.get("title"));
    currencies = data.stream()
        .map(o -> (Struct) o)
        .map(CONVERTER::fromConnectData)
        .filter(Objects::nonNull)  // in case converter has errors, could return null
        .collect(Collectors.toList());
    assertTrue(currencies.size() <= data.size());
    assertEquals("World", currencies.get(1).getTitle(), "struct parsing data worked");
  }

  static class CurrencyConverter implements ConnectPOJOConverter<Currency> {

    @Override
    public Schema getSchema() {
      return CURRENCY_ITEM_SCHEMA;
    }

    @Override
    public Currency fromConnectData(Struct s) {
      // simple conversion, but more complex types could throw errors;
      // argument order must match the Currency(code, title, slug, url) constructor
      return new Currency(
          s.getString("code"),
          s.getString("title"),
          s.getString("slug"),
          s.getString("url")
      );
    }

    @Override
    public Struct toConnectData(Currency c) {
      Struct s = new Struct(getSchema());
      s.put("code", c.getCode());
      s.put("title", c.getTitle());
      s.put("url", c.getUrl());
      s.put("slug", c.getSlug());
      return s;
    }
  }

}

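An alternative approach is to use a plain String schema: serialize each object to JSON with Jackson's ObjectMapper, wrap it in a JsonConverter-style "schema"/"payload" envelope, and let consumers that use the JsonConverter handle the rest. For the envelope to be written as-is, the worker's value.converter would need to be org.apache.kafka.connect.storage.StringConverter; with the JsonConverter, the whole string is encoded as a single quoted JSON string.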

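import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.TextNode;

final ObjectMapper om = new ObjectMapper();
final Schema valueSchema = Schema.STRING_SCHEMA;
final List<SourceRecord> records = new ArrayList<>();

for (Currency currency : currencies) {
  Map<String, JsonNode> output = new HashMap<>();
  output.put("schema", new TextNode("TODO")); // replace with the JsonConverter schema
  try {
    // write and parse back to a tree so the payload is not double-encoded as a string
    output.put("payload", om.readTree(om.writeValueAsBytes(currency)));

    String value = om.writeValueAsString(output);
    SourceRecord r = new SourceRecord(...., valueSchema, value);
    records.add(r); // collected for the poll() return value
  } catch (IOException e) {
    // TODO: handle
  }
}

return records;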
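The "TODO" schema placeholder in the per-currency loop stands for a schema in JsonConverter's envelope notation. A minimal sketch of building it by hand, assuming one currency per record, the four optional string fields from the question's Currencies class, and a struct name of your choosing; CurrencyEnvelope and its methods are hypothetical helpers, not a Kafka or Jackson API:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not a Kafka or Jackson API): hand-builds
// JsonConverter-style envelope JSON for a single currency record.
final class CurrencyEnvelope {

    // Assumed field names, taken from the question's Currencies DTO.
    static final List<String> FIELDS = List.of("code", "title", "slug", "url");

    // Schema half: a non-optional struct whose fields are all optional strings,
    // using the keys JsonConverter's schema notation uses ("type", "optional", "field", "fields", "name").
    static String schemaJson(String structName) {
        String fields = FIELDS.stream()
            .map(f -> "{\"type\":\"string\",\"optional\":true,\"field\":\"" + f + "\"}")
            .collect(Collectors.joining(","));
        return "{\"type\":\"struct\",\"name\":\"" + structName
            + "\",\"optional\":false,\"fields\":[" + fields + "]}";
    }

    // Wraps an already-serialized JSON object verbatim, so the payload
    // stays a nested object instead of becoming a quoted string.
    static String envelope(String structName, String payloadJson) {
        return "{\"schema\":" + schemaJson(structName) + ",\"payload\":" + payloadJson + "}";
    }
}
```

With Jackson, om.writeValueAsString(currency) could supply payloadJson; alternatively, om.readTree(CurrencyEnvelope.schemaJson("Currency")) yields a JsonNode that can stand in for the TextNode placeholder in the map. Treat the key set as a best-effort reading of the JsonConverter format and compare it with what your converter version actually emits.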