kafka生产者发送null键而不是字符串
我正在使用生产者将消息发送到Kafka主题。
当Junit测试时,我发现我的应用程序代码中的生产商(但在我的Junit测试类中不在)中正在发送一个空键,尽管我提供了使用它的字符串键。
代码如下:
主应用程序类
final Producer<String, HashSet<String>> actualApplicationProducer;
ApplicationInstance(String bootstrapServers) // constructor
{
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ActualClient");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes,1000000));
actualApplicationProducer = new KafkaProducer<>(props);
}
public void doStuff()
{
HashSet<String> values = new HashSet<String>();
String key = "applicationKey";
// THIS LINE IS SENDING A NULL KEY
actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values));
}
,但是,在我的Junit类中:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@SuppressWarnings("static-method")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CIFFileProcessorTests
{
/** An Embedded Kafka Broker that can be used for unit testing purposes. */
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@BeforeAll
public void setUpBeforeClass(@TempDir File globalTablesDir, @TempDir File rootDir) throws Exception
{
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "JUnitClient");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes,1000000));
try(Producer<String, HashSet<String>> junitProducer = new Producer<>(props))
{
HashSet<String> values = new HashSet<>();
// Here, I'm sending a record, just like in my main application code, but it's sending the key correctly and not null
junitProducer.send(new ProducerRecord<>(topicName,"junitKey",values));
}
@Test
public void test()
{
ApplicationInstance sut = new ApplicationInstance(embeddedKafkaBroker.getBrokersAsString());
sut.doStuff();
// "records" is a LinkedBlockingQueue, populated by a KafkaMessageListenerContainer which is monitoring the topic for records using a MessageListener
ConsumerRecord<String, HashSet<String>> record = records.poll(1,TimeUnit.SECONDS);
assertEquals("junitKey", record.key()); // TEST FAILS - expected "junitKey" but returned null
}
自定义序列化器:
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos))
{
oos.writeObject(object);
return baos.toByteArray();
}
有人知道为什么 kafkaproducer
当我明确指定字符串时会发送null键吗?
---更新---
我尝试检查元数据,生产者确实在发送密钥,而不是null:
RecordMetadata info = actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values)).get();
System.out.println("INFO - partition: " + info.partition() + ", topic: " + info.topic() + ", offset: " + info.offset() + ", timestamp: "+ info.timestamp() + ", keysize: " + info.serializedKeySize() + ", valuesize: " + info.serializedValueSize());
输出:
信息 - 分区:0,主题:主题名称,偏移:2,时间戳:1656060840304,Keysize:14,alutialize:6258
Keysize均为&gt; 0表明null未传递给主题。
因此,也许是对主题阅读的问题?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
事实证明,我正在为我的
kafkamessagelistenerconcontainer
使用其他的求职者类,它不知道该如何处理所提供的字符串Turns out, I was using a different Deserializer class for my
KafkaMessageListenerContainer
, which didn't know what to do with the String as provided不确定为什么要使用bytearrayoutputstream或ObjectOutputStream来序列化kafka生产者记录,这可能是您的要求。在这种情况下,您可以从
但是可以轻松地在生产者记录中注入密钥。例如,如果您想从AVRO模式生成生产者记录并使用断言来注入记录密钥和值,则可以做类似的事情。
您可以参考 https://technology.amis.amis.nl/soa/kafka/generate-random-json-data-data-from-an-an-an-avro-schema-schema-using-java/
使用JSONAVROCONVERTER:
Not sure why you want to use ByteArrayOutputStream or ObjectOutputStream for serializing KAFKA producer records, that may be your requirement. In such case, you may refer the producer section from https://dzone.com/articles/kafka-producer-and-consumer-example
But injecting key in the producer record can be easily done. For example, if you want generate a Producer Record from an AVRO schema and use assert to inject record key and value, you can do something like this.
You can refer https://technology.amis.nl/soa/kafka/generate-random-json-data-from-an-avro-schema-using-java/
You can convert it to SpecifiRecords using JSONAVROConverter: