Java ThreadPoolExecutor 在使用 ArrayBlockingQueue 时卡住

发布于 2024-09-05 20:17:27 字数 8361 浏览 4 评论 0 原文

我正在开发一些应用程序并使用 ThreadPoolExecutor 来处理各种任务。 ThreadPoolExecutor 在持续一段时间后就会卡住。为了在更简单的环境中模拟这一点,我编写了一个简单的代码,可以在其中模拟该问题。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor {
    private int poolSize = 10;
    private int maxPoolSize = 50;
    private long keepAliveTime = 10;
    private ThreadPoolExecutor threadPool = null;
    private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
            100000);

    public MyThreadPoolExecutor() {
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS, queue);
        threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {

            @Override
            public void rejectedExecution(Runnable runnable,
                    ThreadPoolExecutor threadPoolExecutor) {
                System.out
                        .println("Execution rejected. Please try restarting the application.");
            }

        });
    }

    public void runTask(Runnable task) {
        threadPool.execute(task);
    }

    public void shutDown() {
        threadPool.shutdownNow();
    }
    public ThreadPoolExecutor getThreadPool() {
        return threadPool;
    }

    public void setThreadPool(ThreadPoolExecutor threadPool) {
        this.threadPool = threadPool;
    }

    public static void main(String[] args) {
        MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor();
        for (int i = 0; i < 1000; i++) {
            final int j = i;
            mtpe.runTask(new Runnable() {

                @Override
                public void run() {
                    System.out.println(j);
                }

            });
        }
    }
}

尝试执行此代码几次。它通常会在控制台上打印出数字,当所有线程结束时,它就存在。但有时,它完成了所有任务,然后没有被终止。线程转储如下:

MyThreadPoolExecutor [Java Application] 
    MyThreadPoolExecutor at localhost:2619 (Suspended)  
        Daemon System Thread [Attach Listener] (Suspended)  
        Daemon System Thread [Signal Dispatcher] (Suspended)    
        Daemon System Thread [Finalizer] (Suspended)    
            Object.wait(long) line: not available [native method]   
            ReferenceQueue<T>.remove(long) line: not available    
            ReferenceQueue<T>.remove() line: not available    
            Finalizer$FinalizerThread.run() line: not available 
        Daemon System Thread [Reference Handler] (Suspended)    
            Object.wait(long) line: not available [native method]   
            Reference$Lock(Object).wait() line: 485 
            Reference$ReferenceHandler.run() line: not available    
        Thread [pool-1-thread-1] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-2] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-3] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-4] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-6] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-8] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-5] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-10] (Suspended)   
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-9] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-7] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [DestroyJavaVM] (Suspended)  
    C:\Program Files\Java\jre1.6.0_07\bin\javaw.exe (Jun 17, 2010 10:42:33 AM)  

在我的实际应用程序中,ThreadPoolExecutor 线程进入这种状态,然后停止响应。

问候, 拉维·拉奥

I'm working on some application and using ThreadPoolExecutor for handling various tasks. ThreadPoolExecutor is getting stuck after some duration. To simulate this in a simpler environment, I've written a simple code where I'm able to simulate the issue.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor {
    private int poolSize = 10;
    private int maxPoolSize = 50;
    private long keepAliveTime = 10;
    private ThreadPoolExecutor threadPool = null;
    private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
            100000);

    public MyThreadPoolExecutor() {
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS, queue);
        threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {

            @Override
            public void rejectedExecution(Runnable runnable,
                    ThreadPoolExecutor threadPoolExecutor) {
                System.out
                        .println("Execution rejected. Please try restarting the application.");
            }

        });
    }

    public void runTask(Runnable task) {
        threadPool.execute(task);
    }

    public void shutDown() {
        threadPool.shutdownNow();
    }
    public ThreadPoolExecutor getThreadPool() {
        return threadPool;
    }

    public void setThreadPool(ThreadPoolExecutor threadPool) {
        this.threadPool = threadPool;
    }

    public static void main(String[] args) {
        MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor();
        for (int i = 0; i < 1000; i++) {
            final int j = i;
            mtpe.runTask(new Runnable() {

                @Override
                public void run() {
                    System.out.println(j);
                }

            });
        }
    }
}

Try executing this code a few times. It normally print outs the number on console and when all threads end, it exists. But at times, it finished all task and then is not getting terminated. The thread dump is as follows:

MyThreadPoolExecutor [Java Application] 
    MyThreadPoolExecutor at localhost:2619 (Suspended)  
        Daemon System Thread [Attach Listener] (Suspended)  
        Daemon System Thread [Signal Dispatcher] (Suspended)    
        Daemon System Thread [Finalizer] (Suspended)    
            Object.wait(long) line: not available [native method]   
            ReferenceQueue<T>.remove(long) line: not available    
            ReferenceQueue<T>.remove() line: not available    
            Finalizer$FinalizerThread.run() line: not available 
        Daemon System Thread [Reference Handler] (Suspended)    
            Object.wait(long) line: not available [native method]   
            Reference$Lock(Object).wait() line: 485 
            Reference$ReferenceHandler.run() line: not available    
        Thread [pool-1-thread-1] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-2] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-3] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-4] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-6] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-8] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-5] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-10] (Suspended)   
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-9] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-7] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [DestroyJavaVM] (Suspended)  
    C:\Program Files\Java\jre1.6.0_07\bin\javaw.exe (Jun 17, 2010 10:42:33 AM)  

In my actual application,ThreadPoolExecutor threads go in this state and then it stops responding.

Regards,
Ravi Rao

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

黯淡〆 2024-09-12 20:17:27

在您的 main 方法中,您永远不会调用 mtpe.shutdown()ThreadPoolExecutor 将尝试保留其corePoolSize 无限期地存活。有时,您很幸运,有多个 corePoolSize 线程处于活动状态,因此每个工作线程都会进入条件逻辑分支,允许其在指定的 10 秒超时期限后终止。但是,正如您所注意到的,有时情况并非如此,因此执行器中的每个线程都会阻塞 ArrayBlockingQueue.take() 并等待新任务。

另外,请注意, ExecutorService.shutdown()ExecutorService.shutdownNow()。如果您按照包装器实现的指示调用 ExecutorService.shutdownNow(),您有时会删除一些尚未分配执行的任务。

更新:自从我最初的回答以来,ThreadPoolExecutor实现已经改变,使得原始帖子中的程序永远不应该退出。

In your main method, you never call mtpe.shutdown(). A ThreadPoolExecutor will attempt to keep its corePoolSize alive indefinitely. Sometimes, you get lucky and you have more than corePoolSize threads alive, so every worker thread will go into a conditional logic branch that allows it to terminate after your specified timeout period of 10 seconds. However, as you have noticed, sometimes this is not the case so every thread in the executor will block on ArrayBlockingQueue.take() and wait for a new task.

Also, please note, there is a significant difference between ExecutorService.shutdown() and ExecutorService.shutdownNow(). If you call ExecutorService.shutdownNow() as your wrapper implementation indicates, you will on occasion drop some tasks which have not been assigned for execution.

Update: Since my original answer, the ThreadPoolExecutor implementation has changed such that the program in the original post should never exit.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文