java lambda函数connnect msk群集超时
我在同一VPC的MSK群集安全组中创建了Java Lambda功能。但是,当lambda执行代码时,在CloudWatch:
org.apache.kafka.common.errors.TimeoutException
我的Java创建这样的主题代码:
public String handleRequest(SQSEvent input, Context context) {
LambdaLogger logger = context.getLogger();
if(bootStrapServer == null) {
System.out.println("missing boot strap server env var");
return "Error, bootStrapServer env var missing";
}
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
props.put(AdminClientConfig.CLIENT_ID_CONFIG, "java-data-screaming-demo-lambda");
props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
try {
this.createTopic("TestLambdaTopic", props, logger);
} catch (Exception e) {
logger.log("err in creating topic: " + gson.toJson(e));
}
return "Ok";
}
public void createTopic(String topicName, Properties properties, LambdaLogger logger ) throws Exception {
try (Admin admin = Admin.create(properties)) {
int partitions = 1;
short replicationFactor = 2;
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
List<NewTopic> topics = new ArrayList<NewTopic>();
topics.add(newTopic);
CreateTopicsResult result = admin.createTopics(topics);
// get the async result for the new topic creation
KafkaFuture<Void> future = result.values().get(topicName);
// call get() to block until topic creation has completed or failed
future.get();
if (future.isDone()) {
logger.log("future is done");
}
logger.log("what is result from create topics: " + gson.toJson(result));
}
}
I have created a java lambda function within same vpc, security group of MSK cluster. But when lambda execute the code, in cloudwatch:
org.apache.kafka.common.errors.TimeoutException
My java creating topic code like this:
public String handleRequest(SQSEvent input, Context context) {
LambdaLogger logger = context.getLogger();
if(bootStrapServer == null) {
System.out.println("missing boot strap server env var");
return "Error, bootStrapServer env var missing";
}
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
props.put(AdminClientConfig.CLIENT_ID_CONFIG, "java-data-screaming-demo-lambda");
props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
try {
this.createTopic("TestLambdaTopic", props, logger);
} catch (Exception e) {
logger.log("err in creating topic: " + gson.toJson(e));
}
return "Ok";
}
public void createTopic(String topicName, Properties properties, LambdaLogger logger ) throws Exception {
try (Admin admin = Admin.create(properties)) {
int partitions = 1;
short replicationFactor = 2;
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
List<NewTopic> topics = new ArrayList<NewTopic>();
topics.add(newTopic);
CreateTopicsResult result = admin.createTopics(topics);
// get the async result for the new topic creation
KafkaFuture<Void> future = result.values().get(topicName);
// call get() to block until topic creation has completed or failed
future.get();
if (future.isDone()) {
logger.log("future is done");
}
logger.log("what is result from create topics: " + gson.toJson(result));
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
最后,我弄清楚了,这是VPC。我使用公司生产VPC及其子网或安全组设置有问题。
很奇怪的是,在VPC中部署客户端EC2可以访问cluser,只是关注AWS MSK教程,并且客户端EC2可以创建主题,发送和接收消息。
但是在VPC中部署lambda功能无法以某种方式工作。如果我以后找出原因,我将在此处进行更新。目前,我使用默认的VPC,该VPC没有太多的安全设置,并且可以正常工作。 lambda-&gt; MSK群集可以创建主题。
Finally I figure it out, it is the vpc. I used companies production vpc and its subnets or security groups setting have problem.
What is weird is that deploy a client EC2 in the vpc can access the cluser, just following aws msk tutorial and that client EC2 can create topics, send and receive messages.
But deploy a Lambda function in the vpc is not working somehow. I will update in here if I find out why later. Currently I use the default vpc, which dont have much security settings, and it works. Lambda -> MSK cluster can create topic.