如何在 ExecutorService.shutdown() 之后立即运行未完成的任务?

发布于 2024-11-25 08:14:40 字数 303 浏览 0 评论 0原文

我有一个 ScheduledExecutorService ,其中的任务计划在一小时内执行。如何获取未完成任务的列表,以便强制它们立即运行?

我相信 shutdown() 会等待一个小时,并且看起来好像 shutdownNow() 返回一个无法 run() 的 Runnable 列表,因为 Runnable 实现会检查 Executor 状态当它注意到它已经关闭时,Runnable 拒绝运行。实际实现请参见ScheduledThreadPoolExecutor.ScheduledFutureTask.run()。

有什么想法吗?

I've got a ScheduledExecutorService with tasks scheduled to execute in an hour. How do I get the list of outstanding tasks so I can force them to run immediately?

I believe shutdown() will wait an hour and it looks as if shutdownNow() returns a list of Runnables that cannot be run() because the Runnable implementation checks the Executor state and when it notices that it has shut down the Runnable refuses to run. See ScheduledThreadPoolExecutor.ScheduledFutureTask.run() for the actual implementation.

Any ideas?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

冷弦 2024-12-02 08:14:40

我采纳了 Mark Peters 的答案,实现了所有抽象方法,添加了线程安全性,并尽可能尊重底层的 ScheduledThreadPoolExecutor 配置。

/**
 * Overrides shutdown() to run outstanding tasks immediately.
 * 
 * @author Gili Tzabari
 */
public class RunOnShutdownScheduledExecutorService extends AbstractExecutorService
    implements ScheduledExecutorService
{
    private final ScheduledExecutorService delegate;
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    private final ExecutorService immediateService;
    private final ConcurrentMap<Future<?>, Callable<?>> tasks = Maps.newConcurrentMap();

    /**
     * Creates a new RunOnShutdownScheduledExecutorService.
     * 
     * @param delegate the executor to delegate to
     */
    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegate)
    {
        Preconditions.checkNotNull(delegate, "delegate may not be null");

        this.delegate = delegate;
        if (delegate instanceof ScheduledThreadPoolExecutor)
        {
            this.scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) delegate;
            this.immediateService = Executors.newFixedThreadPool(scheduledThreadPoolExecutor.
                getCorePoolSize(), scheduledThreadPoolExecutor.getThreadFactory());
        }
        else
        {
            scheduledThreadPoolExecutor = null;
            this.immediateService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
                setNameFormat(RunOnShutdownScheduledExecutorService.class.getName() + "-%d").build());
        }
    }

    @Override
    public boolean isShutdown()
    {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated()
    {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
    {
        long before = System.nanoTime();
        if (!delegate.awaitTermination(timeout, unit))
            return false;
        long after = System.nanoTime();
        long timeLeft = timeout - unit.convert(after - before, TimeUnit.NANOSECONDS);
        return immediateService.awaitTermination(timeLeft, unit);
    }

    @Override
    public void execute(Runnable command)
    {
        delegate.execute(command);
    }

    @Override
    public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit)
    {
        CleaningRunnable decorated = new CleaningRunnable(command);
        ScheduledFuture<?> future = delegate.schedule(decorated, delay, unit);
        decorated.setFuture(future);
        tasks.put(future, Executors.callable(command));
        return new CleaningScheduledFuture<>(future);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
    {
        CallableWithFuture<V> decorated = new CallableWithFuture<>(callable);
        ScheduledFuture<V> future = delegate.schedule(decorated, delay, unit);
        decorated.setFuture(future);
        tasks.put(future, callable);
        return new CleaningScheduledFuture<>(future);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
        TimeUnit unit)
    {
        CleaningRunnable decorated = new CleaningRunnable(command);
        ScheduledFuture<?> future = delegate.scheduleAtFixedRate(decorated, initialDelay, period, unit);
        decorated.setFuture(future);
        tasks.put(future, Executors.callable(command));
        return new CleaningScheduledFuture<>(future);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
        TimeUnit unit)
    {
        CleaningRunnable decorated = new CleaningRunnable(command);
        ScheduledFuture<?> future =
            delegate.scheduleWithFixedDelay(decorated, initialDelay, delay, unit);
        decorated.setFuture(future);
        tasks.put(future, Executors.callable(command));
        return new CleaningScheduledFuture<>(future);
    }

    @Override
    public synchronized void shutdown()
    {
        if (delegate.isShutdown())
            return;
        if (scheduledThreadPoolExecutor != null)
        {
            // WORKAROUND: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418
            //
            // Cancel waiting scheduled tasks, otherwise executor won't shut down
            scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        }
        delegate.shutdown();
        // Users will not be able to cancel() Futures past this point so we're guaranteed that
        // "tasks" will not be modified.

        final List<Callable<?>> outstandingTasks = Lists.newArrayList();
        for (Map.Entry<Future<?>, Callable<?>> entry: tasks.entrySet())
        {
            Future<?> future = entry.getKey();
            Callable<?> task = entry.getValue();

            if (future.isDone() && future.isCancelled())
            {
                // Task called by the underlying executor, not the user. See CleaningScheduledFuture.
                outstandingTasks.add(task);
            }
        }
        tasks.clear();
        if (outstandingTasks.isEmpty())
        {
            immediateService.shutdown();
            return;
        }

        immediateService.submit(new Callable<Void>()
        {
            @Override
            public Void call() throws Exception
            {
                delegate.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);

                // Execute outstanding tasks only after the delegate executor finishes shutting down
                for (Callable<?> task: outstandingTasks)
                    immediateService.submit(task);
                immediateService.shutdown();
                return null;
            }
        });
    }

    @Override
    public List<Runnable> shutdownNow()
    {
        return delegate.shutdownNow();
    }

    /**
     * A Runnable that removes its future when running.
     */
    private class CleaningRunnable implements Runnable
    {
        private final Runnable delegate;
        private Future<?> future;

        /**
         * Creates a new RunnableWithFuture.
         * 
         * @param delegate the Runnable to delegate to
         * @throws NullPointerException if delegate is null
         */
        public CleaningRunnable(Runnable delegate)
        {
            Preconditions.checkNotNull(delegate, "delegate may not be null");

            this.delegate = delegate;
        }

        /**
         * Associates a Future with the runnable.
         * 
         * @param future a future
         */
        public void setFuture(Future<?> future)
        {
            this.future = future;
        }

        @Override
        public void run()
        {
            tasks.remove(future);
            delegate.run();
        }
    }

    /**
     * A Callable that removes its future when running.
     */
    private class CallableWithFuture<V> implements Callable<V>
    {
        private final Callable<V> delegate;
        private Future<V> future;

        /**
         * Creates a new CallableWithFuture.
         * 
         * @param delegate the Callable to delegate to
         * @throws NullPointerException if delegate is null
         */
        public CallableWithFuture(Callable<V> delegate)
        {
            Preconditions.checkNotNull(delegate, "delegate may not be null");

            this.delegate = delegate;
        }

        /**
         * Associates a Future with the runnable.
         * 
         * @param future a future
         */
        public void setFuture(Future<V> future)
        {
            this.future = future;
        }

        @Override
        public V call() throws Exception
        {
            tasks.remove(future);
            return delegate.call();
        }
    }

    /**
     * A ScheduledFuture that removes its future when canceling.
     * 
     * This allows us to differentiate between tasks canceled by the user and the underlying
     * executor. Tasks canceled by the user are removed from "tasks".
     * 
     * @param <V> The result type returned by this Future
     */
    private class CleaningScheduledFuture<V> implements ScheduledFuture<V>
    {
        private final ScheduledFuture<V> delegate;

        /**
         * Creates a new MyScheduledFuture.
         * 
         * @param delegate the future to delegate to
         * @throws NullPointerException if delegate is null
         */
        public CleaningScheduledFuture(ScheduledFuture<V> delegate)
        {
            Preconditions.checkNotNull(delegate, "delegate may not be null");

            this.delegate = delegate;
        }

        @Override
        public long getDelay(TimeUnit unit)
        {
            return delegate.getDelay(unit);
        }

        @Override
        public int compareTo(Delayed o)
        {
            return delegate.compareTo(o);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning)
        {
            boolean result = delegate.cancel(mayInterruptIfRunning);

            if (result)
            {
                // Tasks canceled by users are removed from "tasks"
                tasks.remove(delegate);
            }
            return result;
        }

        @Override
        public boolean isCancelled()
        {
            return delegate.isCancelled();
        }

        @Override
        public boolean isDone()
        {
            return delegate.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException
        {
            return delegate.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
            TimeoutException
        {
            return delegate.get(timeout, unit);
        }
    }
}

I've taken Mark Peters' answer, implementing all abstract methods, added thread-safety and tried respecting the underlying ScheduledThreadPoolExecutor configuration whenever possible.

/**
 * Overrides shutdown() to run outstanding tasks immediately.
 * 
 * @author Gili Tzabari
 */
public class RunOnShutdownScheduledExecutorService extends AbstractExecutorService
    implements ScheduledExecutorService
{
    private final ScheduledExecutorService delegate;
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    private final ExecutorService immediateService;
    private final ConcurrentMap<Future<?>, Callable<?>> tasks = Maps.newConcurrentMap();

    /**
     * Creates a new RunOnShutdownScheduledExecutorService.
     * 
     * @param delegate the executor to delegate to
     */
    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegate)
    {
        Preconditions.checkNotNull(delegate, "delegate may not be null");

        this.delegate = delegate;
        if (delegate instanceof ScheduledThreadPoolExecutor)
        {
            this.scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) delegate;
            this.immediateService = Executors.newFixedThreadPool(scheduledThreadPoolExecutor.
                getCorePoolSize(), scheduledThreadPoolExecutor.getThreadFactory());
        }
        else
        {
            scheduledThreadPoolExecutor = null;
            this.immediateService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
                setNameFormat(RunOnShutdownScheduledExecutorService.class.getName() + "-%d").build());
        }
    }

    @Override
    public boolean isShutdown()
    {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated()
    {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
    {
        long before = System.nanoTime();
        if (!delegate.awaitTermination(timeout, unit))
            return false;
        long after = System.nanoTime();
        long timeLeft = timeout - unit.convert(after - before, TimeUnit.NANOSECONDS);
        return immediateService.awaitTermination(timeLeft, unit);
    }

    @Override
    public void execute(Runnable command)
    {
        delegate.execute(command);
    }

    @Override
    public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit)
    {
        CleaningRunnable decorated = new CleaningRunnable(command);
        ScheduledFuture<?> future = delegate.schedule(decorated, delay, unit);
        decorated.setFuture(future);
        tasks.put(future, Executors.callable(command));
        return new CleaningScheduledFuture<>(future);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
    {
        CallableWithFuture<V> decorated = new CallableWithFuture<>(callable);
        ScheduledFuture<V> future = delegate.schedule(decorated, delay, unit);
        decorated.setFuture(future);
        tasks.put(future, callable);
        return new CleaningScheduledFuture<>(future);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
        TimeUnit unit)
    {
        CleaningRunnable decorated = new CleaningRunnable(command);
        ScheduledFuture<?> future = delegate.scheduleAtFixedRate(decorated, initialDelay, period, unit);
        decorated.setFuture(future);
        tasks.put(future, Executors.callable(command));
        return new CleaningScheduledFuture<>(future);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
        TimeUnit unit)
    {
        CleaningRunnable decorated = new CleaningRunnable(command);
        ScheduledFuture<?> future =
            delegate.scheduleWithFixedDelay(decorated, initialDelay, delay, unit);
        decorated.setFuture(future);
        tasks.put(future, Executors.callable(command));
        return new CleaningScheduledFuture<>(future);
    }

    @Override
    public synchronized void shutdown()
    {
        if (delegate.isShutdown())
            return;
        if (scheduledThreadPoolExecutor != null)
        {
            // WORKAROUND: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418
            //
            // Cancel waiting scheduled tasks, otherwise executor won't shut down
            scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        }
        delegate.shutdown();
        // Users will not be able to cancel() Futures past this point so we're guaranteed that
        // "tasks" will not be modified.

        final List<Callable<?>> outstandingTasks = Lists.newArrayList();
        for (Map.Entry<Future<?>, Callable<?>> entry: tasks.entrySet())
        {
            Future<?> future = entry.getKey();
            Callable<?> task = entry.getValue();

            if (future.isDone() && future.isCancelled())
            {
                // Task called by the underlying executor, not the user. See CleaningScheduledFuture.
                outstandingTasks.add(task);
            }
        }
        tasks.clear();
        if (outstandingTasks.isEmpty())
        {
            immediateService.shutdown();
            return;
        }

        immediateService.submit(new Callable<Void>()
        {
            @Override
            public Void call() throws Exception
            {
                delegate.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);

                // Execute outstanding tasks only after the delegate executor finishes shutting down
                for (Callable<?> task: outstandingTasks)
                    immediateService.submit(task);
                immediateService.shutdown();
                return null;
            }
        });
    }

    @Override
    public List<Runnable> shutdownNow()
    {
        return delegate.shutdownNow();
    }

    /**
     * A Runnable that removes its future when running.
     */
    private class CleaningRunnable implements Runnable
    {
        private final Runnable delegate;
        private Future<?> future;

        /**
         * Creates a new RunnableWithFuture.
         * 
         * @param delegate the Runnable to delegate to
         * @throws NullPointerException if delegate is null
         */
        public CleaningRunnable(Runnable delegate)
        {
            Preconditions.checkNotNull(delegate, "delegate may not be null");

            this.delegate = delegate;
        }

        /**
         * Associates a Future with the runnable.
         * 
         * @param future a future
         */
        public void setFuture(Future<?> future)
        {
            this.future = future;
        }

        @Override
        public void run()
        {
            tasks.remove(future);
            delegate.run();
        }
    }

    /**
     * A Callable that removes its future when running.
     */
    private class CallableWithFuture<V> implements Callable<V>
    {
        private final Callable<V> delegate;
        private Future<V> future;

        /**
         * Creates a new CallableWithFuture.
         * 
         * @param delegate the Callable to delegate to
         * @throws NullPointerException if delegate is null
         */
        public CallableWithFuture(Callable<V> delegate)
        {
            Preconditions.checkNotNull(delegate, "delegate may not be null");

            this.delegate = delegate;
        }

        /**
         * Associates a Future with the runnable.
         * 
         * @param future a future
         */
        public void setFuture(Future<V> future)
        {
            this.future = future;
        }

        @Override
        public V call() throws Exception
        {
            tasks.remove(future);
            return delegate.call();
        }
    }

    /**
     * A ScheduledFuture that removes its future when canceling.
     * 
     * This allows us to differentiate between tasks canceled by the user and the underlying
     * executor. Tasks canceled by the user are removed from "tasks".
     * 
     * @param <V> The result type returned by this Future
     */
    private class CleaningScheduledFuture<V> implements ScheduledFuture<V>
    {
        private final ScheduledFuture<V> delegate;

        /**
         * Creates a new MyScheduledFuture.
         * 
         * @param delegate the future to delegate to
         * @throws NullPointerException if delegate is null
         */
        public CleaningScheduledFuture(ScheduledFuture<V> delegate)
        {
            Preconditions.checkNotNull(delegate, "delegate may not be null");

            this.delegate = delegate;
        }

        @Override
        public long getDelay(TimeUnit unit)
        {
            return delegate.getDelay(unit);
        }

        @Override
        public int compareTo(Delayed o)
        {
            return delegate.compareTo(o);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning)
        {
            boolean result = delegate.cancel(mayInterruptIfRunning);

            if (result)
            {
                // Tasks canceled by users are removed from "tasks"
                tasks.remove(delegate);
            }
            return result;
        }

        @Override
        public boolean isCancelled()
        {
            return delegate.isCancelled();
        }

        @Override
        public boolean isDone()
        {
            return delegate.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException
        {
            return delegate.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
            TimeoutException
        {
            return delegate.get(timeout, unit);
        }
    }
}
夜司空 2024-12-02 08:14:40

好问题!不过,看起来您可能需要自己拼凑解决方案。

一种选择可能是使用您自己的 ScheduledExecutorService 实现来包装 ScheduledThreadPoolExecutor。当需要关闭服务时,取消任何可以取消的任务,并将它们发送到将立即执行它们的服务。然后 shutdown() 该服务。

这是一些非常粗略的代码,说明了我的意思,尽管我警告您这里可能存在陷阱,因为它是在几分钟内完成的。特别是,我没有付出太多努力来确保这是线程安全的。

class RunOnShutdownScheduledExecutorService extends AbstractExecutorService implements ScheduledExecutorService {
    private final ScheduledExecutorService delegateService;

    private Map<Future<?>, Runnable> scheduledFutures =
            Collections.synchronizedMap(new IdentityHashMap<Future<?>, Runnable>());


    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegateService) {
        this.delegateService = delegateService;
    }

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        ScheduledFuture<?> future = delegateService.schedule(command, delay, unit);
        scheduledFutures.put(future, command);
        return future;
    }

    public void shutdown() {
        delegateService.shutdown();
        ExecutorService immediateService = Executors.newFixedThreadPool(5);
        for (Map.Entry<Future<?>, Runnable> entry : scheduledFutures.entrySet()) {
            Future<?> future = entry.getKey();
            Runnable task = entry.getValue();
            if (!future.isDone()) {
                if (future.cancel(false)) {
                    immediateService.submit(task);
                }
            }
        }
        immediateService.shutdown();
    }

    //...
}

Great question! It looks like you might be on your own patching together a solution though.

One option might be to wrap the ScheduledThreadPoolExecutor with your own implementation of ScheduledExecutorService. When it comes time to shutdown the service, cancel any tasks that can be cancelled and instead send them to a service that will execute them immediately. Then shutdown() that service.

Here is some very rough code that demonstrates what I mean, though I warn you there may be pitfalls in here since it was whipped up in a few minutes. In particular, I haven't gone to much effort to ensure this is threadsafe.

class RunOnShutdownScheduledExecutorService extends AbstractExecutorService implements ScheduledExecutorService {
    private final ScheduledExecutorService delegateService;

    private Map<Future<?>, Runnable> scheduledFutures =
            Collections.synchronizedMap(new IdentityHashMap<Future<?>, Runnable>());


    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegateService) {
        this.delegateService = delegateService;
    }

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        ScheduledFuture<?> future = delegateService.schedule(command, delay, unit);
        scheduledFutures.put(future, command);
        return future;
    }

    public void shutdown() {
        delegateService.shutdown();
        ExecutorService immediateService = Executors.newFixedThreadPool(5);
        for (Map.Entry<Future<?>, Runnable> entry : scheduledFutures.entrySet()) {
            Future<?> future = entry.getKey();
            Runnable task = entry.getValue();
            if (!future.isDone()) {
                if (future.cancel(false)) {
                    immediateService.submit(task);
                }
            }
        }
        immediateService.shutdown();
    }

    //...
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文