How to stop a Runnable scheduled for repeated execution after a certain number of executions

Posted on 2024-12-02 14:12:51

Situation

I have a Runnable. I have a class that schedules this Runnable for execution using a ScheduledExecutorService with scheduleWithFixedDelay.

Goal

I want to alter this class to schedule the Runnable for fixed delay execution either indefinitely, or until it has been run a certain number of times, depending on some parameter that is passed in to the constructor.

If possible, I would like to use the same Runnable, as it is conceptually the same thing that should be "run".

Possible approaches

Approach #1

Have two Runnables, one that cancels the schedule after a number of executions (which it keeps a count of) and one that doesn't:

public class MyClass{
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public enum Mode{
        INDEFINITE, FIXED_NO_OF_TIMES
    }

    public MyClass(Mode mode){
        if(mode == Mode.INDEFINITE){
            scheduler.scheduleWithFixedDelay(new DoSomethingTask(), 0, 100, TimeUnit.MILLISECONDS);
        }else if(mode == Mode.FIXED_NO_OF_TIMES){
            scheduler.scheduleWithFixedDelay(new DoSomethingNTimesTask(), 0, 100, TimeUnit.MILLISECONDS);
        }
    }

    private class DoSomethingTask implements Runnable{
        @Override
        public void run(){
            doSomething();
        }
    }

    private class DoSomethingNTimesTask implements Runnable{
        private int count = 0;

        @Override
        public void run(){
            doSomething();
            count++;
            if(count > 42){
                // Cancel the scheduling.
                // Can you do this inside the run method, presumably using
                // the Future returned by the schedule method? Is it a good idea?
            }
        }
    }

    private void doSomething(){
        // do something
    }
}

I would rather just have one Runnable for the execution of the doSomething method. Tying the scheduling to the Runnable feels wrong. What do you think about this?

Approach #2

Have a single Runnable for the execution of the code that we want to run periodically. Have a separate scheduled runnable that checks how many times the first Runnable has run and cancels when it gets to a certain amount. This may not be accurate, as it would be asynchronous. It feels a bit cumbersome. What do you think about this?

Approach #3

Extend ScheduledExecutorService and add a method "scheduleWithFixedDelayNTimes". Perhaps such a class already exists? Currently, I'm using Executors.newSingleThreadScheduledExecutor(); to get my ScheduledExecutorService instance. I would presumably have to implement similar functionality to instantiate the extended ScheduledExecutorService. This could be tricky. What do you think about this?
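
One way to picture Approach #3 is a subclass of ScheduledThreadPoolExecutor, the JDK's standard ScheduledExecutorService implementation. The following is only a minimal sketch under assumptions, not an existing JDK facility: the class names CountingScheduler and CountingSchedulerDemo and the method scheduleWithFixedDelayNTimes are hypothetical. The method wraps the task, counts executions, and cancels through the task's own ScheduledFuture, which is handed to the wrapper via a CompletableFuture so the wrapper never reads an unset reference.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass; scheduleWithFixedDelayNTimes is not part of the JDK.
class CountingScheduler extends ScheduledThreadPoolExecutor {

    CountingScheduler(int corePoolSize) {
        super(corePoolSize);
    }

    ScheduledFuture<?> scheduleWithFixedDelayNTimes(Runnable task, int times,
            long initialDelay, long delay, TimeUnit unit) {
        if (times <= 0) {
            throw new IllegalArgumentException("times must be positive");
        }
        AtomicInteger remaining = new AtomicInteger(times);
        CompletableFuture<ScheduledFuture<?>> self = new CompletableFuture<>();
        Runnable wrapper = () -> {
            task.run();
            if (remaining.decrementAndGet() == 0) {
                // join() blocks until the scheduling thread has published the future
                self.join().cancel(false);
            }
        };
        ScheduledFuture<?> future = scheduleWithFixedDelay(wrapper, initialDelay, delay, unit);
        self.complete(future);
        return future;
    }
}

class CountingSchedulerDemo {
    // Runs a counting task three times and returns how many executions happened.
    static int runThreeTimes() {
        CountingScheduler scheduler = new CountingScheduler(1);
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = scheduler.scheduleWithFixedDelayNTimes(
                runs::incrementAndGet, 3, 0, 10, TimeUnit.MILLISECONDS);
        try {
            future.get(); // blocks until the wrapper cancels itself
        } catch (CancellationException expected) {
            // normal end of a self-cancelled periodic task
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("executions: " + runThreeTimes());
    }
}
```

The task passed in stays an ordinary Runnable, so the same instance could also be handed to the inherited scheduleWithFixedDelay for the indefinite case.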

No scheduler approach [Edit]

I could forgo the scheduler altogether and instead have something like:

for(int i = 0; i < numTimesToRun; i++){
    doSomething();
    Thread.sleep(delay);
}

And run that in some thread. What do you think of that? You could potentially still use the runnable and call the run method directly.
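
To make the loop idea concrete, here is a small sketch that reuses the same Runnable and calls run() directly on a worker thread. The class names FixedCountLoop and FixedCountLoopDemo are hypothetical, and, unlike the snippet above, this version skips the sleep after the last run and stops early if the thread is interrupted.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: runs a task a fixed number of times with a pause in between.
class FixedCountLoop implements Runnable {
    private final Runnable task;
    private final int numTimesToRun;
    private final long delayMillis;

    FixedCountLoop(Runnable task, int numTimesToRun, long delayMillis) {
        this.task = task;
        this.numTimesToRun = numTimesToRun;
        this.delayMillis = delayMillis;
    }

    @Override
    public void run() {
        for (int i = 0; i < numTimesToRun; i++) {
            task.run();
            if (i < numTimesToRun - 1) { // no pause needed after the final run
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the flag and stop early
                    return;
                }
            }
        }
    }
}

class FixedCountLoopDemo {
    // Runs a counting task five times on a worker thread and returns the count.
    static int runFiveTimes() {
        AtomicInteger runs = new AtomicInteger();
        Thread worker = new Thread(new FixedCountLoop(runs::incrementAndGet, 5, 10));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("executions: " + runFiveTimes());
    }
}
```

The trade-off is that the loop occupies a thread for its whole lifetime, whereas a scheduler can share one thread among several periodic tasks.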


Any suggestions welcome. I'm looking for a debate to find the "best practice" way of achieving my goal.

Comments (7)

假装爱人 2024-12-09 14:12:51

You can use the cancel() method on the Future. From the Javadoc of scheduleAtFixedRate:

Otherwise, the task will only terminate via cancellation or termination of the executor

Here is some example code that wraps a Runnable in another that tracks the number of times the original was run, and cancels after running N times.

public void runNTimes(Runnable task, int maxRunCount, long period, TimeUnit unit, ScheduledExecutorService executor) {
    new FixedExecutionRunnable(task, maxRunCount).runNTimes(executor, period, unit);
}

class FixedExecutionRunnable implements Runnable {
    private final AtomicInteger runCount = new AtomicInteger();
    private final Runnable delegate;
    private volatile ScheduledFuture<?> self;
    private final int maxRunCount;

    public FixedExecutionRunnable(Runnable delegate, int maxRunCount) {
        this.delegate = delegate;
        this.maxRunCount = maxRunCount;
    }

    @Override
    public void run() {
        delegate.run();
        if(runCount.incrementAndGet() == maxRunCount) {
            boolean interrupted = false;
            try {
                while(self == null) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                self.cancel(false);
            } finally {
                if(interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public void runNTimes(ScheduledExecutorService executor, long period, TimeUnit unit) {
        self = executor.scheduleAtFixedRate(this, 0, period, unit);
    }
}
故乡的云 2024-12-09 14:12:51

Quoted from the API description (ScheduledExecutorService.scheduleWithFixedDelay):

Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.

So, the easiest thing would be to "just throw an exception" (even though this is considered bad practice):

static class MyTask implements Runnable {

    private int runs = 0;

    @Override
    public void run() {
        System.out.println(runs);
        if (++runs >= 20)
            throw new RuntimeException();
    }
}

public static void main(String[] args) {
    ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
    s.scheduleWithFixedDelay(new MyTask(), 0, 100, TimeUnit.MILLISECONDS);
}
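
If the exception route is taken, the caller can still observe why the task stopped, because the exception is captured in the ScheduledFuture and surfaces from get() wrapped in an ExecutionException. The sketch below illustrates this under assumptions: StopRepeatingException and ThrowToStopDemo are hypothetical names introduced only for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical marker exception, used only to signal "stop repeating".
class StopRepeatingException extends RuntimeException {
    StopRepeatingException(int runs) {
        super("stopped after " + runs + " runs");
    }
}

class ThrowToStopDemo {
    // Schedules a task that throws on its maxRuns-th execution; returns the run count.
    static int runUntilThrown(int maxRuns) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(() -> {
            if (runs.incrementAndGet() >= maxRuns) {
                throw new StopRepeatingException(runs.get()); // suppresses further executions
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            future.get(); // returns only by throwing, since a periodic task never completes normally
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof StopRepeatingException)) {
                throw new IllegalStateException(e.getCause()); // a genuine failure in the task
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("executions: " + runUntilThrown(4));
    }
}
```

Using a dedicated exception type at least lets the caller tell a deliberate stop apart from a real failure.
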
人间不值得 2024-12-09 14:12:51

So far sbridges' solution seems to be the cleanest one, except that, as you mentioned, it leaves the responsibility for counting executions to the Runnable itself. The Runnable should not be concerned with this; instead, the number of repetitions should be a parameter of the class that handles the scheduling. To achieve this, I would suggest the following design, which introduces a new executor class for Runnables. The class provides two public methods for scheduling tasks, which are standard Runnables, with finite or infinite repetition. The same Runnable can be passed for finite and infinite scheduling, if desired (which is not possible with any of the proposed solutions that extend the Runnable class to provide finite repetitions). The handling of cancelling finite repetitions is completely encapsulated in the scheduler class:

class MaxNScheduler
{

  public enum ScheduleType 
  {
     FixedRate, FixedDelay
  }

  private ScheduledExecutorService executorService =
     Executors.newSingleThreadScheduledExecutor();

  public ScheduledFuture<?> scheduleInfinitely(Runnable task, ScheduleType type, 
    long initialDelay, long period, TimeUnit unit)
  {
    return scheduleNTimes(task, -1, type, initialDelay, period, unit);
  }

  /** schedule with count repetitions */
  public ScheduledFuture<?> scheduleNTimes(Runnable task, int repetitions, 
    ScheduleType type, long initialDelay, long period, TimeUnit unit) 
  {
    RunnableWrapper wrapper = new RunnableWrapper(task, repetitions);
    ScheduledFuture<?> future;
    if(type == ScheduleType.FixedDelay)
      future = executorService.scheduleWithFixedDelay(wrapper, 
         initialDelay, period, unit); // honour the caller's TimeUnit
    else
      future = executorService.scheduleAtFixedRate(wrapper, 
         initialDelay, period, unit); // honour the caller's TimeUnit
    synchronized(wrapper)
    {
       wrapper.self = future;
       wrapper.notify(); // tell the wrapper that it now knows about its future (pun intended)
    }
    return future;
  }

  private static class RunnableWrapper implements Runnable 
  {
    private final Runnable realRunnable;
    private int repetitions = -1;
    ScheduledFuture<?> self = null;

    RunnableWrapper(Runnable realRunnable, int repetitions) 
    {
      this.realRunnable = realRunnable;
      this.repetitions = repetitions;
    }

    private boolean isInfinite() { return repetitions < 0; }
    private boolean isFinished() { return repetitions == 0; }

    @Override
    public void run()
    {
      if(!isFinished()) // guard for calls to run when it should be cancelled already
      {
        realRunnable.run();

        if(!isInfinite())
        {
          repetitions--;
          if(isFinished())
          {
            synchronized(this) // need to wait until self is actually set
            {
              while(self == null) // loop rather than if, to guard against spurious wakeups
              {
                 try { wait(); } catch(InterruptedException e) { /* keep waiting until self is set */ }
              }
              self.cancel(false); // cancel without interrupting the execution in progress
            }
          }
        }
      }
    }
  }

}

To be fair, the logic of managing the repetitions still lives in a Runnable, but it is a Runnable completely internal to the MaxNScheduler, whereas the Runnable task passed in for scheduling does not have to concern itself with the nature of the scheduling. This concern could also be moved out into the scheduler if desired, by providing a callback every time RunnableWrapper.run is executed. That would complicate the code slightly and would require keeping a map of RunnableWrappers and their corresponding repetitions, which is why I opted to keep the counters in the RunnableWrapper class.

I also added some synchronization on the wrapper when setting self. This is needed because, in theory, self might not have been assigned yet when the executions finish (a rather theoretical scenario, but possible when only 1 repetition is requested).

The cancelling is handled gracefully, without throwing an InterruptedException. If another round is scheduled before the cancel takes effect, the RunnableWrapper will not call the underlying Runnable.
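
As an aside, the question of whether self has been assigned can be sidestepped by not using a periodic schedule at all. The sketch below is a different technique from the wrapper above, shown only for comparison: each run re-submits itself with the one-shot schedule() method until the count is exhausted, which gives fixed-delay behaviour without any self-cancellation. SelfReschedulingTask and SelfReschedulingDemo are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: runs a delegate N times by chaining one-shot schedules.
class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService executor;
    private final Runnable delegate;
    private final AtomicInteger remaining;
    private final long delay;
    private final TimeUnit unit;
    private final CountDownLatch done = new CountDownLatch(1);

    SelfReschedulingTask(ScheduledExecutorService executor, Runnable delegate,
            int repetitions, long delay, TimeUnit unit) {
        this.executor = executor;
        this.delegate = delegate;
        this.remaining = new AtomicInteger(repetitions);
        this.delay = delay;
        this.unit = unit;
    }

    void start(long initialDelay) {
        if (remaining.get() <= 0) {
            done.countDown(); // nothing to run
            return;
        }
        executor.schedule(this, initialDelay, unit);
    }

    @Override
    public void run() {
        boolean more = false;
        try {
            delegate.run();
            if (remaining.decrementAndGet() > 0) {
                executor.schedule(this, delay, unit); // next run, one delay after this one ends
                more = true;
            }
        } finally {
            if (!more) {
                done.countDown(); // finished, failed, or the executor rejected the next run
            }
        }
    }

    boolean awaitDone(long timeout, TimeUnit timeoutUnit) throws InterruptedException {
        return done.await(timeout, timeoutUnit);
    }
}

class SelfReschedulingDemo {
    // Runs a counting task four times and returns how many executions happened.
    static int runFourTimes() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        SelfReschedulingTask task = new SelfReschedulingTask(
                executor, runs::incrementAndGet, 4, 10, TimeUnit.MILLISECONDS);
        task.start(0);
        try {
            task.awaitDone(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("executions: " + runFourTimes());
    }
}
```

The cost of this variant is that there is no single ScheduledFuture to hand back to the caller, so external cancellation would need an extra flag.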

离去的眼神 2024-12-09 14:12:51

Here is my suggestion (I believe it handles all cases mentioned in the question):

public class RepeatedScheduled implements Runnable {

    private int repeatCounter = -1;
    private boolean infinite;

    private ScheduledExecutorService ses;
    private long initialDelay;
    private long delay;
    private TimeUnit unit;

    private final Runnable command;
    private volatile Future<?> control; // written by submit(), read by the executor thread

    public RepeatedScheduled(ScheduledExecutorService ses, Runnable command,
        long initialDelay, long delay, TimeUnit unit) {

        this.ses = ses;
        this.initialDelay = initialDelay;
        this.delay = delay;
        this.unit = unit;

        this.command = command;
        this.infinite = true;

    }

    public RepeatedScheduled(ScheduledExecutorService ses, Runnable command,
        long initialDelay, long delay, TimeUnit unit, int maxExecutions) {

        this(ses, command, initialDelay, delay, unit);
        this.repeatCounter = maxExecutions;
        this.infinite = false;

    }

    public Future<?> submit() {

        // We submit this, not the received command
        this.control = this.ses.scheduleWithFixedDelay(this,
            this.initialDelay, this.delay, this.unit);

        return this.control;

    }

    @Override
    public synchronized void run() {

        if ( !this.infinite ) {
            if ( this.repeatCounter > 0 ) {
                this.command.run();
                this.repeatCounter--;
            } else {
                this.control.cancel(false);
            }
        } else {
            this.command.run();
        }

    }

}

In addition, it allows an external party to stop everything from the Future returned by the submit() method.

Usage:

Runnable MyRunnable = ...;
// Repeat 20 times
RepeatedScheduled rs = new RepeatedScheduled(
    MySes, MyRunnable, 33, 44, TimeUnit.SECONDS, 20);
Future<?> MyControl = rs.submit();
...
最近可好 2024-12-09 14:12:51

For use cases like polling until a certain timeout, a simpler solution is to call Future.get() with a timeout.

/* Define task */
public class Poll implements Runnable {
    @Override
    public void run() {
        // Polling logic
    }
}

/* Create executor service */
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);

/* Schedule task - poll every 500ms */
ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new Poll(), 0, 500, TimeUnit.MILLISECONDS);

/* Wait till 60 sec timeout */
try {
    future.get(60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(false); // cancel the ScheduledFuture declared above
    // Take action on timeout
}
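
The snippet above leaves out the other checked exceptions that get() can throw. A fuller sketch of the same timeout idea might look like the following; note that it bounds the polling by time rather than by a run count. PollWithTimeoutDemo and its method names are hypothetical, and the short period and timeout are chosen only to keep the demo quick.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class PollWithTimeoutDemo {
    // Polls at a fixed rate until the timeout elapses; returns true if the
    // periodic task ended up cancelled.
    static boolean pollUntilTimeout(Runnable poll, long periodMillis, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> future =
                executor.scheduleAtFixedRate(poll, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            // A periodic task never completes normally, so this call can only
            // end by timing out or by throwing.
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(false); // stop polling, let an in-flight run finish
        } catch (ExecutionException e) {
            // the polling logic threw; further runs are already suppressed
        } catch (InterruptedException e) {
            future.cancel(false);
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return future.isCancelled();
    }

    // Polls every 10 ms for 100 ms; returns the poll count, or -1 if not cancelled.
    static int demoPollCount() {
        AtomicInteger polls = new AtomicInteger();
        boolean cancelled = pollUntilTimeout(polls::incrementAndGet, 10, 100);
        return cancelled ? polls.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("polls before timeout: " + demoPollCount());
    }
}
```
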
我纯我任性 2024-12-09 14:12:51

Your first approach seems OK. You could combine both types of runnables by passing the mode object to the task's constructor (or by passing -1 as the maximum number of times it must run), and use this mode to determine whether the runnable must be cancelled:

private class DoSomethingNTimesTask implements Runnable{
    private int count = 0;
    private final int limit;

    /**
     * Constructor for no limit
     */
    private DoSomethingNTimesTask() {
        this(-1);
    }

    /**
     * Constructor allowing to set a limit
     * @param limit the limit (negative number for no limit)
     */
    private DoSomethingNTimesTask(int limit) {
        this.limit = limit;
    }

    @Override
    public void run(){
        doSomething();
        count++;
        if(limit >= 0 && count >= limit){ // >= so the task stops after exactly limit runs
            // Cancel the scheduling
        }
    }
}

You'll have to pass the scheduled future to your task in order for it to cancel itself, or you might throw an exception.
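
Passing the future to the task could look roughly like the sketch below. It is an illustration under assumptions, with hypothetical names (LimitedTask, setFuture, LimitedTaskDemo): the future only exists after scheduling, so it is supplied through a setter, and two volatile fields make sure the cancellation happens whichever side finishes first.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical task wrapper: a negative limit means "no limit".
class LimitedTask implements Runnable {
    private final Runnable delegate;
    private final int limit;
    private int count = 0; // safe: executions of one periodic task never overlap
    private volatile boolean finished;
    private volatile ScheduledFuture<?> future;

    LimitedTask(Runnable delegate, int limit) {
        this.delegate = delegate;
        this.limit = limit;
        this.finished = (limit == 0); // a limit of zero means never run the delegate
    }

    // Called by the scheduling thread once the future is known.
    void setFuture(ScheduledFuture<?> future) {
        this.future = future;
        if (finished) {
            future.cancel(false); // the limit was reached before the future arrived
        }
    }

    @Override
    public void run() {
        if (finished) {
            return; // guard against a run that slips in before cancellation
        }
        delegate.run();
        count++;
        if (limit >= 0 && count >= limit) {
            finished = true;
            ScheduledFuture<?> f = future;
            if (f != null) {
                f.cancel(false);
            }
        }
    }
}

class LimitedTaskDemo {
    // Schedules a counting task with a positive limit and returns the run count.
    static int runWithLimit(int limit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        LimitedTask task = new LimitedTask(runs::incrementAndGet, limit);
        ScheduledFuture<?> future =
                scheduler.scheduleWithFixedDelay(task, 0, 10, TimeUnit.MILLISECONDS);
        task.setFuture(future);
        try {
            future.get(); // blocks until the task cancels itself
        } catch (CancellationException expected) {
            // normal end of a self-cancelled periodic task
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("executions: " + runWithLimit(3));
    }
}
```

The demo helper is meant for non-negative limits only; with a negative limit the task never cancels itself and get() would block indefinitely.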

断肠人 2024-12-09 14:12:51

I was looking for exactly the same functionality and chose org.springframework.scheduling.Trigger.

Below is a fully tested working example (apologies for the amount of code).
applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:task="http://www.springframework.org/schema/task"
 xmlns:util="http://www.springframework.org/schema/util"
 xmlns:context="http://www.springframework.org/schema/context"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <bean id="blockingTasksScheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
        <property name="poolSize" value="10" />
    </bean>

    <task:scheduler id="deftaskScheduler" pool-size="10" />

</beans>

JAVA

package com.alz.springTests.schedulerTest;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class ScheduledTest {

    private static ApplicationContext applicationContext;
    private static TaskScheduler taskScheduler;

    private static final class SelfCancelableTask implements Runnable, Trigger {
        Date creationTime = new Date();
        AtomicInteger counter = new AtomicInteger(0);
        private volatile boolean shouldStop = false;
        private int repeatInterval = 3; //seconds

        @Override
        public void run() {
            log("task: run started");

            // simulate "doing job" started
            int sleepTimeMs = ThreadLocalRandom.current().nextInt(500, 2000+1);
            log("will sleep " + sleepTimeMs + " ms");
            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // "doing job" finished

            int i = counter.incrementAndGet();
            if (i > 5) { //cancel myself
                logErr("Attempts exceeded, will mark as shouldStop");
                shouldStop = true;

            } else {
                log("task: executing cycle #"+i);
            }
        }

        @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
            log("nextExecutionTime: triggerContext.lastActualExecutionTime() " + triggerContext.lastActualExecutionTime());
            log("nextExecutionTime: triggerContext.lastCompletionTime() " + triggerContext.lastCompletionTime());
            log("nextExecutionTime: triggerContext.lastScheduledExecutionTime() " + triggerContext.lastScheduledExecutionTime());

            if (shouldStop) 
                return null;

            if (triggerContext.lastCompletionTime() == null) {
                LocalDateTime ldt = creationTime.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime().plus(repeatInterval, ChronoUnit.SECONDS);
                return Date.from(ldt.atZone(ZoneId.systemDefault()).toInstant());
            } else {
                LocalDateTime ldt = triggerContext.lastCompletionTime().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime().plus(repeatInterval, ChronoUnit.SECONDS);
                return Date.from(ldt.atZone(ZoneId.systemDefault()).toInstant());               
            }

        }

    }

    private static void log(String log) {
        System.out.printf("%s [%s] %s\r\n", LocalDateTime.now(), Thread.currentThread(), log);
    }

    private static void logErr(String log) {
        System.err.printf("%s [%s] %s\r\n", LocalDateTime.now(), Thread.currentThread(), log);
    }

    public static void main(String[] args) {

        log("main: Started...");

        applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");

        taskScheduler = (TaskScheduler) applicationContext.getBean("blockingTasksScheduler");

        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = ((ThreadPoolTaskScheduler)taskScheduler).getScheduledThreadPoolExecutor();

        SelfCancelableTask selfCancelableTask = new SelfCancelableTask();
        taskScheduler.schedule(selfCancelableTask, selfCancelableTask);


        int waitAttempts = 0;
        while (waitAttempts < 30) {
            log("scheduledPool pending tasks: " + scheduledThreadPoolExecutor.getQueue().size());

            try {
                Thread.sleep(1*1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            waitAttempts++;

        }

        log("main: Done!");


    }

}