如何使用当前线程处理工作队列?

发布于 2024-11-19 15:47:16 字数 505 浏览 1 评论 0原文

我使用什么数据结构来实现以下逻辑?

  1. read() 是一种对某些工作负载进行排队的异步方法,
  2. 一次只能运行一个工作负载。
  3. 第一个对工作负载进行排队的线程成为工作线程。它在返回之前处理队列上的所有工作。下一个调用 read() 的线程将成为新的工作线程,依此类推...
  4. 如果其他线程在工作线程处理队列时调用 read(),它们只需添加到队列末尾并立即返回。

我知道如何使用 ConcurrentLinkedQueue 和 AtomicBoolean 来实现这一点,但我感觉有更好的方法。

澄清:工作负载由调用另一个名为 read2() 的异步方法组成。 read2() 是异步的,但不是线程安全的。当我说工作线程“处理工作负载”时,它只是触发第一个读取操作并立即返回。当 read2() 完成时,它会调用队列上的下一个操作,依此类推。整个 API 是异步的。因此,我想避免使用专用的消费者线程(没有真正的需要,而且它不利于可扩展性)。

What data structures do I use to implement the following logic?

  1. read() is an asynchronous method that queues some workload
  2. Only one workload may run at a time.
  3. The first thread to queue a workload becomes the worker thread. It processes all work on the queue before returning. The next thread to invoke read() becomes the new worker thread, and so on...
  4. If other threads invoke read() while a worker thread is processing the queue, they simply add to the end of the queue and return immediately.

I know how to implement this using a ConcurrentLinkedQueue and AtomicBoolean but I get the feeling there is a better way.

CLARIFICATION: A workload consists of invoking another asynchronous method called read2(). read2() is asynchronous but is not thread-safe. When I say a worker thread "processes the workload" it simply fires the first read operation and returns right away. When read2() completes, it invokes the next operation on the queue and so on. The entire API is asynchronous. As such, I'd like to avoid a dedicated consumer thread (there's no real need for it and it's bad for scalability).

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

嗫嚅 2024-11-26 15:47:16

由于检测处理队列项的线程的确切状态是否存在不确定性,我建议每个线程,无论是用户线程还是某个触发异步回调的内核线程,到达该系统时都会将其工作负载排队,然后尝试处理队列中的所有项目。如果某些工作负载请求发出非线程安全调用,则仅使用 CS/spinner 保护此调用 - 您说这些非线程安全调用无论如何都很简短,因此 CS/spin 锁的成本非常低。

转储 AtomicBoolean。虽然发现它清晰确实意味着没有线程正在处理队列中的工作负载,但发现它已设置并不意味着线程正在处理队列中的项目:存在第三种状态 - '已完成队列中的项目但尚未完全解决还没有清除布尔值'。

平均值,
马丁

Because of the uncertainty of detecting the exact state of a thread processing queue items or no, I suggest that every thread, no matter whether a user thread or some kernel thread firing an async callback, that arrives at this system enqueues its workload and then attempts to process all items in the queue. If some workloads request the issue of a non-threadsafe call, protect this call only with a CS/spinner - you say that these non-threadsafe calls are brief anyway, so a CS/spin lock would cost you very little.

Dump the AtomicBoolean. While finding it clear does mean that no thread is processing the workloads in the queue, finding it set does not mean that a thread is processing items in the queue: there is third state - 'finished with items in the queue but not quite got around to clearing the boolean yet'.

Rgds,
Martin

书间行客 2024-11-26 15:47:16

我认为你的设计可能会破坏软件工程中的一些重要规则,但我现在记不起它的名字了。理想情况下,您应该让一个类负责一件事,因此要求类根据外部状态更改其负责的内容会给您带来一些麻烦。

我建议您采用 生产者/消费者模式 考虑在内。您的生产者线程会将工作添加到队列中,而消费者线程将处理该工作。这确保了生产者线程不必关心何时负责从队列中获取工作以及何时负责将工作放入队列,它们所关心的只是将工作放入队列。您的消费者线程将致力于从队列中获取工作并执行它,这就是它要做的全部事情!

根据您的评论进行更新:制作人不必专心致志,您所做的就是让您的读取回调将工作项排入队列(我认为它类似于回调,但如果我',请纠正我)我错了)。又是这样:

private static BlockingQueue q;
// Asynchronous callback
public void read()
{
    q.enqueue(new WorkItem());
}

消费者可能必须是专用的,但从概念上讲,专用消费者和让多个生产者扮演消费者的独占角色之间几乎没有什么区别:

public class Consumer imlpements Runnable {
    private BlockingQueue _queue;
    public Consumer(BlockingQueue q){
        _queue = q;
    }

    public void run(){
        while(true){
            // block on dequeue until there is work in the queue (i.e. never exits the loop)
            WorkItem w = _queue.dequeue();
            w.DoWork();
        }
    }
}

现在您可以运行 both Consumer 和 Producer(s) 位于线程池 (ExecutorService) 中,它将完全按照您的要求执行操作。请注意,我没有包含线程安全的所有附加功能,例如捕获中断异常和优雅终止,但这是您可以(并且应该)添加的内容。

I think your design might be braking some important rule in software engineering whose name escapes me at the moment. Ideally, you should have one class be responsible for one thing so asking classes to change what they are responsible for depending on an external state is going to cause you some trouble.

I would recommend that you take the Producer/Consumer Patter into consideration. Your Producer thread(s) will add work onto the queue and a Consumer thread would process the work. This ensures that your producer threads don't have to concern themselves with when they are responsible for taking work from the queue and when they are responsible for putting work on the queue, all they care about is putting work on the queue. Your Consumer thread will be dedicated to taking work from the queue and executing it and that's all it would do!

Update base on your comment: the producer doesn't have to be dedicated, all you do is have your read callback enqueue the work item (I assume it's something like a callback, but correct me if I'm wrong). Here it is again:

private static BlockingQueue q;
// Asynchronous callback
public void read()
{
    q.enqueue(new WorkItem());
}

The consumer may have to be dedicated, but conceptually there is very little difference between dedicating a consumer and having multiple producers take an exclusive role as a consumer:

public class Consumer imlpements Runnable {
    private BlockingQueue _queue;
    public Consumer(BlockingQueue q){
        _queue = q;
    }

    public void run(){
        while(true){
            // block on dequeue until there is work in the queue (i.e. never exits the loop)
            WorkItem w = _queue.dequeue();
            w.DoWork();
        }
    }
}

Now you can run both the Consumer and the Producer(s) in a thread pool (ExecutorService) and it will do exactly what you want it to do. Note that I have not included all the bells and whistles of thread safety, such as catching the interrupt exceptions and terminating gracefully, but that's something that you can (and should) add.

谁与争疯 2024-11-26 15:47:16

简单地使 read 成为 synchronized 方法怎么样?

您只需直接调用read,而不是将工作负载(“调用read”)放入同步队列中。您的线程可能需要等待一段时间才能获得同步锁,但正如您所说的 read 执行得非常快,所以这应该不是问题。

(我还不是 100% 确定我理解你的问题,但我认为我理解的是:所有 read 所做的就是在新线程上启动方法 read2 。但是似乎 read 存在一些线程安全问题,因为您不希望(出于无法解释的原因)许多 read 并行执行。)

编辑:我意识到我不知道是否read 是单个共享对象的方法、静态方法或者它存在于许多对象中。如果是后者,仅仅使方法同步并没有多大作用;您需要在调用 read 之前或在 内获得一些全局锁(可能是包含 read 上的锁)阅读

What about simply making read a synchronized method?

You just call read directly instead of putting a workload ("call read") in a synchronized queue. Your threads might have to wait a bit to get the synchronization lock, but as you said read executes very quickly, so it should not be a problem.

(I'm not 100% sure I understand your problem yet, but here is what I think I understand: all read does is start method read2 on a new thread. However it seems that there is some thread-safety issue with read since you do not want (for unexplained reasons) many read to execute in parallel.)

EDIT: I realized I don't know if read is a method of a single shared object, a static method or if it is present in many objects. If it is the latter, just making the method synchronized wouldn't do much good; you would need to get some global lock (maybe a lock on the class that contains read) either before calling read, or within read.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文