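Camel Elasticsearch component throws "Connection refused"
I am using Camel and an Elastic 7.7 instance, both running on a Kubernetes cluster and exposed through services.
With a simple integration using the route below, the connection is refused: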
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.impl.DefaultCamelContext;
    import org.apache.camel.component.elasticsearch.ElasticsearchComponent;

    public class Routes extends RouteBuilder {

        @Override
        public void configure() throws Exception {
            ElasticsearchComponent elasticsearchComponent = new ElasticsearchComponent();
            elasticsearchComponent.setHostAddresses("elasticsearch-rlam-service:9200");
            getContext().addComponent("elasticsearch-rest", elasticsearchComponent);

            from("kafka:dbz?brokers={{kafka.bootstrap.address}}&groupId=apps&autoOffsetReset=earliest")
                .choice()
                    .when().simple("${body} == 'null'")
                        .log("Null!")
                    .otherwise()
                        .log("Message: ${body}")
                        .to("elasticsearch-rest://elasticsearch?hostAddresses=elasticsearch-rlam-service:9200&operation=INDEX&indexName=dbz")
                .endChoice();
        }
    }
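For this test, registering the ElasticsearchComponent explicitly is not mandatory, since I am not connecting with credentials over port 9300. However, removing it and setting only the hostAddresses property on the endpoint URI does not solve the problem either.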
StackTrace:
2022-06-20 00:11:35,837 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[dbz]) Null!
2022-06-20 00:11:35,849 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[dbz]) Message: {"last_name":"Ketchmar","id":1004,"first_name":"Anne","email":"[email protected]"}
2022-06-20 00:11:36,116 ERROR [org.apa.cam.pro.err.DefaultErrorHandler] (Camel (camel-1) thread #1 - KafkaConsumer[dbz]) Failed delivery for (MessageId: 274D73D47B2829F-0000000000000000 on ExchangeId: 274D73D47B2829F-0000000000000000). Exhausted after delivery attempt: 1 caught: java.net.ConnectException: Connection refused
Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID Processor Elapsed (ms)
route1/route1 from[kafka://dbz?autoOffsetReset=earliest&brokers= 272
...
route1/to1 elasticsearch-rest://elasticsearch?hostAddresses=e 0
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------: java.net.ConnectException: Connection refused
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:918)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:299)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:287)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1632)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1602)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1572)
at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:989)
at org.apache.camel.component.elasticsearch.ElasticsearchProducer.process(ElasticsearchProducer.java:170)
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:471)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:193)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.processExchange(KafkaRecordProcessor.java:109)
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processRecord(KafkaRecordProcessorFacade.java:120)
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processPolledRecords(KafkaRecordProcessorFacade.java:80)
at org.apache.camel.component.kafka.KafkaFetchRecords.startPolling(KafkaFetchRecords.java:280)
at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
... 1 more
I tried reaching Elasticsearch from another pod in the cluster, and that works fine.
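Since the stack trace does not show which host the REST client actually tried to contact, one way to narrow this down is to repeat the reachability check from inside the integration's own JVM rather than from a different pod. The following is only a minimal sketch using the JDK's `java.net.Socket`. The class name `HostCheck`, its helper methods, the 2-second timeout, and the fallback to port 9200 when no port is given are all assumptions made for illustration and are not part of Camel or the Elasticsearch client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper; not part of Camel or the Elasticsearch client.
public class HostCheck {

    /** Splits "host:port" into host and port; assumes port 9200 when none is given. */
    static InetSocketAddress parse(String hostAddress) {
        int colon = hostAddress.lastIndexOf(':');
        if (colon < 0) {
            return InetSocketAddress.createUnresolved(hostAddress, 9200);
        }
        String host = hostAddress.substring(0, colon);
        int port = Integer.parseInt(hostAddress.substring(colon + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    /** Returns true if a plain TCP connection succeeds within timeoutMillis. */
    static boolean canConnect(String hostAddress, int timeoutMillis) {
        InetSocketAddress parsed = parse(hostAddress);
        // Resolve the name here so that DNS failures are also reported as unreachable.
        InetSocketAddress target =
                new InetSocketAddress(parsed.getHostString(), parsed.getPort());
        try (Socket socket = new Socket()) {
            socket.connect(target, timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused", timeouts, and unknown hosts.
            System.err.println("Cannot connect to " + hostAddress + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the service address used in the route above.
        String address = args.length > 0 ? args[0] : "elasticsearch-rlam-service:9200";
        System.out.println(address + " reachable: " + canConnect(address, 2000));
    }
}
```

If this check succeeds for `elasticsearch-rlam-service:9200` from inside the integration pod while the route still fails, that would suggest the client is connecting to a different address than the one passed in `hostAddresses`. That is a hypothesis to verify, not a confirmed cause.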