Strange behavior of ExecutorService

Posted 2024-12-13 19:17:30

I have 5000 similar Callable tasks to be executed on the 8 threads of an ExecutorService created by Executors.newFixedThreadPool(8). Each task goes to a database to retrieve a lot of data to process.

Everything works fine 99% of the time, but sometimes I see very strange execution messages in the log file. When the DB is slow or stuck (don't ask why) and the 8 currently running tasks are stalled and unfinished on all 8 threads, ExecutorService starts submitting more tasks for execution, one by one!

So the log shows that at some point ExecutorService goes crazy and starts calling the call() method of more and more Callable tasks from the waiting queue, without waiting for the previous tasks to complete. More and more tasks send requests to the DB, which finally brings the DB to its knees and exhausts the Java heap.

It looks like something strange is happening inside ExecutorService or my understanding of the situation is wrong. Has anyone seen anything like that?

My brain stack has overflowed.

P.S. This is the quote from the Java API:

Executors.newFixedThreadPool(int nThreads)

Creates a thread pool that reuses a fixed number of threads operating
off a shared unbounded queue. At any point, at most nThreads threads
will be active processing tasks. If additional tasks are submitted
when all threads are active, they will wait in the queue until a
thread is available. If any thread terminates due to a failure
during execution prior to shutdown, a new one will take its place if
needed to execute subsequent tasks.

Could it actually happen that my tasks cause the threads to die, so that ExecutorService creates more threads and submits 8 new tasks to them, they die as well, and ExecutorService creates 8 more threads and submits 8 more tasks?

P.P.S. The whole operation inside call() of the Callable is surrounded with try/catch, so if any exception occurs inside my operation it will be caught and logged. None of this is happening. call() is invoked and just never returns, while the next tasks are being called one by one, never returning, never finishing and never throwing any exceptions.

I suspect my tasks cause threads in the thread pool to die. How can I reproduce that?

Comments (2)

扭转时空 2024-12-20 19:17:30

I'll take a shot at guessing as well:

  1. You submit 5000 tasks which involve fetching data from a database.
  2. Soon after, you encounter heavy lock contention on required rows/tables. Maybe external processes are acquiring exclusive locks for writing. Maybe there is a deadlock.
  3. One after another, the tasks block, waiting for a shared/read lock to be granted.
  4. It appears as if all 8 threads are suspended, waiting on I/O.
  5. Soon after, the database/DB driver notices that tasks have been waiting too long for a shared lock. It summarily hands out Lock Wait Timeout exceptions to tasks, in order.
  6. Thus, one after another, tasks fail out of the queue and waiting tasks are pushed into execution, only to fail again.
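The sequence in steps 3 to 6 can be imitated without a database. The following is only a rough sketch under stated assumptions: `PoolCapDemo` and `maxConcurrency` are made-up names, `Thread.sleep` stands in for a stalled query, and the `IllegalStateException` stands in for a lock wait timeout. Every task stalls briefly and then fails, and the code records how many tasks were ever inside call() at the same moment.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolCapDemo {

    /** Runs taskCount failing tasks and returns the highest number seen running at once. */
    static int maxConcurrency(int poolSize, int taskCount) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        ExecutorService es = Executors.newFixedThreadPool(poolSize);

        for (int n = 0; n < taskCount; n++) {
            Callable<Void> task = () -> {
                int now = running.incrementAndGet();
                maxRunning.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5); // stand-in for a query that stalls
                    // stand-in for the driver giving up on the lock
                    throw new IllegalStateException("simulated lock wait timeout");
                } finally {
                    running.decrementAndGet();
                }
            };
            es.submit(task); // the exception ends up in the (ignored) Future
        }

        es.shutdown();
        try {
            es.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("max tasks running at once: " + maxConcurrency(8, 200));
    }
}
```

The reported maximum should never exceed the pool size, even though all 200 calls to call() start and fail. In a log that only records "task started" lines, this looks like the pool racing through the queue without waiting.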

Do note that an exception in a task will not stop the ExecutorService. It will just mark that task as done and continue.

See this example:

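import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class Foo {

    static class Task implements Callable<String> {
        // Shared counter: every call gets the next number, starting at 2.
        private static final AtomicInteger i = new AtomicInteger(1);

        public String call() throws Exception {
            // Use the returned value; a separate get() could see another thread's increment.
            int n = i.incrementAndGet();
            if (n % 2 != 0) {
                throw new RuntimeException("That's odd, I failed.");
            }
            return "I'm done";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(2);
        List<Future<String>> futures = new ArrayList<Future<String>>();
        for (int i = 0; i < 5; i++) {
            futures.add(es.submit(new Task()));
        }
        for (Future<String> future : futures) {
            try {
                System.out.println(future.get());
            } catch (ExecutionException ee) {
                // The task's exception is wrapped; the pool itself keeps running.
                System.err.println(ee.getCause());
            }
        }
        es.shutdown();
    }
}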

Possible output:

I'm done
I'm done
I'm done
java.lang.RuntimeException: That's odd, I failed.
java.lang.RuntimeException: That's odd, I failed.
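
On the question of whether a failing task can kill a pool thread: with submit(), the exception is stored in the returned Future and the worker thread survives. With execute(), an uncaught exception does escape, the worker thread terminates, and the pool starts a replacement, as the quoted javadoc describes. A minimal sketch of one way to observe this, assuming a made-up `ThreadDeathDemo` class with a counting ThreadFactory (neither is from the original post):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadDeathDemo {

    /** Runs 6 always-failing tasks on a pool of 2 and returns how many threads were created. */
    static int threadsCreated(boolean useSubmit) {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory countingFactory = r -> {
            created.incrementAndGet();
            Thread t = new Thread(r);
            // Keep the console quiet: ignore uncaught exceptions instead of printing them.
            t.setUncaughtExceptionHandler((thread, error) -> { });
            return t;
        };
        ExecutorService es = Executors.newFixedThreadPool(2, countingFactory);
        Runnable failing = () -> { throw new IllegalStateException("simulated failure"); };

        for (int n = 0; n < 6; n++) {
            if (useSubmit) {
                es.submit(failing);  // exception is captured in the returned Future
            } else {
                es.execute(failing); // exception escapes and terminates the worker thread
            }
        }

        es.shutdown();
        try {
            es.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println("threads created with submit():  " + threadsCreated(true));
        System.out.println("threads created with execute(): " + threadsCreated(false));
    }
}
```

With submit() the pool should never create more than its 2 threads. With execute() it creates replacements for the workers that died, so the count goes above 2. Even then the replacements only fill vacated slots, so the number of tasks running at once stays within the pool size.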
冬天旳寂寞 2024-12-20 19:17:30

This is just a guess (I think a guess is merited, given the lack of code in the question):

ExecutorService.invokeAll(Collection<? extends Callable<T>> tasks) will move on to other tasks if the current tasks throw an exception. (Are you using invokeAll()? I think submit(Callable<T> task) has the same behaviour, but it isn't clear from the javadoc.)

Can you check whether those 'stuck' tasks report Future.isDone() before the subsequent tasks start running? Potentially exceptions are being thrown and not seen in the logs...
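
Such a check could look roughly like this. It is only a sketch: `IsDoneDemo` and `describe` are made-up names, not JDK API, and the two tasks merely simulate a failed and a stalled database call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class IsDoneDemo {

    /** Describes how a task ended, without blocking on tasks that are still running. */
    static String describe(Future<?> f) {
        if (!f.isDone()) {
            return "still running";
        }
        if (f.isCancelled()) {
            return "cancelled";
        }
        try {
            f.get(); // returns immediately because isDone() was true
            return "completed normally";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted while inspecting";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);

        Callable<String> failing = () -> { throw new IllegalStateException("simulated DB error"); };
        Callable<String> stalled = () -> { release.await(); return "late"; };

        Future<String> failed = es.submit(failing);
        Future<String> stuck = es.submit(stalled);

        try {
            failed.get(); // wait until the failing task has actually finished
        } catch (ExecutionException expected) {
            // ignored here; describe() reports it below
        }

        System.out.println("failing task: " + describe(failed));
        System.out.println("stalled task: " + describe(stuck));

        release.countDown(); // let the stalled task finish so the pool can shut down
        es.shutdown();
    }
}
```

A task that died with an exception reports isDone() == true, while a task that is genuinely blocked does not. Logging this for the 8 "stuck" futures at the moment the ninth task starts would distinguish the two cases.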

From the javadoc:

Note that a completed task could have terminated either normally or by
throwing an exception.

http://download.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#invokeAll(java.util.Collection%29

If this is the case, you could just catch & log all exceptions inside your Callable.call() method definition.
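
One way to do that without touching each task is a small wrapper. This is a sketch only: `LoggingCallable` is a hypothetical class, and java.util.logging is assumed simply because it ships with the JDK. It catches Throwable rather than Exception, because an Error such as OutOfMemoryError passes straight through a `catch (Exception e)` block, and the question does mention heap exhaustion.

```java
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class LoggingCallable<T> implements Callable<T> {
    private static final Logger LOG = Logger.getLogger(LoggingCallable.class.getName());

    private final String name;
    private final Callable<T> delegate;

    public LoggingCallable(String name, Callable<T> delegate) {
        this.name = name;
        this.delegate = delegate;
    }

    @Override
    public T call() throws Exception {
        String thread = Thread.currentThread().getName();
        LOG.info(name + " started on " + thread);
        try {
            T result = delegate.call();
            LOG.info(name + " finished on " + thread);
            return result;
        } catch (Throwable t) {
            // Throwable also covers Errors, which catch (Exception e) would miss.
            LOG.log(Level.SEVERE, name + " failed on " + thread, t);
            throw t; // rethrow so the Future still reports the failure
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new LoggingCallable<String>("demo", () -> "done").call());
    }
}
```

Usage would be along the lines of `es.submit(new LoggingCallable<String>("task-" + id, realTask))`, where `es`, `id` and `realTask` are whatever the application already has. Every task then leaves a start line and either a finish or a failure line, tagged with the pool thread's name.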

HTH
