kafka with PublishSubscribechannel投掷Java.lang.outofmemoryerror:直接缓冲器内存

发布于 2025-01-20 17:01:34 字数 7253 浏览 4 评论 0原文

假设我对kafkaconsumer的配置进行了以下配置,我的kafka正在将purplishsubscribechannel与taskexecutor一起使用。

    @Bean
    @InboundChannelAdapter(channel = "someInputChannel",poller = @Poller(fixedDelay = "5000", taskExecutor = "taskexecutor"))
    public KafkaMessageSource getKafkaMessageSource() {

        KafkaMessageSource kafkaMessageSource = new KafkaMessageSource (consumerFactory, new ConsumerProperties("topic"));
        kafkaMessageSource.getConsumerProperties().setClientId("listner");
        kafkaMessageSource.setMessageConverter(messageConverter());
        kafkaMessageSource.setPayloadType(CutsomRequest.class);
        return kafkaMessageSource;
    }   

threadpooltaskexecuter


   @Bean(name = "taskexecutor")
    public ThreadPoolTaskExecutor queryRequestTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(poolSize);
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        threadPoolTaskExecutor.setThreadNamePrefix("Request-");
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return threadPoolTaskExecutor;
    }

dorequest

 @Bean
    public MessageChannel doRequest() {
        return new PublishSubscribeChannel(taskexecutor);
    }

我面临的

问题是java.lang.outofmemoryerror:下面的直接缓冲记忆是我问题的log stacktrace: -


2022-04-08 09:25:06.859 ERROR 9 --- [scheduling-1] o.s.i.c.MessagePublishingErrorHandler    : failure occurred in messaging task

java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:695) ~[?:1.8.0_311]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[?:1.8.0_311]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) ~[?:1.8.0_311]
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) ~[?:1.8.0_311]
        at sun.nio.ch.IOUtil.read(IOUtil.java:195) ~[?:1.8.0_311]
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378) ~[?:1.8.0_311]
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:118) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:257) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.8.1.jar!/:?]
        at org.springframework.integration.kafka.inbound.KafkaMessageSource.doReceive(KafkaMessageSource.java:441) ~[spring-integration-kafka-3.3.1.RELEASE.jar!/:3.3.1.RELEASE]
        at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:184) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:407) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:376) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$3(AbstractPollingEndpoint.java:323) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.19.RELEASE.jar!/:5.2.19.RELEASE]
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:320) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_311]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_311]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_311]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_311]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_311]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_311]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_311]

有人可以帮助我解决这个问题。谢谢

Let say i have below configuration for KafkaConsumer my Kafka is using PublishSubscribeChannel with taskexecutor.

    @Bean
    @InboundChannelAdapter(channel = "someInputChannel",poller = @Poller(fixedDelay = "5000", taskExecutor = "taskexecutor"))
    public KafkaMessageSource getKafkaMessageSource() {

        KafkaMessageSource kafkaMessageSource = new KafkaMessageSource (consumerFactory, new ConsumerProperties("topic"));
        kafkaMessageSource.getConsumerProperties().setClientId("listner");
        kafkaMessageSource.setMessageConverter(messageConverter());
        kafkaMessageSource.setPayloadType(CutsomRequest.class);
        return kafkaMessageSource;
    }   

ThreadPoolTaskExecutor


   @Bean(name = "taskexecutor")
    public ThreadPoolTaskExecutor queryRequestTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(poolSize);
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        threadPoolTaskExecutor.setThreadNamePrefix("Request-");
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return threadPoolTaskExecutor;
    }

Dorequest

 @Bean
    public MessageChannel doRequest() {
        return new PublishSubscribeChannel(taskexecutor);
    }

Issue I am facing is java.lang.OutOfMemoryError: Direct buffer memory

Below is log Stacktrace for my issue:-


2022-04-08 09:25:06.859 ERROR 9 --- [scheduling-1] o.s.i.c.MessagePublishingErrorHandler    : failure occurred in messaging task

java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:695) ~[?:1.8.0_311]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[?:1.8.0_311]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) ~[?:1.8.0_311]
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) ~[?:1.8.0_311]
        at sun.nio.ch.IOUtil.read(IOUtil.java:195) ~[?:1.8.0_311]
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378) ~[?:1.8.0_311]
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:118) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:257) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[kafka-clients-2.8.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.8.1.jar!/:?]
        at org.springframework.integration.kafka.inbound.KafkaMessageSource.doReceive(KafkaMessageSource.java:441) ~[spring-integration-kafka-3.3.1.RELEASE.jar!/:3.3.1.RELEASE]
        at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:184) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:407) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:376) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$3(AbstractPollingEndpoint.java:323) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.19.RELEASE.jar!/:5.2.19.RELEASE]
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:320) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_311]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_311]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_311]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_311]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_311]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_311]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_311]

Can someone help me out with this issue. Thanks

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文