具有多个不同 Avro 生产者和事务的 Kafka 消费者

发布于 2025-01-25 02:04:17 字数 15248 浏览 4 评论 0原文

我有一个 Kafka 消费者，它消费字符串消息。然后根据字符串内容将其转换为不同的 Avro 对象，并发布到不同的主题。我们需要精确一次语义（EOS），遇到的问题是：标有 @Primary 的生产者可以正常工作，而没有 @Primary 的生产者会报下面的错误。有没有办法同时兼容这两个生产者？

kafkaconsumer

  @Configuration
public class KafkaConsumerConfig {

    // Kafka bootstrap servers (host:port[,host:port]).
    @Value("${kafka.server}")
    String server;

    // Consumer group id shared by all container threads.
    @Value("${kafka.consumer.groupid}")
    String groupid;

    // Tracer used to wrap the consumer factory for distributed tracing.
    @Autowired
    Tracer tracer;

    /**
     * String-key/String-value consumer factory.
     * read_committed isolation ensures the consumer only sees records from
     * committed producer transactions, as required for the EOS pipeline.
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Large batches: up to 10k records per poll, with 2 minutes allowed
        // between polls to process them before a rebalance is triggered.
        config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
        //config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);       

        return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(config), tracer);
    }

    /**
     * Batch listener container factory; each polled batch is processed inside
     * a Kafka producer transaction started by the injected transaction manager.
     * NOTE(review): only ONE KafkaAwareTransactionManager is injected here (the
     * @Primary one). A KafkaTemplate bound to a different producer factory will
     * not participate in that transaction, which matches the reported
     * "No transaction is in process" failure — confirm against the answer below.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            KafkaAwareTransactionManager<Object, Object> transactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        // Containers are started manually elsewhere (autoStartup disabled).
        factory.setAutoStartup(false);
        factory.setConcurrency(2);

        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AckMode.BATCH);
        factory.getContainerProperties().setEosMode(EOSMode.ALPHA);
        factory.getContainerProperties().setTransactionManager(transactionManager);

        return factory;
    }

}

kafkaproducer 1

    @Configuration
public class KafkaProducerConfig {

    @Value("${kafka.server}")
    String server;

    @Autowired
    public Tracer tracer;

    // Transactional id (prefix) for the Avro producer; must be unique per factory.
    String tranId = "eventsanavro";

    /**
     * Transactional, idempotent producer factory for Avro payloads.
     * Fix: removed the duplicated ENABLE_IDEMPOTENCE_CONFIG and
     * TRANSACTIONAL_ID_CONFIG entries that were put into the map twice.
     */
    @Bean(name = "transactionalProducerFactoryAvro")
    public ProducerFactory<String, TransactionAvroEntity> producerFactoryavro() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class.getName());
        // EOS requirements: idempotence + acks=all + a transactional id.
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tranId);
        // Throughput tuning: snappy compression, 200 ms linger, 256 KiB batches.
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        config.put(ProducerConfig.LINGER_MS_CONFIG, "200");
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(256 * 1024));
        config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 120000);
        config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(32768 * 1024));
        return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(config), tracer);
    }

    /**
     * KafkaTemplate for Avro messages, bound to the factory above.
     * NOTE(review): @Qualifier on a @Bean method qualifies the *produced* bean,
     * it does not select the dependency; producerFactoryavro() is invoked directly.
     */
    @Qualifier("transactionalProducerFactoryAvro")
    @Bean(name = "transactionalKafkaTemplateAvro")
    public KafkaTemplate<String, TransactionAvroEntity> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactoryavro());
    }

    /**
     * Transaction manager for the Avro producer factory.
     * NOTE(review): the listener container uses only one transaction manager;
     * templates bound to a different producer factory will throw
     * "No transaction is in process" when sending inside the listener.
     */
    @Qualifier("transactionalProducerFactoryAvro")
    @Bean(name = "transactionalKafkaTransactionManagerAvro")
    public KafkaAwareTransactionManager<?, ?> kafkaTransactionManager(
            ProducerFactory<String, TransactionAvroEntity> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
}

kafkaproducer 2

    @Configuration
public class KafkaProducerNonAvroConfig {

    @Value("${kafka.server}")
    String server;

    @Autowired
    public Tracer tracer;

    // Transactional id (prefix) for the String producer; distinct from the Avro one.
    String tranId = "eventsannonavro";

    /**
     * Transactional, idempotent producer factory for plain String payloads.
     * Marked @Primary so it wins unqualified injection points.
     * Fix: removed the duplicated ENABLE_IDEMPOTENCE_CONFIG and
     * TRANSACTIONAL_ID_CONFIG entries that were put into the map twice.
     */
    @Primary
    @Bean(name = "transactionalProducerFactoryNonAvro")
    public ProducerFactory<String, String> producerFactoryNonAvro() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // EOS requirements: idempotence + acks=all + a transactional id.
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tranId);
        // Throughput tuning: snappy compression, 200 ms linger, 256 KiB batches.
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        config.put(ProducerConfig.LINGER_MS_CONFIG, "200");
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(256 * 1024));
        config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 120000);
        config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(32768 * 1024));
        return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(config), tracer);
    }

    /**
     * KafkaTemplate for String messages, bound to the factory above.
     * NOTE(review): @Qualifier on a @Bean method qualifies the *produced* bean,
     * it does not select the dependency; producerFactoryNonAvro() is invoked directly.
     */
    @Primary
    @Qualifier("transactionalProducerFactoryNonAvro")
    @Bean(name = "transactionalKafkaTemplateNonAvro")
    public KafkaTemplate<String, String> kafkatemplate() {
        return new KafkaTemplate<>(producerFactoryNonAvro());
    }

    /**
     * @Primary transaction manager — the one the listener container actually
     * injects, so only sends through this factory join the container transaction.
     */
    @Primary
    @Qualifier("transactionalProducerFactoryNonAvro")
    @Bean(name = "transactionalKafkaTransactionManagerNonAvro")
    public KafkaAwareTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

}

producerWrapper

@Service
public class KafkaTopicProducer {

    @Autowired
    private KafkaTemplate<String, TransactionAvroEntity> kafkaTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaProducerNonAvrokafkaTemplate;

    /**
     * Publishes an Avro payload to {@code topic} under a fresh random UUID key,
     * propagating the supplied record headers. The send goes through the
     * transactional Avro template (per the stack trace, it must run inside an
     * active Kafka transaction).
     */
    public void topicProducerAvro(TransactionAvroEntity payload, String topic, Headers headers) {
        String key = UUID.randomUUID().toString();
        kafkaTemplate.send(new ProducerRecord<>(topic, null, key, payload, headers));
    }

    /** Flushes any records buffered by the Avro template. */
    public void kafkaAvroFlush() {
        kafkaTemplate.flush();
    }

    /**
     * Publishes a String payload to {@code topic} under a fresh random UUID key,
     * propagating the supplied record headers, via the non-Avro template.
     */
    public void topicProducerNonAvro(String payload, String topic, Headers headers) {
        String key = UUID.randomUUID().toString();
        kafkaProducerNonAvrokafkaTemplate.send(new ProducerRecord<>(topic, null, key, payload, headers));
    }

    /** Flushes any records buffered by the non-Avro template. */
    public void kafkaNonAvroFlush() {
        kafkaProducerNonAvrokafkaTemplate.flush();
    }
}

错误：
Caused by: java.lang.IllegalStateException: No transaction is in process；可能的解决方案：在 template.executeInTransaction() 操作的范围内执行模板操作；在调用模板方法之前用 @Transactional 启动事务；或在消费记录时运行于由侦听器容器启动的事务中。

完整堆栈跟踪：

2022-05-03 09:35:11,358  INFO  [nerMoz-0-C-1] o.a.kafka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-ifhEventSanitizer-1, groupId=ifhEventSanitizer] Seeking to offset 0 for partition za.local.file.singleLineGLTransactionEvent.1-0 
 2022-05-03 09:35:11,883  INFO  [nerMoz-0-C-1] o.a.kafka.clients.producer.KafkaProducer : [Producer clientId=producer-eventsanavroifhEventSanitizer.za.local.file.singleLineGLTransactionEvent.1.0, transactionalId=eventsanavroifhEventSanitizer.za.local.file.singleLineGLTransactionEvent.1.0] Aborting incomplete transaction 
 2022-05-03 09:35:11,884  ERROR [nerMoz-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back 
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public boolean com.fnb.fin.ifhEventSanitizer.kafka.KafkaConsumerMoz.consume(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>>,org.apache.kafka.clients.consumer.Consumer<?, ?>)' threw exception; nested exception is java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record; nested exception is java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2372)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2008)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1978)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1930)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1842)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:518)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1749)
    at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListenerInTx(KafkaMessageListenerContainer.java:1740)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1722)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1704)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1274)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1266)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:832)
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363)
        at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180)
        at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172)
        at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1988)
Caused by: java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
    at org.springframework.util.Assert.state(Assert.java:76)
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:657)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:569)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:406)
    at com.fnb.fin.ifhEventSanitizer.kafka.KafkaTopicProducer.topicProducerNonAvro(KafkaTopicProducer.java:44)
    at com.fnb.fin.ifhEventSanitizer.kafka.KafkaConsumerMoz.consume(KafkaConsumerMoz.java:108)
    at jdk.internal.reflect.GeneratedMethodAccessor111.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347)
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180)
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172)
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1988)
    ... 16 common frames omitted

I have a single kafka consumer. It consumes a string. Based on the string we then convert it to different avro object and publish them to different topics. We require EOS and the issue we are getting is the producer marked with @Primary works however the one without primary fails with the error below. Is there anyway to accomodate both?

KafkaConsumer

  @Configuration
public class KafkaConsumerConfig {

    // Kafka bootstrap servers (host:port[,host:port]).
    @Value("${kafka.server}")
    String server;

    // Consumer group id shared by all container threads.
    @Value("${kafka.consumer.groupid}")
    String groupid;

    // Tracer used to wrap the consumer factory for distributed tracing.
    @Autowired
    Tracer tracer;

    /**
     * String-key/String-value consumer factory.
     * read_committed isolation ensures the consumer only sees records from
     * committed producer transactions, as required for the EOS pipeline.
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Large batches: up to 10k records per poll, with 2 minutes allowed
        // between polls to process them before a rebalance is triggered.
        config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
        //config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);       

        return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(config), tracer);
    }

    /**
     * Batch listener container factory; each polled batch is processed inside
     * a Kafka producer transaction started by the injected transaction manager.
     * NOTE(review): only ONE KafkaAwareTransactionManager is injected here (the
     * @Primary one). A KafkaTemplate bound to a different producer factory will
     * not participate in that transaction, which matches the reported
     * "No transaction is in process" failure — confirm against the answer below.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            KafkaAwareTransactionManager<Object, Object> transactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        // Containers are started manually elsewhere (autoStartup disabled).
        factory.setAutoStartup(false);
        factory.setConcurrency(2);

        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AckMode.BATCH);
        factory.getContainerProperties().setEosMode(EOSMode.ALPHA);
        factory.getContainerProperties().setTransactionManager(transactionManager);

        return factory;
    }

}

KafkaProducer 1

    @Configuration
public class KafkaProducerConfig {

    @Value("${kafka.server}")
    String server;

    @Autowired
    public Tracer tracer;

    // Transactional id (prefix) for the Avro producer; must be unique per factory.
    String tranId = "eventsanavro";

    /**
     * Transactional, idempotent producer factory for Avro payloads.
     * Fix: removed the duplicated ENABLE_IDEMPOTENCE_CONFIG and
     * TRANSACTIONAL_ID_CONFIG entries that were put into the map twice.
     */
    @Bean(name = "transactionalProducerFactoryAvro")
    public ProducerFactory<String, TransactionAvroEntity> producerFactoryavro() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class.getName());
        // EOS requirements: idempotence + acks=all + a transactional id.
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tranId);
        // Throughput tuning: snappy compression, 200 ms linger, 256 KiB batches.
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        config.put(ProducerConfig.LINGER_MS_CONFIG, "200");
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(256 * 1024));
        config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 120000);
        config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(32768 * 1024));
        return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(config), tracer);
    }

    /**
     * KafkaTemplate for Avro messages, bound to the factory above.
     * NOTE(review): @Qualifier on a @Bean method qualifies the *produced* bean,
     * it does not select the dependency; producerFactoryavro() is invoked directly.
     */
    @Qualifier("transactionalProducerFactoryAvro")
    @Bean(name = "transactionalKafkaTemplateAvro")
    public KafkaTemplate<String, TransactionAvroEntity> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactoryavro());
    }

    /**
     * Transaction manager for the Avro producer factory.
     * NOTE(review): the listener container uses only one transaction manager;
     * templates bound to a different producer factory will throw
     * "No transaction is in process" when sending inside the listener.
     */
    @Qualifier("transactionalProducerFactoryAvro")
    @Bean(name = "transactionalKafkaTransactionManagerAvro")
    public KafkaAwareTransactionManager<?, ?> kafkaTransactionManager(
            ProducerFactory<String, TransactionAvroEntity> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
}

KafkaProducer 2

    @Configuration
public class KafkaProducerNonAvroConfig {

    @Value("${kafka.server}")
    String server;

    @Autowired
    public Tracer tracer;

    // Transactional id (prefix) for the String producer; distinct from the Avro one.
    String tranId = "eventsannonavro";

    /**
     * Transactional, idempotent producer factory for plain String payloads.
     * Marked @Primary so it wins unqualified injection points.
     * Fix: removed the duplicated ENABLE_IDEMPOTENCE_CONFIG and
     * TRANSACTIONAL_ID_CONFIG entries that were put into the map twice.
     */
    @Primary
    @Bean(name = "transactionalProducerFactoryNonAvro")
    public ProducerFactory<String, String> producerFactoryNonAvro() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // EOS requirements: idempotence + acks=all + a transactional id.
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, tranId);
        // Throughput tuning: snappy compression, 200 ms linger, 256 KiB batches.
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        config.put(ProducerConfig.LINGER_MS_CONFIG, "200");
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(256 * 1024));
        config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 120000);
        config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(32768 * 1024));
        return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(config), tracer);
    }

    /**
     * KafkaTemplate for String messages, bound to the factory above.
     * NOTE(review): @Qualifier on a @Bean method qualifies the *produced* bean,
     * it does not select the dependency; producerFactoryNonAvro() is invoked directly.
     */
    @Primary
    @Qualifier("transactionalProducerFactoryNonAvro")
    @Bean(name = "transactionalKafkaTemplateNonAvro")
    public KafkaTemplate<String, String> kafkatemplate() {
        return new KafkaTemplate<>(producerFactoryNonAvro());
    }

    /**
     * @Primary transaction manager — the one the listener container actually
     * injects, so only sends through this factory join the container transaction.
     */
    @Primary
    @Qualifier("transactionalProducerFactoryNonAvro")
    @Bean(name = "transactionalKafkaTransactionManagerNonAvro")
    public KafkaAwareTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

}

ProducerWrapper

@Service
public class KafkaTopicProducer {

    @Autowired
    private KafkaTemplate<String, TransactionAvroEntity> kafkaTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaProducerNonAvrokafkaTemplate;

    /**
     * Publishes an Avro payload to {@code topic} under a fresh random UUID key,
     * propagating the supplied record headers. The send goes through the
     * transactional Avro template (per the stack trace, it must run inside an
     * active Kafka transaction).
     */
    public void topicProducerAvro(TransactionAvroEntity payload, String topic, Headers headers) {
        String key = UUID.randomUUID().toString();
        kafkaTemplate.send(new ProducerRecord<>(topic, null, key, payload, headers));
    }

    /** Flushes any records buffered by the Avro template. */
    public void kafkaAvroFlush() {
        kafkaTemplate.flush();
    }

    /**
     * Publishes a String payload to {@code topic} under a fresh random UUID key,
     * propagating the supplied record headers, via the non-Avro template.
     */
    public void topicProducerNonAvro(String payload, String topic, Headers headers) {
        String key = UUID.randomUUID().toString();
        kafkaProducerNonAvrokafkaTemplate.send(new ProducerRecord<>(topic, null, key, payload, headers));
    }

    /** Flushes any records buffered by the non-Avro template. */
    public void kafkaNonAvroFlush() {
        kafkaProducerNonAvrokafkaTemplate.flush();
    }
}

ERROR
Caused by: java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

Full Stack Trace

2022-05-03 09:35:11,358  INFO  [nerMoz-0-C-1] o.a.kafka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-ifhEventSanitizer-1, groupId=ifhEventSanitizer] Seeking to offset 0 for partition za.local.file.singleLineGLTransactionEvent.1-0 
 2022-05-03 09:35:11,883  INFO  [nerMoz-0-C-1] o.a.kafka.clients.producer.KafkaProducer : [Producer clientId=producer-eventsanavroifhEventSanitizer.za.local.file.singleLineGLTransactionEvent.1.0, transactionalId=eventsanavroifhEventSanitizer.za.local.file.singleLineGLTransactionEvent.1.0] Aborting incomplete transaction 
 2022-05-03 09:35:11,884  ERROR [nerMoz-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back 
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public boolean com.fnb.fin.ifhEventSanitizer.kafka.KafkaConsumerMoz.consume(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>>,org.apache.kafka.clients.consumer.Consumer<?, ?>)' threw exception; nested exception is java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record; nested exception is java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2372)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2008)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1978)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1930)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1842)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:518)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1749)
    at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListenerInTx(KafkaMessageListenerContainer.java:1740)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1722)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1704)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1274)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1266)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:832)
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363)
        at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180)
        at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172)
        at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1988)
Caused by: java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
    at org.springframework.util.Assert.state(Assert.java:76)
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:657)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:569)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:406)
    at com.fnb.fin.ifhEventSanitizer.kafka.KafkaTopicProducer.topicProducerNonAvro(KafkaTopicProducer.java:44)
    at com.fnb.fin.ifhEventSanitizer.kafka.KafkaConsumerMoz.consume(KafkaConsumerMoz.java:108)
    at jdk.internal.reflect.GeneratedMethodAccessor111.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347)
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180)
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172)
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1988)
    ... 16 common frames omitted

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论 (1)

苏大泽ㄣ 2025-02-01 02:04:17

KafkaTransactionManager 只能在来自一个工厂的生产者上启动事务；即使它能启动两个事务，你也会失去 EOS 保证，因为它们是不同的事务——如果你同时向两者发送消息，这些发送不会处于同一个事务中。

要解决此问题，你应该使用一个配置了 DelegatingByTypeSerializer 或 DelegatingByTopicSerializer 的单一生产者工厂。

例如

/**
 * Single transactional producer factory serving both String and Avro/JSON
 * payloads, so all sends share one transaction (preserving EOS).
 */
public ProducerFactory<String, Object> producerFactory() {
    ...
    // LinkedHashMap retains iteration order, so more specific types are matched first.
    // Fix: use the parameterized Serializer<?> instead of the raw Serializer type.
    Map<Class<?>, Serializer<?>> delegates = new LinkedHashMap<>();
    delegates.put(String.class, new StringSerializer());
    delegates.put(Object.class, new JsonSerializer<>());
    // 'true' = match delegates by assignability, not only exact class equality.
    DelegatingByTypeSerializer dbts = new DelegatingByTypeSerializer(delegates, true);
    return new TracingProducerFactory<>(
        new DefaultKafkaProducerFactory<>(config, new StringSerializer(), dbts), tracer);
}

The KafkaTransactionManager can only start a transaction in a producer from one factory; even if it could start two, you would lose EOS guarantees since they would be different transactions so, if you perform sends to both, they won't be in the same transaction.

To solve this problem, you should use one producer factory with a DelegatingByTypeSerializer or DelegatingByTopicSerializer.

e.g.

/**
 * Single transactional producer factory serving both String and Avro/JSON
 * payloads, so all sends share one transaction (preserving EOS).
 */
public ProducerFactory<String, Object> producerFactory() {
    ...
    // LinkedHashMap retains iteration order, so more specific types are matched first.
    // Fix: use the parameterized Serializer<?> instead of the raw Serializer type.
    Map<Class<?>, Serializer<?>> delegates = new LinkedHashMap<>();
    delegates.put(String.class, new StringSerializer());
    delegates.put(Object.class, new JsonSerializer<>());
    // 'true' = match delegates by assignability, not only exact class equality.
    DelegatingByTypeSerializer dbts = new DelegatingByTypeSerializer(delegates, true);
    return new TracingProducerFactory<>(
        new DefaultKafkaProducerFactory<>(config, new StringSerializer(), dbts), tracer);
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文