如何让ThreadPoolExecutor的submit()方法在饱和时阻塞?

发布于 2024-08-17 03:20:55 字数 185 浏览 7 评论 0原文

我想创建一个 ThreadPoolExecutor,这样当它达到最大大小并且队列已满时,尝试时 submit() 方法会阻塞添加新任务。我是否需要为此实现自定义 RejectedExecutionHandler 或者是否有现有方法可以使用标准 Java 库来实现此目的?

I want to create a ThreadPoolExecutor such that when it has reached its maximum size and the queue is full, the submit() method blocks when trying to add new tasks. Do I need to implement a custom RejectedExecutionHandler for that or is there an existing way to do this using a standard Java library?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(17

暖树树初阳… 2024-08-24 03:20:56

我过去也有同样的需求:一种由共享线程池支持的每个客户端具有固定大小的阻塞队列。我最终编写了自己的 ThreadPoolExecutor:

UserThreadPoolExecutor
(阻塞队列(每个客户端)+线程池(在所有客户端之间共享))

请参阅:https://github.com/d4rxh4wx /UserThreadPoolExecutor

每个 UserThreadPoolExecutor 都被赋予共享 ThreadPoolExecutor 的最大线程数

。每个 UserThreadPoolExecutor 可以:

  • 如果未达到配额,则向共享线程池执行器提交任务。如果达到其配额,则作业将排队(非消耗性阻塞等待 CPU)。一旦其中一个提交的任务完成,配额就会递减,从而允许另一个等待提交到 ThreadPoolExecutor 的任务
  • 等待剩余任务完成

I had the same need in the past: a kind of blocking queue with a fixed size for each client backed by a shared thread pool. I ended up writing my own kind of ThreadPoolExecutor:

UserThreadPoolExecutor
(blocking queue (per client) + threadpool (shared amongst all clients))

See: https://github.com/d4rxh4wx/UserThreadPoolExecutor

Each UserThreadPoolExecutor is given a maximum number of threads from a shared ThreadPoolExecutor

Each UserThreadPoolExecutor can:

  • submit a task to the shared thread pool executor if its quota is not reached. If its quota is reached, the job is queued (non-consumptive blocking waiting for CPU). Once one of its submitted task is completed, the quota is decremented, allowing another task waiting to be submitted to the ThreadPoolExecutor
  • wait for the remaining tasks to complete
大姐,你呐 2024-08-24 03:20:56

我最近需要实现类似的东西,但是是在 ScheduledExecutorService 上。

我还必须确保处理在方法上传递的延迟,并确保任务要么按照调用者期望的时间提交执行,要么失败,从而引发 RejectedExecutionException

ScheduledThreadPoolExecutor 中用于执行或提交任务的其他方法在内部调用 #schedule,后者仍会依次调用被重写的方法。

import java.util.concurrent.*;

public class BlockingScheduler extends ScheduledThreadPoolExecutor {
    private final Semaphore maxQueueSize;

    public BlockingScheduler(int corePoolSize,
                             ThreadFactory threadFactory,
                             int maxQueueSize) {
        super(corePoolSize, threadFactory, new AbortPolicy());
        this.maxQueueSize = new Semaphore(maxQueueSize);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay));
        return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay));
        return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long period,
                                                     TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable t) {
        super.afterExecute(runnable, t);
        try {
            if (t == null && runnable instanceof Future<?>) {
                try {
                    ((Future<?>) runnable).get();
                } catch (CancellationException | ExecutionException e) {
                    t = e;
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null) {
                System.err.println(t);
            }
        } finally {
            releaseQueueUsage();
        }
    }

    private long beforeSchedule(Runnable runnable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(runnable, this);
            return 0;
        }
    }

    private long beforeSchedule(Callable callable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this);
            return 0;
        }
    }

    private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException {
        final long beforeAcquireTimeStamp = System.currentTimeMillis();
        maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS);
        final long afterAcquireTimeStamp = System.currentTimeMillis();
        return afterAcquireTimeStamp - beforeAcquireTimeStamp;
    }

    private void releaseQueueUsage() {
        maxQueueSize.release();
    }
}

我这里有代码,将不胜感激任何反馈。
https://github.com/AmitabhAwasthi/BlockingScheduler

I recently had a need to achieve something similar, but on a ScheduledExecutorService.

I had to also ensure that I handle the delay being passed on the method and ensure that either the task is submitted to execute at the time as the caller expects or just fails thus throwing a RejectedExecutionException.

Other methods from ScheduledThreadPoolExecutor to execute or submit a task internally call #schedule which will still in turn invoke the methods overridden.

import java.util.concurrent.*;

public class BlockingScheduler extends ScheduledThreadPoolExecutor {
    private final Semaphore maxQueueSize;

    public BlockingScheduler(int corePoolSize,
                             ThreadFactory threadFactory,
                             int maxQueueSize) {
        super(corePoolSize, threadFactory, new AbortPolicy());
        this.maxQueueSize = new Semaphore(maxQueueSize);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay));
        return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay));
        return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long period,
                                                     TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable t) {
        super.afterExecute(runnable, t);
        try {
            if (t == null && runnable instanceof Future<?>) {
                try {
                    ((Future<?>) runnable).get();
                } catch (CancellationException | ExecutionException e) {
                    t = e;
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null) {
                System.err.println(t);
            }
        } finally {
            releaseQueueUsage();
        }
    }

    private long beforeSchedule(Runnable runnable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(runnable, this);
            return 0;
        }
    }

    private long beforeSchedule(Callable callable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this);
            return 0;
        }
    }

    private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException {
        final long beforeAcquireTimeStamp = System.currentTimeMillis();
        maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS);
        final long afterAcquireTimeStamp = System.currentTimeMillis();
        return afterAcquireTimeStamp - beforeAcquireTimeStamp;
    }

    private void releaseQueueUsage() {
        maxQueueSize.release();
    }
}

I have the code here, will appreciate any feedback.
https://github.com/AmitabhAwasthi/BlockingScheduler

踏月而来 2024-08-24 03:20:56

我并不总是喜欢 CallerRunsPolicy,特别是因为它允许被拒绝的任务“跳过队列”并在之前提交的任务之前执行。此外,在调用线程上执行任务可能比等待第一个槽变得可用花费更长的时间。

我使用自定义的 RejectedExecutionHandler 解决了这个问题,它只是阻塞调用线程一段时间,然后尝试再次提交任务:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

这个类可以像任何其他类一样在线程池执行器中用作 RejectedExecutinHandler ,例如:

executorPool = new ThreadPoolExecutor(1, 1, 10,
                                      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                                      new BlockWhenQueueFull());

我看到的唯一缺点是调用线程的锁定时间可能比严格必要的时间稍长(最多 250 毫秒)。此外,由于该执行程序实际上是递归调用的,因此等待线程变得可用(数小时)很长时间可能会导致堆栈溢出。

尽管如此,我个人还是喜欢这种方法。它结构紧凑、易于理解并且运行良好。

I don't always like the CallerRunsPolicy, especially since it allows the rejected task to 'skip the queue' and get executed before tasks that were submitted earlier. Moreover, executing the task on the calling thread might take much longer than waiting for the first slot to become available.

I solved this problem using a custom RejectedExecutionHandler, which simply blocks the calling thread for a little while and then tries to submit the task again:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

This class can just be used in the thread-pool executor as a RejectedExecutinHandler like any other, for example:

executorPool = new ThreadPoolExecutor(1, 1, 10,
                                      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                                      new BlockWhenQueueFull());

The only downside I see is that the calling thread might get locked slightly longer than strictly necessary (up to 250ms). Moreover, since this executor is effectively being called recursively, very long waits for a thread to become available (hours) might result in a stack overflow.

Nevertheless, I personally like this method. It's compact, easy to understand, and works well.

半夏半凉 2024-08-24 03:20:56
  1. CallerRunsPolicy 在这里不是最佳选择。它将阻塞调用者线程,直到完成被拒绝的任务。

来自文档:

运行被拒绝任务的被拒绝任务的处理程序
直接在{@codeexecute}方法的调用线程中,
除非执行器已关闭,在这种情况下任务
被丢弃。

代码示例:

var executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, blockingQueue, new CallerRunsPolicy());
  1. 使用 CallerBlocksPolicy 来自 弹簧集成。它将阻塞调用者线程,直到队列准备好接收新任务。

来自文档:

一个{@link RejectedExecutionHandler},它会阻止调用者直到
执行器的队列中有空间,或者发生超时(其中
如果抛出 {@link RejectedExecutionException}。

代码示例:

var executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, blockingQueue, new CallerBlocksPolicy(99999));
  1. CallerRunsPolicy is not the best choice here. It will block caller thread until completion of the rejected task.

From docs:

A handler for rejected tasks that runs the rejected task
directly in the calling thread of the {@code execute} method,
unless the executor has been shut down, in which case the task
is discarded.

Code Example:

var executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, blockingQueue, new CallerRunsPolicy());
  1. Use CallerBlocksPolicy from spring-integration. It will block caller thread until queue is ready to receive new task.

From docs:

A {@link RejectedExecutionHandler} that blocks the caller until
the executor has room in its queue, or a timeout occurs (in which
case a {@link RejectedExecutionException} is thrown.

Code Example:

var executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, blockingQueue, new CallerBlocksPolicy(99999));
秋风の叶未落 2024-08-24 03:20:56

我在弹性搜索客户端中发现了这个拒绝策略。它阻塞阻塞队列上的调用者线程。代码如下-

 static class ForceQueuePolicy implements XRejectedExecutionHandler 
 {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
        {
            try 
            {
                executor.getQueue().put(r);
            } 
            catch (InterruptedException e) 
            {
                //should never happen since we never wait
                throw new EsRejectedExecutionException(e);
            }
        }

        @Override
        public long rejected() 
        {
            return 0;
        }
}

I found this rejection policy in elastic search client. It blocks caller thread on blocking queue. Code below-

 static class ForceQueuePolicy implements XRejectedExecutionHandler 
 {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
        {
            try 
            {
                executor.getQueue().put(r);
            } 
            catch (InterruptedException e) 
            {
                //should never happen since we never wait
                throw new EsRejectedExecutionException(e);
            }
        }

        @Override
        public long rejected() 
        {
            return 0;
        }
}
甜妞爱困 2024-08-24 03:20:55

我刚刚找到的可能解决方案之一:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

还有其他解决方案吗?我更喜欢基于 RejectedExecutionHandler 的东西,因为它似乎是处理此类情况的标准方法。

One of the possible solutions I've just found:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

Are there any other solutions? I'd prefer something based on RejectedExecutionHandler since it seems like a standard way to handle such situations.

去了角落 2024-08-24 03:20:55

您可以使用 ThreadPoolExecutor 和阻塞队列:

public class ImageManager {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService =  new ThreadPoolExecutor(numOfThread, numOfThread, 
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private int downloadThumbnail(String fileListPath){
        executorService.submit(new yourRunnable());
    }
}

You can use ThreadPoolExecutor and a blockingQueue:

public class ImageManager {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService =  new ThreadPoolExecutor(numOfThread, numOfThread, 
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private int downloadThumbnail(String fileListPath){
        executorService.submit(new yourRunnable());
    }
}
马蹄踏│碎落叶 2024-08-24 03:20:55

您应该使用 CallerRunsPolicy,它在调用线程中执行被拒绝的任务。这样,在该任务完成之前,它无法向执行程序提交任何新任务,此时将有一些空闲池线程或者该过程将重复。

http://java.sun .com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

来自文档:

拒绝的任务

在方法execute(java.lang.Runnable)中提交的新任务将是
当执行者被拒绝时
关闭,并且当执行者
对两个最大值使用有限界限
线程和工作队列容量,以及
已饱和。无论哪种情况,
执行方法调用
RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,
java.util.concurrent.ThreadPoolExecutor)
其方法
RejectedExecutionHandler。四
预定义的处理程序策略是
提供:

  1. 在默认的 ThreadPoolExecutor.AbortPolicy 中,
    处理程序抛出运行时
    拒绝执行异常
    拒绝。
  2. 在 ThreadPoolExecutor.CallerRunsPolicy 中,
    调用执行本身的线程
    运行任务。这提供了一个简单的
    反馈控制机制将
    减慢新任务的速度
    已提交。
  3. 在 ThreadPoolExecutor.DiscardPolicy 中,
    无法执行的任务很简单
    掉落。
  4. 在 ThreadPoolExecutor.DiscardOldestPolicy 中,
    如果执行器没有关闭,
    工作队列头部的任务是
    删除,然后重试执行
    (这可能会再次失败,导致
    重复。)

此外,请确保在调用 ThreadPoolExecutor 构造函数时使用有界队列,例如 ArrayBlockingQueue。否则,什么都不会被拒绝。

编辑:响应您的评论,将 ArrayBlockingQueue 的大小设置为等于线程池的最大大小并使用 AbortPolicy。

编辑2:好的,我明白你的意思了。怎么样:重写 beforeExecute() 方法来检查 getActiveCount() 是否超过 getMaximumPoolSize(),如果超过,睡觉再试一次?

You should use the CallerRunsPolicy, which executes the rejected task in the calling thread. This way, it can't submit any new tasks to the executor until that task is done, at which point there will be some free pool threads or the process will repeat.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

From the docs:

Rejected tasks

New tasks submitted in method execute(java.lang.Runnable) will be
rejected when the Executor has been
shut down, and also when the Executor
uses finite bounds for both maximum
threads and work queue capacity, and
is saturated. In either case, the
execute method invokes the
RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,
java.util.concurrent.ThreadPoolExecutor)
method of its
RejectedExecutionHandler. Four
predefined handler policies are
provided:

  1. In the default ThreadPoolExecutor.AbortPolicy, the
    handler throws a runtime
    RejectedExecutionException upon
    rejection.
  2. In ThreadPoolExecutor.CallerRunsPolicy,
    the thread that invokes execute itself
    runs the task. This provides a simple
    feedback control mechanism that will
    slow down the rate that new tasks are
    submitted.
  3. In ThreadPoolExecutor.DiscardPolicy, a
    task that cannot be executed is simply
    dropped.
  4. In ThreadPoolExecutor.DiscardOldestPolicy,
    if the executor is not shut down, the
    task at the head of the work queue is
    dropped, and then execution is retried
    (which can fail again, causing this to
    be repeated.)

Also, make sure to use a bounded queue, such as ArrayBlockingQueue, when calling the ThreadPoolExecutor constructor. Otherwise, nothing will get rejected.

Edit: in response to your comment, set the size of the ArrayBlockingQueue to be equal to the max size of the thread pool and use the AbortPolicy.

Edit 2: Ok, I see what you're getting at. What about this: override the beforeExecute() method to check that getActiveCount() doesn't exceed getMaximumPoolSize(), and if it does, sleep and try again?

橪书 2024-08-24 03:20:55

我知道,这是一个 hack,但在我看来,这里提供的最干净的 hack ;-)

因为 ThreadPoolExecutor 使用阻塞队列“offer”而不是“put”,所以让我们覆盖阻塞队列“offer”的行为:

class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    BlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            this.put(task);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return true;
    }
}

ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));

我测试过它似乎有效。
实施一些超时策略留给读者练习。

I know, it is a hack, but in my opinion most clean hack between those offered here ;-)

Because ThreadPoolExecutor uses blocking queue "offer" instead of "put", lets override behaviour of "offer" of the blocking queue:

class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    BlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            this.put(task);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return true;
    }
}

ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));

I tested it and it seems to work.
Implementing some timeout policy is left as a reader's exercise.

姜生凉生 2024-08-24 03:20:55

以下类包装 ThreadPoolExecutor 并使用信号量在工作队列已满时进行阻塞:

public final class BlockingExecutor { 

    private final Executor executor;
    private final Semaphore semaphore;

    public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
        this.semaphore = new Semaphore(queueSize + maxPoolSize);
    }

    private void execImpl (final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // will never be thrown with an unbounded buffer (LinkedBlockingQueue)
            semaphore.release();
            throw e;
        }
    }

    public void execute (Runnable command) throws InterruptedException {
        execImpl(command);
    }
}

该包装类基于 Brian Goetz 的《Java Concurrency in Practice》一书中给出的解决方案。书中的解决方案仅采用两个构造函数参数:一个 Executor 和一个用于信号量的边界。 Fixpoint 给出的答案显示了这一点。这种方法有一个问题:它可能会进入池线程繁忙、队列已满的状态,但信号量刚刚释放了许可证。 (semaphore.release() 在finally 块中)。在这种状态下,新的任务可以抢到刚刚释放的许可,但由于任务队列已满而被拒绝。当然这不是你想要的;在这种情况下你想阻止。

为了解决这个问题,我们必须使用无界队列,正如 JCiP 明确提到的那样。信号量充当守卫,提供虚拟队列大小的效果。这具有副作用,即该单元可能包含 maxPoolSize + virtualQueueSize + maxPoolSize 任务。这是为什么?因为
finally 块中的semaphore.release()。如果所有池线程同时调用此语句,则释放 maxPoolSize 许可证,允许相同数量的任务进入该单元。如果我们使用有界队列,它仍然会满,导致任务被拒绝。现在,因为我们知道这只在池线程即将完成时才会发生,所以这不是问题。我们知道池线程不会阻塞,所以很快就会从队列中取出一个任务。

不过,您可以使用有界队列。只需确保其大小等于 virtualQueueSize + maxPoolSize 即可。较大的尺寸是无用的,信号量将阻止更多的项目进入。较小的尺寸将导致任务被拒绝。随着任务规模的减小,任务被拒绝的可能性也会增加。例如,假设您想要一个 maxPoolSize=2 且 virtualQueueSize=5 的有界执行程序。然后取一个具有 5+2=7 个许可的信号量,实际队列大小为 5+2=7。那么该单元中可以容纳的实际任务数量为2+5+2=9。当执行器已满时(队列中有 5 个任务,线程池中有 2 个任务,因此有 0 个可用许可)并且所有池线程都释放其许可,那么进来的任务正好可以获取 2 个许可。

现在 JCiP 的解决方案有点麻烦使用,因为它不强制执行所有这些约束(无界队列,或受这些数学限制等限制)。我认为这只是一个很好的例子来演示如何基于已经可用的部分构建新的线程安全类,而不是作为一个成熟的、可重用的类。我不认为后者是作者的本意。

The following class wraps around a ThreadPoolExecutor and uses a Semaphore to block then the work queue is full:

public final class BlockingExecutor { 

    private final Executor executor;
    private final Semaphore semaphore;

    public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
        this.semaphore = new Semaphore(queueSize + maxPoolSize);
    }

    private void execImpl (final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // will never be thrown with an unbounded buffer (LinkedBlockingQueue)
            semaphore.release();
            throw e;
        }
    }

    public void execute (Runnable command) throws InterruptedException {
        execImpl(command);
    }
}

This wrapper class is based on a solution given in the book Java Concurrency in Practice by Brian Goetz. The solution in the book only takes two constructor parameters: an Executor and a bound used for the semaphore. This is shown in the answer given by Fixpoint. There is a problem with that approach: it can get in a state where the pool threads are busy, the queue is full, but the semaphore has just released a permit. (semaphore.release() in the finally block). In this state, a new task can grab the just released permit, but is rejected because the task queue is full. Of course this is not something you want; you want to block in this case.

To solve this, we must use an unbounded queue, as JCiP clearly mentions. The semaphore acts as a guard, giving the effect of a virtual queue size. This has the side effect that it is possible that the unit can contain maxPoolSize + virtualQueueSize + maxPoolSize tasks. Why is that? Because of the
semaphore.release() in the finally block. If all pool threads call this statement at the same time, then maxPoolSize permits are released, allowing the same number of tasks to enter the unit. If we were using a bounded queue, it would still be full, resulting in a rejected task. Now, because we know that this only occurs when a pool thread is almost done, this is not a problem. We know that the pool thread will not block, so a task will soon be taken from the queue.

You are able to use a bounded queue though. Just make sure that its size equals virtualQueueSize + maxPoolSize. Greater sizes are useless, the semaphore will prevent to let more items in. Smaller sizes will result in rejected tasks. The chance of tasks getting rejected increases as the size decreases. For example, say you want a bounded executor with maxPoolSize=2 and virtualQueueSize=5. Then take a semaphore with 5+2=7 permits and an actual queue size of 5+2=7. The real number of tasks that can be in the unit is then 2+5+2=9. When the executor is full (5 tasks in queue, 2 in thread pool, so 0 permits available) and ALL pool threads release their permits, then exactly 2 permits can be taken by tasks coming in.

Now the solution from JCiP is somewhat cumbersome to use as it doesn't enforce all these constraints (unbounded queue, or bounded with those math restrictions, etc.). I think that this only serves as a good example to demonstrate how you can build new thread safe classes based on the parts that are already available, but not as a full-grown, reusable class. I don't think that the latter was the author's intention.

情话难免假 2024-08-24 03:20:55

Hibernate 有一个简单的 BlockPolicy ,可以执行您想要的操作:

请参阅:Executors.java

/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}

Hibernate has a BlockPolicy that is simple and may do what you want:

See: Executors.java

/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}
渡你暖光 2024-08-24 03:20:55

上面引用的 Java 并发实践 中的 BoundedExecutor 答案只有在您为执行器使用无界队列,或者信号量边界不大于队列大小时才能正常工作。信号量是在提交线程和池中的线程之间共享状态的,因此即使队列大小<0,也可以使执行器饱和。边界 <=(队列大小 + 池大小)。

仅当您的任务不会永远运行时,使用 CallerRunsPolicy 才有效,在这种情况下,您的提交线程将永远保留在rejectedExecution 中,如果您的任务需要很长时间,那么这是一个坏主意运行时间,因为如果提交线程本身正在运行任务,则它无法提交任何新任务或执行任何其他操作。

如果这是不可接受的,那么我建议在提交任务之前检查执行程序的有界队列的大小。如果队列已满,则稍等片刻再尝试再次提交。吞吐量会受到影响,但我建议这是一个比许多其他建议的解决方案更简单的解决方案,并且可以保证任何任务都不会被拒绝。

The BoundedExecutor answer quoted above from Java Concurrency in Practice only works correctly if you use an unbounded queue for the Executor, or the semaphore bound is no greater than the queue size. The semaphore is state shared between the submitting thread and the threads in the pool, making it possible to saturate the executor even if queue size < bound <= (queue size + pool size).

Using CallerRunsPolicy is only valid if your tasks don't run forever, in which case your submitting thread will remain in rejectedExecution forever, and a bad idea if your tasks take a long time to run, because the submitting thread can't submit any new tasks or do anything else if it's running a task itself.

If that's not acceptable then I suggest checking the size of the executor's bounded queue before submitting a task. If the queue is full, then wait a short time before trying to submit again. The throughput will suffer, but I suggest it's a simpler solution than many of the other proposed solutions and you're guaranteed no tasks will get rejected.

半世晨晓 2024-08-24 03:20:55

您可以像这样使用自定义 RejectedExecutionHandler

ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
                max_handlers, // max size 
                timeout_in_seconds, // idle timeout 
                TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // This will block if the queue is full
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            System.err.println(e.getMessage());
                        }

                    }
                });

you can use a custom RejectedExecutionHandler like this

ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
                max_handlers, // max size 
                timeout_in_seconds, // idle timeout 
                TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // This will block if the queue is full
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            System.err.println(e.getMessage());
                        }

                    }
                });
风吹雨成花 2024-08-24 03:20:55

创建您自己的阻塞队列以供执行器使用,具有您正在寻找的阻塞行为,同时始终返回可用的剩余容量(确保执行器不会尝试创建比其核心池更多的线程,或触发拒绝处理程序)。

我相信这会给你带来你正在寻找的阻止行为。拒绝处理程序永远不会符合要求,因为这表明执行程序无法执行该任务。我可以想象的是,你会在处理程序中得到某种形式的“忙等待”。这不是你想要的,你想要一个阻塞调用者的执行器队列......

Create your own blocking queue to be used by the Executor, with the blocking behavior you are looking for, while always returning available remaining capacity (ensuring the executor will not try to create more threads than its core pool, or trigger the rejection handler).

I believe this will get you the blocking behavior you are looking for. A rejection handler will never fit the bill, since that indicates the executor can not perform the task. What I could envision there is that you get some form of 'busy waiting' in the handler. That is not what you want, you want a queue for the executor that blocks the caller...

沒落の蓅哖 2024-08-24 03:20:55

避免 @FixPoint 解决方案出现问题。可以使用 ListeningExecutorService 并在 FutureCallback 内释放信号量 onSuccess 和 onFailure。

To avoid issues with @FixPoint solution. One could use ListeningExecutorService and release the semaphore onSuccess and onFailure inside FutureCallback.

帥小哥 2024-08-24 03:20:55

最近我发现这个问题也有同样的问题。 OP 没有明确说明,但我们不想使用在提交者线程上执行任务的 RejectedExecutionHandler,因为如果该任务长时间运行,这将导致工作线程利用率不足。

阅读所有答案和评论,特别是使用信号量或使用 afterExecute 有缺陷的解决方案,我仔细查看了 ThreadPoolExecutor 看看是否有出路。我很惊讶地看到有2000多行(带注释的)代码,其中一些让我感觉头晕。考虑到我实际上有一个相当简单的要求——一个生产者,几个消费者,当没有消费者可以工作时让生产者阻塞——我决定推出自己的解决方案。它不是一个 ExecutorService,而只是一个 Executor。而且它不会根据工作负载调整线程数量,而是仅保留固定数量的线程,这也符合我的要求。这是代码。随意吐槽一下:-)

package x;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;

/**
 * distributes {@code Runnable}s to a fixed number of threads. To keep the
 * code lean, this is not an {@code ExecutorService}. In particular there is
 * only very simple support to shut this executor down.
 */
public class ParallelExecutor implements Executor {
  // other bounded queues work as well and are useful to buffer peak loads
  private final BlockingQueue<Runnable> workQueue =
      new SynchronousQueue<Runnable>();
  private final Thread[] threads;

  /*+**********************************************************************/
  /**
   * creates the requested number of threads and starts them to wait for
   * incoming work
   */
  public ParallelExecutor(int numThreads) {
    this.threads = new Thread[numThreads];
    for(int i=0; i<numThreads; i++) {
      // could reuse the same Runner all over, but keep it simple
      Thread t = new Thread(new Runner());
      this.threads[i] = t;
      t.start();
    }
  }
  /*+**********************************************************************/
  /**
   * returns immediately without waiting for the task to be finished, but may
   * block if all worker threads are busy.
   * 
   * @throws RejectedExecutionException if we got interrupted while waiting
   *         for a free worker
   */
  @Override
  public void execute(Runnable task)  {
    try {
      workQueue.put(task);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("interrupt while waiting for a free "
          + "worker.", e);
    }
  }
  /*+**********************************************************************/
  /**
   * Interrupts all workers and joins them. Tasks susceptible to an interrupt
   * will preempt their work. Blocks until the last thread surrendered.
   */
  public void interruptAndJoinAll() throws InterruptedException {
    for(Thread t : threads) {
      t.interrupt();
    }
    for(Thread t : threads) {
      t.join();
    }
  }
  /*+**********************************************************************/
  private final class Runner implements Runnable {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        Runnable task;
        try {
          task = workQueue.take();
        } catch (InterruptedException e) {
          // canonical handling despite exiting right away
          Thread.currentThread().interrupt(); 
          return;
        }
        try {
          task.run();
        } catch (RuntimeException e) {
          // production code to use a logging framework
          e.printStackTrace();
        }
      }
    }
  }
}

Recently I found this question having the same problem. The OP does not say so explicitly, but we do not want to use the RejectedExecutionHandler which executes a task on the submitter's thread, because this will under-utilize the worker threads if this task is a long running one.

Reading all the answers and comments, in particular the flawed solution with the semaphore or using afterExecute I had a closer look at the code of the ThreadPoolExecutor to see if there is some way out. I was amazed to see that there are more than 2000 lines of (commented) code, some of which make me feel dizzy. Given the rather simple requirement I actually have --- one producer, several consumers, let the producer block when no consumers can take work --- I decided to roll my own solution. It is not an ExecutorService but just an Executor. And it does not adapt the number of threads to the work load, but holds a fixed number of threads only, which also fits my requirements. Here is the code. Feel free to rant about it :-)

package x;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;

/**
 * distributes {@code Runnable}s to a fixed number of threads. To keep the
 * code lean, this is not an {@code ExecutorService}. In particular there is
 * only very simple support to shut this executor down.
 */
public class ParallelExecutor implements Executor {
  // other bounded queues work as well and are useful to buffer peak loads
  private final BlockingQueue<Runnable> workQueue =
      new SynchronousQueue<Runnable>();
  private final Thread[] threads;

  /*+**********************************************************************/
  /**
   * creates the requested number of threads and starts them to wait for
   * incoming work
   */
  public ParallelExecutor(int numThreads) {
    this.threads = new Thread[numThreads];
    for(int i=0; i<numThreads; i++) {
      // could reuse the same Runner all over, but keep it simple
      Thread t = new Thread(new Runner());
      this.threads[i] = t;
      t.start();
    }
  }
  /*+**********************************************************************/
  /**
   * returns immediately without waiting for the task to be finished, but may
   * block if all worker threads are busy.
   * 
   * @throws RejectedExecutionException if we got interrupted while waiting
   *         for a free worker
   */
  @Override
  public void execute(Runnable task)  {
    try {
      workQueue.put(task);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("interrupt while waiting for a free "
          + "worker.", e);
    }
  }
  /*+**********************************************************************/
  /**
   * Interrupts all workers and joins them. Tasks susceptible to an interrupt
   * will preempt their work. Blocks until the last thread surrendered.
   */
  public void interruptAndJoinAll() throws InterruptedException {
    for(Thread t : threads) {
      t.interrupt();
    }
    for(Thread t : threads) {
      t.join();
    }
  }
  /*+**********************************************************************/
  private final class Runner implements Runnable {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        Runnable task;
        try {
          task = workQueue.take();
        } catch (InterruptedException e) {
          // canonical handling despite exiting right away
          Thread.currentThread().interrupt(); 
          return;
        }
        try {
          task.run();
        } catch (RuntimeException e) {
          // production code to use a logging framework
          e.printStackTrace();
        }
      }
    }
  }
}
冷清清 2024-08-24 03:20:55

我相信有一种非常优雅的方法可以通过使用 java.util.concurrent.Semaphore 和 Executor.newFixedThreadPool 的委托行为来解决这个问题。
新的执行器服务只会在有线程执行新任务时才会执行。阻塞由信号量管理,许可数量等于线程数量。当任务完成时,它会返回一个许可证。

public class FixedThreadBlockingExecutorService extends AbstractExecutorService {

private final ExecutorService executor;
private final Semaphore blockExecution;

public FixedThreadBlockingExecutorService(int nTreads) {
    this.executor = Executors.newFixedThreadPool(nTreads);
    blockExecution = new Semaphore(nTreads);
}

@Override
public void shutdown() {
    executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
    return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
    return executor.isShutdown();
}

@Override
public boolean isTerminated() {
    return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
    blockExecution.acquireUninterruptibly();
    executor.execute(() -> {
        try {
            command.run();
        } finally {
            blockExecution.release();
        }
    });
}

I believe there is quite elegant way to solve this problem by using java.util.concurrent.Semaphore and delegating behavior of Executor.newFixedThreadPool.
The new executor service will only execute new task when there is a thread to do so. Blocking is managed by Semaphore with number of permits equal to number of threads. When a task is finished it returns a permit.

public class FixedThreadBlockingExecutorService extends AbstractExecutorService {

private final ExecutorService executor;
private final Semaphore blockExecution;

public FixedThreadBlockingExecutorService(int nTreads) {
    this.executor = Executors.newFixedThreadPool(nTreads);
    blockExecution = new Semaphore(nTreads);
}

@Override
public void shutdown() {
    executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
    return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
    return executor.isShutdown();
}

@Override
public boolean isTerminated() {
    return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
    blockExecution.acquireUninterruptibly();
    executor.execute(() -> {
        try {
            command.run();
        } finally {
            blockExecution.release();
        }
    });
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文