java lambda函数connnect msk群集超时

发布于 2025-02-07 09:16:04 字数 1944 浏览 2 评论 0原文

我在同一VPC的MSK群集安全组中创建了Java Lambda功能。但是,当lambda执行代码时,在CloudWatch:

org.apache.kafka.common.errors.TimeoutException

我的Java创建这样的主题代码:

public String handleRequest(SQSEvent input, Context context) {

        LambdaLogger logger = context.getLogger();
        if(bootStrapServer == null) {
            System.out.println("missing boot strap server env var");
            return "Error, bootStrapServer env var missing";
        }

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "java-data-screaming-demo-lambda");
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");

        try {
            this.createTopic("TestLambdaTopic", props, logger);
        } catch (Exception e) {
            logger.log("err in creating topic: " + gson.toJson(e));
        }


        return "Ok";
    }

    public void createTopic(String topicName, Properties properties, LambdaLogger logger ) throws Exception {
        try (Admin admin = Admin.create(properties)) {
            int partitions = 1;
            short replicationFactor = 2;
            NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);

            List<NewTopic> topics = new ArrayList<NewTopic>();
            topics.add(newTopic);

            CreateTopicsResult result = admin.createTopics(topics);
            // get the async result for the new topic creation
            KafkaFuture<Void> future = result.values().get(topicName);

            // call get() to block until topic creation has completed or failed
            future.get();
            if (future.isDone()) {
                logger.log("future is done");
            }

            logger.log("what is result from create topics: " + gson.toJson(result));
        }
    }

I have created a java lambda function within same vpc, security group of MSK cluster. But when lambda execute the code, in cloudwatch:

org.apache.kafka.common.errors.TimeoutException

My java creating topic code like this:

public String handleRequest(SQSEvent input, Context context) {

        LambdaLogger logger = context.getLogger();
        if(bootStrapServer == null) {
            System.out.println("missing boot strap server env var");
            return "Error, bootStrapServer env var missing";
        }

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "java-data-screaming-demo-lambda");
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");

        try {
            this.createTopic("TestLambdaTopic", props, logger);
        } catch (Exception e) {
            logger.log("err in creating topic: " + gson.toJson(e));
        }


        return "Ok";
    }

    public void createTopic(String topicName, Properties properties, LambdaLogger logger ) throws Exception {
        try (Admin admin = Admin.create(properties)) {
            int partitions = 1;
            short replicationFactor = 2;
            NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);

            List<NewTopic> topics = new ArrayList<NewTopic>();
            topics.add(newTopic);

            CreateTopicsResult result = admin.createTopics(topics);
            // get the async result for the new topic creation
            KafkaFuture<Void> future = result.values().get(topicName);

            // call get() to block until topic creation has completed or failed
            future.get();
            if (future.isDone()) {
                logger.log("future is done");
            }

            logger.log("what is result from create topics: " + gson.toJson(result));
        }
    }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

他夏了夏天 2025-02-14 09:16:04

最后,我弄清楚了,这是VPC。我使用公司生产VPC及其子网或安全组设置有问题。

很奇怪的是,在VPC中部署客户端EC2可以访问cluser,只是关注AWS MSK教程,并且客户端EC2可以创建主题,发送和接收消息。

但是在VPC中部署lambda功能无法以某种方式工作。如果我以后找出原因,我将在此处进行更新。目前,我使用默认的VPC,该VPC没有太多的安全设置,并且可以正常工作。 lambda-&gt; MSK群集可以创建主题。

Finally I figure it out, it is the vpc. I used companies production vpc and its subnets or security groups setting have problem.

What is weird is that deploy a client EC2 in the vpc can access the cluser, just following aws msk tutorial and that client EC2 can create topics, send and receive messages.

But deploy a Lambda function in the vpc is not working somehow. I will update in here if I find out why later. Currently I use the default vpc, which dont have much security settings, and it works. Lambda -> MSK cluster can create topic.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文