带有 isEmpty() 更改通知的队列

发布于 2024-10-15 02:48:07 字数 268 浏览 4 评论 0原文

我在生产者-消费者环境中有一个 BlockingQueue(取自 ScheduledThreadPoolExecutor)。有一个线程将任务添加到队列中,还有一个线程池执行它们。

我需要有关两个事件的通知:

  1. 第一个项目添加到空队列
  2. 最后一个项目从队列中删除

通知 = 将消息写入数据库。

有什么明智的方法来实现这一点吗?

I have an BlockingQueue<Runnable>(taken from ScheduledThreadPoolExecutor) in producer-consumer environment. There is one thread adding tasks to the queue, and a thread pool executing them.

I need notifications on two events:

  1. First item added to empty queue
  2. Last item removed from queue

Notification = writing a message to database.

Is there any sensible way to implement that?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

把回忆走一遍 2024-10-22 02:48:07

一种简单而幼稚的方法是使用一个实现来装饰您的 BlockingQueue,该实现只需检查底层队列,然后发布一个任务来执行通知。

NotifyingQueue<T> extends ForwardingBlockingQueue<T> implements BlockingQueue<T> {
  private final Notifier notifier; // injected not null

  …

  @Override public void put(T element) {
    if (getDelegate().isEmpty()) {
      notifier.notEmptyAnymore();
    }
    super.put(element);
  }

  @Override public T poll() {
    final T result = super.poll();
    if ((result != null) && getDelegate().isEmpty())
      notifier.nowEmpty();
  }
  … etc
}

但这种方法有几个问题。而空-> notEmpty 非常简单 - 特别是对于单个生产者的情况,两个消费者很容易同时运行并且都看到队列从非空 -> 。空的。

但是,如果您想要的只是通知队列在某个时间变空,那么只要您的通知者是您的状态机,跟踪空和非空并在何时通知就足够了它从一个变为另一个:

AtomicStateNotifier implements Notifier {
  private final AtomicBoolean empty = new AtomicBoolean(true); // assume it starts empty
  private final Notifier delegate; // injected not null

  public void notEmptyAnymore() {
    if (empty.get() && empty.compareAndSet(true, false))
      delegate.notEmptyAnymore();
  }

  public void nowEmpty() {
    if (!empty.get() && empty.compareAndSet(false, true))
      delegate.nowEmpty();
  }
}

现在这是围绕实际通知程序实现的线程安全防护,可能将任务发送到执行程序以将事件异步写入数据库。

A simple and naïve approach would be to decorate your BlockingQueue with an implementation that simply checks the underlying queue and then posts a task to do the notification.

NotifyingQueue<T> extends ForwardingBlockingQueue<T> implements BlockingQueue<T> {
  private final Notifier notifier; // injected not null

  …

  @Override public void put(T element) {
    if (getDelegate().isEmpty()) {
      notifier.notEmptyAnymore();
    }
    super.put(element);
  }

  @Override public T poll() {
    final T result = super.poll();
    if ((result != null) && getDelegate().isEmpty())
      notifier.nowEmpty();
  }
  … etc
}

This approach though has a couple of problems. While the empty -> notEmpty is pretty straightforward – particularly for a single producer case, it would be easy for two consumers to run concurrently and both see the queue go from non-empty -> empty.

If though, all you want is to be notified that the queue became empty at some time, then this will be enough as long as your notifier is your state machine, tracking emptiness and non-emptiness and notifying when it changes from one to the other:

AtomicStateNotifier implements Notifier {
  private final AtomicBoolean empty = new AtomicBoolean(true); // assume it starts empty
  private final Notifier delegate; // injected not null

  public void notEmptyAnymore() {
    if (empty.get() && empty.compareAndSet(true, false))
      delegate.notEmptyAnymore();
  }

  public void nowEmpty() {
    if (!empty.get() && empty.compareAndSet(false, true))
      delegate.nowEmpty();
  }
}

This is now a thread-safe guard around an actual Notifier implementation that perhaps posts tasks to an Executor to asynchronously write the events to the database.

雪若未夕 2024-10-22 02:48:07

该设计很可能有缺陷,但您可以相对简单地做到这一点:
您有一个单线程添加,因此您可以在添加之前进行检查。即 pool.getQueue().isEmpty() - 只有一个生产者,这是安全的。

无法保证最后删除的项目,但您可以覆盖 beforeExecute 并再次检查队列。在 isEmpty() 返回 true 后可能会有一个小超时。也许下面的代码在 afterExecute 中执行会更好。

protected void beforeExecute(Thread t, Runnable r) { 
  if (getQueue().isEmpty()){
    try{
      Runnable r = getQueue().poll(200, TimeUnit.MILLISECONDS);
      if (r!=null){ 
        execute(r);
      }  else{
        //last message - or on after execute by Setting a threadLocal and check it there
        //alternatively you may need to do so ONLY in after execute, depending on your needs
     }
    }catch(InterruptedException _ie){
      Thread.currentThread().interrupt();
    }
  }
}

有时

我可以解释为什么使用队列本身进行通知不能很好地工作:假设您添加一个要由池执行的任务,该任务立即计划,队列再次为空,您将需要通知。

The design is most likely flawed but you can do it relatively simple:
You have a single thread adding, so you can check before adding. i.e. pool.getQueue().isEmpty() - w/ one producer, this is safe.

Last item removed cannot be guaranteed but you can override beforeExecute and check the queue again. Possibly w/ a small timeout after isEmpty() returns true. Probably the code below will be better off executed in afterExecute instead.

protected void beforeExecute(Thread t, Runnable r) { 
  if (getQueue().isEmpty()){
    try{
      Runnable r = getQueue().poll(200, TimeUnit.MILLISECONDS);
      if (r!=null){ 
        execute(r);
      }  else{
        //last message - or on after execute by Setting a threadLocal and check it there
        //alternatively you may need to do so ONLY in after execute, depending on your needs
     }
    }catch(InterruptedException _ie){
      Thread.currentThread().interrupt();
    }
  }
}

sometime like that

I can explain why doing notifications w/ the queue itself won't work well: imagine you add a task to be executed by the pool, the task is scheduled immediately, the queue is empty again and you will need notification.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文