kafka with PublishSubscribechannel投掷Java.lang.outofmemoryerror:直接缓冲器内存
假设我对kafkaconsumer的配置进行了以下配置,我的kafka正在将purplishsubscribechannel与taskexecutor一起使用。
@Bean
@InboundChannelAdapter(channel = "someInputChannel",poller = @Poller(fixedDelay = "5000", taskExecutor = "taskexecutor"))
public KafkaMessageSource getKafkaMessageSource() {
KafkaMessageSource kafkaMessageSource = new KafkaMessageSource (consumerFactory, new ConsumerProperties("topic"));
kafkaMessageSource.getConsumerProperties().setClientId("listner");
kafkaMessageSource.setMessageConverter(messageConverter());
kafkaMessageSource.setPayloadType(CutsomRequest.class);
return kafkaMessageSource;
}
threadpooltaskexecuter
@Bean(name = "taskexecutor")
public ThreadPoolTaskExecutor queryRequestTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(poolSize);
threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
threadPoolTaskExecutor.setThreadNamePrefix("Request-");
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
return threadPoolTaskExecutor;
}
dorequest
@Bean
public MessageChannel doRequest() {
return new PublishSubscribeChannel(taskexecutor);
}
我面临的
问题是java.lang.outofmemoryerror:下面的直接缓冲记忆是我问题的log stacktrace: -
2022-04-08 09:25:06.859 ERROR 9 --- [scheduling-1] o.s.i.c.MessagePublishingErrorHandler : failure occurred in messaging task
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:695) ~[?:1.8.0_311]
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[?:1.8.0_311]
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) ~[?:1.8.0_311]
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) ~[?:1.8.0_311]
at sun.nio.ch.IOUtil.read(IOUtil.java:195) ~[?:1.8.0_311]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378) ~[?:1.8.0_311]
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:118) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:257) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.8.1.jar!/:?]
at org.springframework.integration.kafka.inbound.KafkaMessageSource.doReceive(KafkaMessageSource.java:441) ~[spring-integration-kafka-3.3.1.RELEASE.jar!/:3.3.1.RELEASE]
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:184) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:407) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:376) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$3(AbstractPollingEndpoint.java:323) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.19.RELEASE.jar!/:5.2.19.RELEASE]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:320) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_311]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_311]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_311]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_311]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_311]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_311]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_311]
有人可以帮助我解决这个问题。谢谢
Let say i have below configuration for KafkaConsumer my Kafka is using PublishSubscribeChannel with taskexecutor.
@Bean
@InboundChannelAdapter(channel = "someInputChannel",poller = @Poller(fixedDelay = "5000", taskExecutor = "taskexecutor"))
public KafkaMessageSource getKafkaMessageSource() {
KafkaMessageSource kafkaMessageSource = new KafkaMessageSource (consumerFactory, new ConsumerProperties("topic"));
kafkaMessageSource.getConsumerProperties().setClientId("listner");
kafkaMessageSource.setMessageConverter(messageConverter());
kafkaMessageSource.setPayloadType(CutsomRequest.class);
return kafkaMessageSource;
}
ThreadPoolTaskExecutor
@Bean(name = "taskexecutor")
public ThreadPoolTaskExecutor queryRequestTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(poolSize);
threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
threadPoolTaskExecutor.setThreadNamePrefix("Request-");
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
return threadPoolTaskExecutor;
}
Dorequest
@Bean
public MessageChannel doRequest() {
return new PublishSubscribeChannel(taskexecutor);
}
Issue I am facing is java.lang.OutOfMemoryError: Direct buffer memory
Below is log Stacktrace for my issue:-
2022-04-08 09:25:06.859 ERROR 9 --- [scheduling-1] o.s.i.c.MessagePublishingErrorHandler : failure occurred in messaging task
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:695) ~[?:1.8.0_311]
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[?:1.8.0_311]
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) ~[?:1.8.0_311]
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) ~[?:1.8.0_311]
at sun.nio.ch.IOUtil.read(IOUtil.java:195) ~[?:1.8.0_311]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378) ~[?:1.8.0_311]
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:118) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:257) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[kafka-clients-2.8.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.8.1.jar!/:?]
at org.springframework.integration.kafka.inbound.KafkaMessageSource.doReceive(KafkaMessageSource.java:441) ~[spring-integration-kafka-3.3.1.RELEASE.jar!/:3.3.1.RELEASE]
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:184) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:407) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:376) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$3(AbstractPollingEndpoint.java:323) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.19.RELEASE.jar!/:5.2.19.RELEASE]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:320) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_311]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_311]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_311]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_311]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_311]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_311]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_311]
Can someone help me out with this issue. Thanks
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论