Jsr223 采样器将 null json 发送到 kafka 主题
我正在使用 jsr223 采样器使用 kafka 客户端 jar 将 json 消息发布到 kafka。当我发布消息时,kafka 中的消息为空。有人能告诉我我错过了什么吗?实际上,消息在应用程序中为空。下面是我的代码。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonSlurper;
import java.util.ArrayList;
import org.apache.jmeter.threads.JMeterVariables;
Properties props = new Properties();
props.put("bootstrap.servers", "lxkfkbkomsstg01.lowes.com:9093,lxkfkbkomsstg02.lowes.com:9093,lxkfkbkomsstg03.lowes.com:9093,lxkfkbkomsstg04.lowes.com:9093,lxkfkbkomsstg05.lowes.com:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.type", "none");
props.put("batch.size", "16384");
props.put("linger.ms", "0");
props.put("buffer.memory", "33554432");
props.put("acks", "1");
props.put("send.buffer.bytes", "131072");
props.put("receive.buffer.bytes", "32768");
props.put("security.protocol", "SSL");
//props.put("sasl.kerberos.service.name", "kafka");
//props.put("sasl.mechanism", "GSSAPI");
//props.put("ssl.keystore.type", "JKS");
props.put("ssl.truststore.location", "/Users/rajkumar/Documents/EOMS/eoms-truststore-stage.jks");
props.put("ssl.truststore.password", "4DxYJnVDcPi6E8w3uCS63qoa");
props.put("ssl.endpoint.identification.algorithm", "");
props.put("ssl.protocol", "SSL");
props.put("ssl.truststore.type", "JKS");
String eventType="orbit_pick";
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
List<Header> headers = new ArrayList<Header>();
Header header = new RecordHeader("event_type",eventType.getBytes(StandardCharsets.UTF_8));
headers.add(header);
// headers.add(new RecordHeader("event_type",eventType.getBytes(StandardCharsets.UTF_8)));
Date latestdate = new Date();
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("orbit.shipment.lfs.inbound.prf", 1, latestdate.getTime(), "702807441", "{\"SellerOrganizationCode\":\"LOWES\",\"ShipNode\":\"0224\",\"IsShortage\":\"N\",\"ShipmentKey\":\"2022021708351092902896763\",\"ShipmentNo\":\"702807441\",\"Extn\":{\"ExtnPickingHasStartedFlag\":\"Y\",\"ExtnSourceSystem\":\"StoreOrderSvc\",\"ExtnPickerId\":\"98977\",\"ExtnOperation\":\"pick\",\"ExtnInPickupLocker\":\"N\"},\"Instructions\":{\"Instruction\":{\"InstructionText\":\"picking\"},\"Replace\":\"Y\"},\"ShipmentLines\":{\"ShipmentLine\":[{\"BackroomPickedQuantity\":\"1\",\"Quantity\":\"3\",\"CodeValue\":\"\",\"ShipmentLineNo\":\"1\",\"ShipmentSubLineNo\":\"0\",\"ShortageQty\":\"\",\"ItemID\":\"1505\",\"NewShipNode\":\"\",\"Extn\":null}]},\"MessageID\":\"8970549709qqaachwejhk\",\"MessageTimeStamp\":\\"${__time(yyyy-MM-dd'T'hh:mm:ss)}\",\"eventType\":\"orbit_multiple_shipments_customer_pickup\"}", headers);
producer.send(producerRecord);
producer.close();
I am using jsr223 sampler to post json message to kafka using kafka client jar. When I am posting message is going in kafka as null. Can someone tell what I am missing. Actually message is going as null in Application .Below is my code.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonSlurper;
import java.util.ArrayList;
import org.apache.jmeter.threads.JMeterVariables;
Properties props = new Properties();
props.put("bootstrap.servers", "lxkfkbkomsstg01.lowes.com:9093,lxkfkbkomsstg02.lowes.com:9093,lxkfkbkomsstg03.lowes.com:9093,lxkfkbkomsstg04.lowes.com:9093,lxkfkbkomsstg05.lowes.com:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.type", "none");
props.put("batch.size", "16384");
props.put("linger.ms", "0");
props.put("buffer.memory", "33554432");
props.put("acks", "1");
props.put("send.buffer.bytes", "131072");
props.put("receive.buffer.bytes", "32768");
props.put("security.protocol", "SSL");
//props.put("sasl.kerberos.service.name", "kafka");
//props.put("sasl.mechanism", "GSSAPI");
//props.put("ssl.keystore.type", "JKS");
props.put("ssl.truststore.location", "/Users/rajkumar/Documents/EOMS/eoms-truststore-stage.jks");
props.put("ssl.truststore.password", "4DxYJnVDcPi6E8w3uCS63qoa");
props.put("ssl.endpoint.identification.algorithm", "");
props.put("ssl.protocol", "SSL");
props.put("ssl.truststore.type", "JKS");
String eventType="orbit_pick";
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
List<Header> headers = new ArrayList<Header>();
Header header = new RecordHeader("event_type",eventType.getBytes(StandardCharsets.UTF_8));
headers.add(header);
// headers.add(new RecordHeader("event_type",eventType.getBytes(StandardCharsets.UTF_8)));
Date latestdate = new Date();
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("orbit.shipment.lfs.inbound.prf", 1, latestdate.getTime(), "702807441", "{\"SellerOrganizationCode\":\"LOWES\",\"ShipNode\":\"0224\",\"IsShortage\":\"N\",\"ShipmentKey\":\"2022021708351092902896763\",\"ShipmentNo\":\"702807441\",\"Extn\":{\"ExtnPickingHasStartedFlag\":\"Y\",\"ExtnSourceSystem\":\"StoreOrderSvc\",\"ExtnPickerId\":\"98977\",\"ExtnOperation\":\"pick\",\"ExtnInPickupLocker\":\"N\"},\"Instructions\":{\"Instruction\":{\"InstructionText\":\"picking\"},\"Replace\":\"Y\"},\"ShipmentLines\":{\"ShipmentLine\":[{\"BackroomPickedQuantity\":\"1\",\"Quantity\":\"3\",\"CodeValue\":\"\",\"ShipmentLineNo\":\"1\",\"ShipmentSubLineNo\":\"0\",\"ShortageQty\":\"\",\"ItemID\":\"1505\",\"NewShipNode\":\"\",\"Extn\":null}]},\"MessageID\":\"8970549709qqaachwejhk\",\"MessageTimeStamp\":\\"${__time(yyyy-MM-dd'T'hh:mm:ss)}\",\"eventType\":\"orbit_multiple_shipments_customer_pickup\"}", headers);
producer.send(producerRecord);
producer.close();
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我没有看到任何代码,因此预计它不会向 Kafka 发送任何内容。
以下是一些示例,您可以用作参考:
PS 您知道 Pepper-Box - Kafka Load Generator< /a> 插件?您可能会发现它更容易配置和使用。查看 Apache Kafka - 如何加载测试JMeter 文章了解更多信息
I don't see any code so it's expected it doesn't send anything to Kafka.
Here is some sample you can use as a reference:
P.S. Are you aware of Pepper-Box - Kafka Load Generator plugin? You may find it easier to configure and use. Check out Apache Kafka - How to Load Test with JMeter article for more information