Camel Elasticsearch component throws Connection refused

Posted on 2025-02-08 15:41:05

I'm using the latest Camel 3.17 on Camel K with the Elasticsearch REST component and an Elasticsearch 7.7 instance, all running on a Kubernetes cluster and exposed through Services.

I get Connection refused when running a simple integration with this route:

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.elasticsearch.ElasticsearchComponent;

public class Routes extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        // Register the component explicitly and point it at the Elasticsearch Service.
        ElasticsearchComponent elasticsearchComponent = new ElasticsearchComponent();
        elasticsearchComponent.setHostAddresses("elasticsearch-rlam-service:9200");
        getContext().addComponent("elasticsearch-rest", elasticsearchComponent);

        // Consume records from the "dbz" Kafka topic and index non-null bodies.
        from("kafka:dbz?brokers={{kafka.bootstrap.address}}&groupId=apps&autoOffsetReset=earliest")
            .choice()
                .when().simple("${body} == 'null'")
                    .log("Null!")
                .otherwise()
                    .log("Message: ${body}")
                    // hostAddresses is repeated on the endpoint URI as well.
                    .to("elasticsearch-rest://elasticsearch?hostAddresses=elasticsearch-rlam-service:9200&operation=INDEX&indexName=dbz")
            .endChoice();
    }
}

Registering the ElasticsearchComponent explicitly is not mandatory for this test, since I'm not connecting through port 9300 with credentials. However, removing it and setting only the hostAddresses option on the endpoint URI doesn't resolve the issue either.

The stack trace:

2022-06-20 00:11:35,837 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[dbz]) Null!
2022-06-20 00:11:35,849 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[dbz]) Message: {"last_name":"Ketchmar","id":1004,"first_name":"Anne","email":"[email protected]"}
2022-06-20 00:11:36,116 ERROR [org.apa.cam.pro.err.DefaultErrorHandler] (Camel (camel-1) thread #1 - KafkaConsumer[dbz]) Failed delivery for (MessageId: 274D73D47B2829F-0000000000000000 on ExchangeId: 274D73D47B2829F-0000000000000000). Exhausted after delivery attempt: 1 caught: java.net.ConnectException: Connection refused
Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID Processor Elapsed (ms)
route1/route1 from[kafka://dbz?autoOffsetReset=earliest&brokers= 272
...
route1/to1 elasticsearch-rest://elasticsearch?hostAddresses=e 0
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------: java.net.ConnectException: Connection refused
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:918)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:299)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:287)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1632)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1602)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1572)
at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:989)
at org.apache.camel.component.elasticsearch.ElasticsearchProducer.process(ElasticsearchProducer.java:170)
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:471)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:193)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.processExchange(KafkaRecordProcessor.java:109)
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processRecord(KafkaRecordProcessorFacade.java:120)
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processPolledRecords(KafkaRecordProcessorFacade.java:80)
at org.apache.camel.component.kafka.KafkaFetchRecords.startPolling(KafkaFetchRecords.java:280)
at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
... 1 more

I've tried reaching ES from another pod, and it works just fine.
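
Since the check from another pod succeeds, it may help to repeat the same check from inside the integration's own JVM. The following is only a minimal sketch using the JDK's java.net.Socket; the class name ConnectivityProbe, the Result enum, and the 2000 ms timeout are hypothetical and not part of Camel or the Elasticsearch client. It classifies a single TCP connection attempt so that a DNS failure, a refused connection, and a timeout can be told apart.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;

// Hypothetical helper for illustration; not part of Camel or Elasticsearch.
public class ConnectivityProbe {

    /** Possible outcomes of a single TCP connection attempt. */
    public enum Result { OPEN, REFUSED, TIMEOUT, UNKNOWN_HOST, OTHER_ERROR }

    /** Tries to open a TCP connection to host:port and classifies the outcome. */
    public static Result probe(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return Result.OPEN;            // something accepted the connection
        } catch (UnknownHostException e) {
            return Result.UNKNOWN_HOST;    // the name did not resolve
        } catch (ConnectException e) {
            return Result.REFUSED;         // typically "Connection refused"
        } catch (SocketTimeoutException e) {
            return Result.TIMEOUT;         // no answer within timeoutMillis
        } catch (IOException e) {
            return Result.OTHER_ERROR;     // any other I/O failure
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the Service name and port used in the route above.
        String host = args.length > 0 ? args[0] : "elasticsearch-rlam-service";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9200;
        System.out.println(host + ":" + port + " -> " + probe(host, port, 2000));
    }
}
```

If this probe reports OPEN for elasticsearch-rlam-service:9200 from the same pod while the route still fails, that would suggest the client is dialing a different address than the one configured, rather than a network problem between the pods.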
