为每个kafka消息致电远程REST API

发布于 2025-02-09 23:29:15 字数 163 浏览 0 评论 0原文

我正在Kafka生产商中读取文件(逐行),并在流中读取每条记录(即每行),而Kafka消费者正在收到此记录。Further我需要将其发送给KAFKA消费者的REST API服务以进行一些处理我的回应将发送给其他话题,依此类推。 我不确定如何使用生产者-Compumer -Chafka API进行操作? 需要帮助

I am reading a file (line by line) in kafka producer and sending each record(i.e per line) across the stream and kafka consumer is receiving this record.Further i need to send it to rest api service from kafka consumer to do some processing and my response would be send to some other topic and so on.
I am not sure how can we do it using producer -consumer kafka api?
Need help

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

情绪 2025-02-16 23:29:15

这很简单。这就是您需要与Kafka Consumer Client一起开发Spring Boot应用程序的方式,该应用程序与任何后端REST API集成在一起。

  1. 一个可以使消费者对象的类
package x.x.x.x.x.

@Service

public class KAFKAConsumer implements Runnable{

    public void run(String topicName) {
        try {
            Consumer<String, String> consumer = new KafkaConsumer<>();
            consumer.subscribe(Collections.singleton(topicName));
                while (true) {
                    try {
                        LOGGER.debug("Polling topic for new events...");
                        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(5000));
                        for (TopicPartition partition : consumerRecords.partitions()) {
                            List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
                            for (ConsumerRecord<String, String> record : partitionRecords) {

                                // Calling REST API for each record

                                invokeRestAPIClass(Parameters....)

                                try {
                                    consumer.commitSync();
                                } catch (CommitFailedException e) {
                                }
                            }
                        }
                    } catch (SerializationException e1) {
                        continue;
                    } catch (Exception e2) {
                        continue;
                    }
                }

            } finally {
                consumer.close();
            }
        } catch (Exception e) {
        }
    }
  1. 定义REST API模板的类,

您可以 https://howtodoinjava.com/spring-boot2/resttemplate/spring-resttul-client-client-resttemplate-example/ 作为构建REST API模板的参考),根据REST API请求和响应结构,通过POJO类映射您的请求。

  1. 实施春季启动启动主类
package x.x.x.x.x.

@SpringBootApplication(scanBasePackages = "x.x.x.x.x")

@EnableAspectJAutoProxy

public class SpringBootStartup implements CommandLineRunner {

    @Inject
    private KAFKAConsumer consumer;

    @Override
    public void run(String... args){
        try {
            Thread springBootThread = new Thread(consumer);
            springBootThread.start();
        }catch(Exception ex)
        {
        }
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SpringBootStartup.class, args);
    }

}

It is pretty simple. This is how you need to develop a Spring Boot Application with KAFKA Consumer Client that integrates with any backend REST API.

  1. A Class to instatiate a consumer object
package x.x.x.x.x.

@Service

public class KAFKAConsumer implements Runnable{

    public void run(String topicName) {
        try {
            Consumer<String, String> consumer = new KafkaConsumer<>();
            consumer.subscribe(Collections.singleton(topicName));
                while (true) {
                    try {
                        LOGGER.debug("Polling topic for new events...");
                        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(5000));
                        for (TopicPartition partition : consumerRecords.partitions()) {
                            List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
                            for (ConsumerRecord<String, String> record : partitionRecords) {

                                // Calling REST API for each record

                                invokeRestAPIClass(Parameters....)

                                try {
                                    consumer.commitSync();
                                } catch (CommitFailedException e) {
                                }
                            }
                        }
                    } catch (SerializationException e1) {
                        continue;
                    } catch (Exception e2) {
                        continue;
                    }
                }

            } finally {
                consumer.close();
            }
        } catch (Exception e) {
        }
    }
  1. A class to define REST API Template

You can this https://howtodoinjava.com/spring-boot2/resttemplate/spring-restful-client-resttemplate-example/ as a reference to construct a REST API template and include invokeRestAPIClass(Parameters....), to map your request throuhg POJO Classes based on the REST API request and response structure.

  1. Implementing a Spring Boot Startup main class
package x.x.x.x.x.

@SpringBootApplication(scanBasePackages = "x.x.x.x.x")

@EnableAspectJAutoProxy

public class SpringBootStartup implements CommandLineRunner {

    @Inject
    private KAFKAConsumer consumer;

    @Override
    public void run(String... args){
        try {
            Thread springBootThread = new Thread(consumer);
            springBootThread.start();
        }catch(Exception ex)
        {
        }
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SpringBootStartup.class, args);
    }

}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文