使用 ScheduledThreadPoolExecutor 建立有界队列的最佳方法是什么?

发布于 2024-12-04 06:17:49 字数 399 浏览 1 评论 0原文

Sun Java (1.6) ScheduledThreadPoolExecutorThreadPoolExecutor 的扩展,内部使用 DelayQueue 的实现,它是一个无界队列。我需要的是一个具有有界队列ScheduledThreadpoolExecutor,即它对队列中累积的任务有限制,这样当队列中的任务超过限制时,它就会开始拒绝进一步提交任务并防止 JVM 内存不足。

令人惊讶的是,谷歌或 stackoverflow 没有向我指出任何讨论这个问题的结果。有没有我错过的这样的东西?如果没有,我如何实现 ScheduledThreadpoolExecutor 以最佳方式提供我预期的功能?

The Sun Java (1.6) ScheduledThreadPoolExecutor which is an extension of ThreadPoolExecutor internally uses an implementation of DelayQueue which is an unbounded queue. What I need is a ScheduledThreadpoolExecutor with a bounded queue i.e. it has a limit on the tasks accumulating in the queue so that when tasks in queue exceed the limit, it starts rejecting the further submitted tasks and prevents JVM going out of memory.

Surprisingly, google or stackoverflow did not point me to any results which are discussing this problem. Is there any such thing already available I am missing out? If not, how can I implement a ScheduledThreadpoolExecutor to provide me my expected functionality in a best way?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

内心荒芜 2024-12-11 06:17:49

正如其他人已经指出的那样,没有现成的方法可以做到这一点。只要确保尝试使用“组合”而不是“继承”即可。创建一个新类,该类实现必要的接口,并通过根据需要对必要的方法进行检查来委托给基础 ScheduledThreadPoolExecutor

您还可以使用此线程中指定的技术 进行简单的修改。您可以使用 Semaphore#tryAcquire 代替使用 Semaphore#acquire,并根据布尔结果决定是否需要调用拒绝处理程序。想想看,我个人认为,库作者直接子类化特定执行器而不是依赖组合来创建普通执行器上的“可调度”包装器是一种疏忽。

As others have already pointed out, there isn't a ready way of doing this. Just make sure you try to use "composition" instead of "inheritance". Create a new class which implements the necessary interface and delegate to the underlying ScheduledThreadPoolExecutor by doing checks as per required on the necessary methods.

You can also use the technique specified in this thread with a simple modification. Instead of using Semaphore#acquire, you can use Semaphore#tryAcquire and depending on the boolean outcome decide whether you need to call the rejection handler or not. Come to think of it, I personally feel that it was an oversight on the part of the library authors to directly subclass a specific executor rather than relying on composition to create a "schedulable" wrapper over a normal executor.

记忆で 2024-12-11 06:17:49

如何以不同的方式处理它,即根据队列大小延迟任务提交。执行器服务通过 getQueue() 公开队列。您可以对其调用 size() ,并根据您计划的队列大小限制,您可以开始拒绝任务或开始延迟任务执行(增加计划时间,保持队列大小作为因素之一)。

总而言之,这又不是最好的解决方案;仅供参考,java提供了延迟队列来支持工作窃取。

How about handling it differently i.e. depending upon the queue size delay the task subsmission. The executor services exposes the queue via getQueue(). You can invoke the size() on it and depending upon the limit you plan for the queue size, you can either start rejecting the tasks or start delaying the task execution (increase the scheduled time keeping the size of the queue as one of the factor).

All said, this is again not the best solution; just fyi, java provides delay queue to support work stealing.

千紇 2024-12-11 06:17:49

最简单的解决方法是使用计划执行器仅计划任务,而不实际执行它们。调度程序必须显式检查执行程序队列大小,如果执行程序队列高于阈值,则丢弃任务。

另一种选择是在计划任务中检查 ScheduledThreadPoolExecutor 队列大小。如果队列高于阈值,则立即返回。在这种情况下,任务将立即执行并从队列中删除。所以不会发生溢出。

The simplest workaround is to use scheduled executor to schedule tasks only, not to actually execute them. Scheduler have to explicitly check executor queue size and discard task if executor queue is above a threshold.

Another option is to check ScheduledThreadPoolExecutor queue size right in scheduled task. If the queue is above threshold, just return immediately. In this case the task will be executed instantly and removed from queue. So overflow won't happen.

请持续率性 2024-12-11 06:17:49

ScheduledThreadPoolExecutor 不使用队列作为字段,而是调用 getQueue。但它调用 super.getQueue ,它是来自 ThreadPoolExecutor 的队列。您可以使用反射来覆盖它,如下所示:

public class BoundedScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
  public BoundedScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler, int queueCapacity) {
    super(corePoolSize, handler);
    setMaximumPoolSize(corePoolSize);
    setKeepAliveTime(0, TimeUnit.MILLISECONDS);
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity) {
            @Override
            public boolean add(Runnable r) {
                boolean added = offer(r);
                if (added) {
                    return added;
                } else {
                    getRejectedExecutionHandler().rejectedExecution(r, CrashingThreadPoolExecutor.this);
                    return false;
                }
            }
        };

    try {
        Field workQueueField = ThreadPoolExecutor.class.getDeclaredField("workQueue");
        workQueueField.setAccessible(true);
        workQueueField.set(this, queue);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
  }
}

ScheduledThreadPoolExecutor does not use queue as field but instead calls getQueue. But it calls super.getQueue which is queue from ThreadPoolExecutor. You can use reflection to override it like this:

public class BoundedScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
  public BoundedScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler, int queueCapacity) {
    super(corePoolSize, handler);
    setMaximumPoolSize(corePoolSize);
    setKeepAliveTime(0, TimeUnit.MILLISECONDS);
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity) {
            @Override
            public boolean add(Runnable r) {
                boolean added = offer(r);
                if (added) {
                    return added;
                } else {
                    getRejectedExecutionHandler().rejectedExecution(r, CrashingThreadPoolExecutor.this);
                    return false;
                }
            }
        };

    try {
        Field workQueueField = ThreadPoolExecutor.class.getDeclaredField("workQueue");
        workQueueField.setAccessible(true);
        workQueueField.set(this, queue);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
  }
}
停滞 2024-12-11 06:17:49

如果您真的,真的不想重新实现ScheduledThreadPoolExecutor,那么您可以扩展它并覆盖所有schedule*方法并实现您的自己的任务界限。但这会相当令人讨厌:

private final Object scheduleMonitor = new Object();

@Override
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) 
{
    if (command == null || unit == null)
        throw new NullPointerException();

    synchronized (scheduleMonitor)
    {                
        while (getQueue().size() >= MAX_QUEUE_SIZE)
        {
           scheduleMonitor.wait();
        }
        super.schedule(command, delay, unit);
    }
}

@Override
Runnable getTask() 
{
    final Runnable r = getTask();
    synchronized (scheduleMonitor)
    {
        scheduleMonitor.notify();
    }
    return r;
}

并重复:

  • public; ScheduledFutureSchedule(Callable可调用,长延迟,TimeUnit单位)
  • public Sc​​heduledFuture ScheduleAtFixedRate(可运行命令,
    长初始延迟,
    长期,
    TimeUnit 单位)
  • public Sc​​heduledFuture; ScheduleWithFixedDelay(可运行命令,
    长初始延迟,
    长时间延迟,
    TimeUnit 单位)

注意,这不会阻止重复任务占用队列超过限制,它只会阻止新安排的任务。

另一个警告是,我没有通过在锁定 scheduleMonitor 时调用 super.schedule 来检查任何死锁问题...

If you really, really don't want to re-implement ScheduledThreadPoolExecutor then you could extend it and override all the schedule* methods and implement your own bounding of tasks. It would be fairly nasty though:

private final Object scheduleMonitor = new Object();

@Override
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) 
{
    if (command == null || unit == null)
        throw new NullPointerException();

    synchronized (scheduleMonitor)
    {                
        while (getQueue().size() >= MAX_QUEUE_SIZE)
        {
           scheduleMonitor.wait();
        }
        super.schedule(command, delay, unit);
    }
}

@Override
Runnable getTask() 
{
    final Runnable r = getTask();
    synchronized (scheduleMonitor)
    {
        scheduleMonitor.notify();
    }
    return r;
}

And repeat for:

  • public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
    long initialDelay,
    long period,
    TimeUnit unit)
  • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
    long initialDelay,
    long delay,
    TimeUnit unit)

Note, this won't stop repeating tasks from taking the queue over a limit, it will only block newly scheduled tasks.

Another caveat is that I haven't checked for any deadlock issues by calling super.schedule while holding the lock on scheduleMonitor...

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文