Kafka producer sending null key instead of string


I am using a Producer to send messages to a Kafka topic.

When JUnit testing, I have found that the producer in my application code (but not in my JUnit test class) is sending a null key, despite me providing a String key for it to use.

Code as follows:

Main application class

final Producer<String, HashSet<String>> actualApplicationProducer;

ApplicationInstance(String bootstrapServers) // constructor
{
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "ActualClient");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
    props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes,1000000));

    actualApplicationProducer = new KafkaProducer<>(props);
}

public void doStuff()
{
    HashSet<String> values = new HashSet<String>();
    String key = "applicationKey";
    // THIS LINE IS SENDING A NULL KEY
    actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values));
}

But, in my JUnit test class:

@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@SuppressWarnings("static-method")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CIFFileProcessorTests 
{
    /** An Embedded Kafka Broker that can be used for unit testing purposes. */
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @BeforeAll
    public void setUpBeforeClass(@TempDir File globalTablesDir, @TempDir File rootDir) throws Exception
    {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "JUnitClient");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes,1000000));
        // Producer is an interface; the concrete client is KafkaProducer
        try (Producer<String, HashSet<String>> junitProducer = new KafkaProducer<>(props))
        {
            HashSet<String> values = new HashSet<>();
            // Here, I'm sending a record, just like in my main application code, but it's sending the key correctly and not null
            junitProducer.send(new ProducerRecord<>(topicName, "junitKey", values));
        }
    }

    @Test
    public void test() throws InterruptedException
    {
        ApplicationInstance sut = new ApplicationInstance(embeddedKafkaBroker.getBrokersAsString());
        sut.doStuff();

        // "records" is a LinkedBlockingQueue, populated by a KafkaMessageListenerContainer which is monitoring the topic for records using a MessageListener
        ConsumerRecord<String, HashSet<String>> record = records.poll(1, TimeUnit.SECONDS);
        assertEquals("junitKey", record.key()); // TEST FAILS - expected "junitKey" but returned null
    }
}

Custom serializer:

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
     ObjectOutputStream oos = new ObjectOutputStream(baos))
{
    oos.writeObject(object);
    return baos.toByteArray();
}
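
For reference, the consumer side needs a deserializer that mirrors this. A minimal sketch, assuming the value type is HashSet<String> (CustomDeserializer is a hypothetical name; the key side would use Kafka's built-in StringDeserializer):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.HashSet;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class CustomDeserializer implements Deserializer<HashSet<String>>
{
    @Override
    @SuppressWarnings("unchecked")
    public HashSet<String> deserialize(String topic, byte[] data)
    {
        if (data == null)
        {
            return null; // Kafka hands the deserializer null for absent payloads
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)))
        {
            return (HashSet<String>) ois.readObject();
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new SerializationException("Failed to deserialize HashSet", e);
        }
    }
}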

Does anyone know why the KafkaProducer would send a null key when I explicitly specify a String?

--- Update ---

I have tried inspecting the metadata, and the Producer is indeed sending the key, and not null:

RecordMetadata info = actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values)).get();
System.out.println("INFO - partition: " + info.partition()
        + ", topic: " + info.topic()
        + ", offset: " + info.offset()
        + ", timestamp: " + info.timestamp()
        + ", keysize: " + info.serializedKeySize()
        + ", valuesize: " + info.serializedValueSize());

output:

INFO - partition: 0, topic: topicName, offset: 2, timestamp: 1656060840304, keysize: 14, valuesize: 6258

The keysize being > 0 shows that null is not passed to the topic.

So, the issue must be with the reading of the topic, perhaps?
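
One way to check what is actually on the topic, independently of the test's listener container, is a plain KafkaConsumer that deserializes keys as strings. A minimal verification sketch (the group id and the byte-array value deserializer are placeholder choices, since only the key matters here):

void dumpKeys(String bootstrapServers, String topicName)
{
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "key-check");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props))
    {
        consumer.subscribe(Collections.singletonList(topicName));
        // Print each record's key; a non-null key here confirms the producer side is fine
        for (ConsumerRecord<String, byte[]> rec : consumer.poll(Duration.ofSeconds(5)))
        {
            System.out.println("key=" + rec.key() + ", keysize=" + rec.serializedKeySize());
        }
    }
}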


Comments (2)

ζ澈沫 2025-02-16 20:30:44


Turns out, I was using a different Deserializer class for my KafkaMessageListenerContainer, which didn't know what to do with the String as provided.
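
In other words, the listener container's deserializers must mirror the producer's serializers. A minimal wiring sketch using spring-kafka-test helpers (CustomDeserializer is a hypothetical mirror of the CustomSerializer above; "testGroup" is a placeholder group id):

Map<String, Object> consumerProps =
        KafkaTestUtils.consumerProps("testGroup", "false", embeddedKafkaBroker);
// The fix: the key deserializer must match the producer's StringSerializer
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);

DefaultKafkaConsumerFactory<String, HashSet<String>> consumerFactory =
        new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties containerProps = new ContainerProperties(topicName);
// Push every received record onto the queue that the test polls
containerProps.setMessageListener((MessageListener<String, HashSet<String>>) records::add);

KafkaMessageListenerContainer<String, HashSet<String>> container =
        new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
container.start();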

滥情空心 2025-02-16 20:30:44


Not sure why you want to use ByteArrayOutputStream or ObjectOutputStream for serializing Kafka producer records; that may be your requirement. In that case, you may refer to the producer section of https://dzone.com/articles/kafka-producer-and-consumer-example

But injecting a key into the producer record can be done easily. For example, if you want to generate a ProducerRecord from an Avro schema and use assertions on the record key and value, you can do something like this.

  1. Generate an Avro SpecificRecord.

You can refer to https://technology.amis.nl/soa/kafka/generate-random-json-data-from-an-avro-schema-using-java/

You can convert the JSON to a SpecificRecord using JsonAvroConverter:

public static ProducerRecord<String, CustomEvent> generateRecord(){
    String schemaFile = "AVROSchema.avsc";
    Schema schema = getSchema(schemaFile);
    String json = getJson(dataFile);

    byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);

    CustomEvent record = null;
    JsonAvroConverter converter = new JsonAvroConverter();

    try {
        record = converter.convertToSpecificRecord(jsonBytes, CustomEvent.class, schema);
    } catch (Exception e) {
        // handle or log the conversion failure
    }

    String recordKey = "YourKey";

    return new ProducerRecord<String, CustomEvent>(topic, recordKey, record);
}
  2. You can inject the ProducerRecord into your assertions later.
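
For example (a sketch; the producer variable and the expected key are placeholders):

ProducerRecord<String, CustomEvent> record = generateRecord();
// The key is set explicitly in generateRecord(), so it should never be null
assertEquals("YourKey", record.key());
assertNotNull(record.value());
producer.send(record);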