将给定 ID 的任务绑定到同一线程的线程池

发布于 2024-12-16 10:41:57 字数 851 浏览 0 评论 0原文

是否有线程池(Java 中)的实现可以确保同一逻辑 ID 的所有任务都在同一线程上执行?

我所遵循的逻辑是,如果给定逻辑 ID 的特定线程上已经执行了一个任务,则具有相同 ID 的新任务将被安排在同一线程上。如果没有线程执行相同 ID 的任务,则可以使用任何线程。

这将允许不相关 ID 的任务并行执行,但相同 ID 的任务按照提交的顺序串行执行。

如果没有,是否有关于如何扩展 ThreadPoolExecutor 来获得此行为的建议(如果可能的话)?

更新

经过更长时间的思考,我实际上并不要求相同逻辑 ID 的任务在同一线程上执行,只是它们不会同时执行。

一个例子是为客户处理订单的系统,可以同时处理多个订单,但不能为同一客户处理(同一客户的所有订单都必须按顺序处理)。

我目前采用的方法是使用标准 ThreadPoolExecutor,带有自定义的 BlockingQueue,并使用自定义包装器包装 Runnable。 Runnable 包装器逻辑是:

  1. 以原子方式尝试将 ID 添加到并发“运行”集 (ConcurrentHashMap),以查看当前是否正在运行相同 ID 的任务
    • 如果添加失败,则将任务推回到队列前面并立即返回
    • 如果成功,继续
  2. 运行任务
  3. 从“运行”集合中删除任务关联的ID

队列的poll()< /code> 方法则仅返回 ID 当前不在“正在运行”集中的任务。

这样做的问题是,我确信会有很多我没有考虑到的极端情况,因此需要大量测试。

Are there any implementations of a thread pool (in Java) that ensures all tasks for the same logical ID are executed on the same thread?

The logic I'm after is if there is already a task being executed on a specific thread for a given logical ID, then new tasks with the same ID are scheduled on the same thread. If there are no threads executing a task for the same ID then any thread can be used.

This would allow tasks for unrelated IDs to be executed in parallel, but tasks for the same ID to be executed in serial and in the order submitted.

If not, are there any suggestions on how I might extend ThreadPoolExecutor to get this behaviour (if that's even possible)?

UPDATE

Having spent longer thinking about this, I don't actually require that tasks for the same logical ID get executed on the same thread, just that they don't get executed at the same time.

An example for this would be a system that processed orders for customers, where it was OK to process multiple orders at the same time, but not for the same customer (and all orders for the same customer had to be processed in order).

The approach I'm taking at the moment is to use a standard ThreadPoolExecutor, with a customised BlockingQueue and also wrapping the Runnable with a custom wrapper. The Runnable wrapper logic is:

  1. Atomically attempt to add ID to concurrent 'running' set (ConcurrentHashMap) to see if a task for the same ID is currently running
    • if add fails, push the task back on to the front of the queue and return immediately
    • if succeeeds, carry on
  2. Run the task
  3. Remove the task's associated ID from the 'running' set

The queue's poll() methods then only return tasks that have an ID that is not currently in the 'running' set.

The trouble with this is that I'm sure there are going to be a lot of corner cases that I haven't thought about, so it's going to require a lot of testing.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

孤独患者 2024-12-23 10:41:57

创建一组执行程序服务,每个执行程序服务运行一个线程,并通过项目 ID 的哈希码将队列条目分配给它们。该数组可以是任意大小,具体取决于您最多要使用多少个线程。

这将限制我们可以从执行程序服务中使用,但仍然允许使用其功能在不再需要时关闭唯一的线程(使用allowCoreThreadTimeOut(true))并根据需要重新启动它。此外,所有排队的东西都可以工作而无需重写。

Create an array of executor services running one thread each and assign your queue entries to them by the hash code of your item id. The array can be of any size, depending on how many threads at most do you want to use.

This will restrict that we can use from the executor service but still allows to use its capability to shut down the only thread when no longer needed (with allowCoreThreadTimeOut(true)) and restart it as required. Also, all queuing stuff will work without rewriting it.

痞味浪人 2024-12-23 10:41:57

最简单的想法可能是这样的:

拥有一个固定的 BlockingQueue 映射。使用哈希机制根据任务 ID 选择队列。哈希算法应该为相同的 id 选择相同的队列。为每个队列启动一个线程。每个线程都会从它自己的专用队列中选择一个任务并执行它。

ps 适当的解决方案很大程度上取决于您分配给线程的工作类型

UPDATE

好吧,这个疯狂的想法怎么样,请耐心等待:)

比如说,我们有一个 ConcurrentHashMap 包含引用 id -> OrderQueue

ID1->Q1, ID2->Q2, ID3->Q3, ...

意味着现在每个 id 都与其自己的队列相关联。 OrderQueue 是一个自定义阻塞队列,带有附加布尔标志 - isAssociatedWithWorkingThread

还有一个常规的 BlockingQueue,我们现在将其称为 amortizationQueue,稍后您将看到它的使用。

接下来,我们有 N 个工作线程。每个工作线程都有自己的工作队列,它是一个包含与该线程关联的 ID 的 BlockingQueue。

当新的 id 出现时,我们执行以下操作:

create a new OrderQueue(isAssociatedWithWorkingThread=false)
put the task to the queue
put id->OrderQueue to the map
put this OrderQueue to amortizationQueue

当现有 id 的更新出现时,我们执行以下操作:

pick OrderQueue from the map
put the task to the queue
if isAssociatedWithWorkingThread == false
    put this OrderQueue to amortizationQueue

每个工作线程执行以下操作:

take next id from the working queue
take the OrderQueue associated with this id from the map
take all tasks from this queue
execute them
mark isAssociatedWithWorkingThread=false for this OrderQueue
put this OrderQueue to amortizationQueue

非常简单。现在到了有趣的部分 - 工作窃取:)

如果在某个时间点某个工作线程发现自己的工作队列为空,那么它会执行以下操作:

go to the pool of all working threads
pick one (say, one with the longest working queue)
steal id from *the tail* of that thread's working queue
put this id to it's own working queue
continue with regular execution

并且还有 +1 个额外线程提供摊销工作:

while (true)
    take next OrderQueue from amortizationQueue
    if queue is not empty and isAssociatedWithWorkingThread == false
         set isAssociatedWithWorkingThread=true
         pick any working thread and add the id to it's working queue

将不得不花费更多时间考虑是否可以使用 AtomicBoolean 作为 isAssociatedWithWorkingThread 标志,或者需要使其阻塞操作来检查/更改此标志。

The simplest idea could be this:

Have a fixed map of BlockingQueues. Use hash mechanism to pick a queue based on task id. The hash algorithm should pick the same queue for the same ids. Start one single thread for every queue. every thread will pick one task from it's own dedicated queue and execute it.

p.s. the appropriate solution is strongly depends on the type of work you assign to threads

UPDATE

Ok, how about this crazy idea, please bear with me :)

Say, we have a ConcurrentHashMap which holds references id -> OrderQueue

ID1->Q1, ID2->Q2, ID3->Q3, ...

Meaning that now every id is associated with it's own queue. OrderQueue is a custom blocking-queue with an additional boolean flag - isAssociatedWithWorkingThread.

There is also a regular BlockingQueue which we will call amortizationQueue for now, you'll see it's use later.

Next, we have N working threads. Every working thread has it's own working queue which is a BlockingQueue containing ids associated with this thread.

When a new id comes, we do the following:

create a new OrderQueue(isAssociatedWithWorkingThread=false)
put the task to the queue
put id->OrderQueue to the map
put this OrderQueue to amortizationQueue

When an update for existing id comes we do the following:

pick OrderQueue from the map
put the task to the queue
if isAssociatedWithWorkingThread == false
    put this OrderQueue to amortizationQueue

Every working thread does the following:

take next id from the working queue
take the OrderQueue associated with this id from the map
take all tasks from this queue
execute them
mark isAssociatedWithWorkingThread=false for this OrderQueue
put this OrderQueue to amortizationQueue

Pretty straightforward. Now to the fun part - work stealing :)

If at some point of time some working thread finds itself with empty working queue, then it does the following:

go to the pool of all working threads
pick one (say, one with the longest working queue)
steal id from *the tail* of that thread's working queue
put this id to it's own working queue
continue with regular execution

And there also +1 additional thread which provides amortization work:

while (true)
    take next OrderQueue from amortizationQueue
    if queue is not empty and isAssociatedWithWorkingThread == false
         set isAssociatedWithWorkingThread=true
         pick any working thread and add the id to it's working queue

Will have to spend more time thinking if you can get away with AtomicBoolean for isAssociatedWithWorkingThread flag or there is a need to make it blocking operation to check/change this flag.

醉生梦死 2024-12-23 10:41:57

我最近不得不处理类似的情况。

我最终得到了与你类似的设计。唯一的区别是“当前”是一个映射而不是一个集合:从 ID 到 Runnables 队列的映射。当任务的可运行对象的包装器发现其 ID 存在于映射中时,它会将任务的可运行对象添加到 ID 的队列中并立即返回。否则,ID 将被添加到具有空队列的映射中并执行任务。

任务完成后,包装器会再次检查 ID 的队列。如果队列不为空,则选择可运行的。否则它就会从地图上删除,我们就完成了。

我将把关闭和取消作为练习留给读者:)

I had to deal with a similar situation recently.

I ended up with a design similar to yours. The only difference was that the "current" was a map rather than a set: a map from ID to a queue of Runnables. When the wrapper around task's runnable sees that its ID is present in the map it adds the task's runnable to the ID's queue and returns immediately. Otherwise the ID is added to the map with empty queue and the task is executed.

When the task is done, the wrapper checks the ID's queue again. If the queue is not empty, the runnable is picked. Otherwise it's removed from the map and we're done.

I'll leave shutdown and cancelation as an exercise to the reader :)

迷途知返 2024-12-23 10:41:57

我们的方法与原始问题的更新中的方法类似。我们有一个可运行的包装类,其中包含一个队列(LinkedTransferQueue),我们将其称为 RunnableQueue。 Runnable 队列具有以下基本 API:

public class RunnableQueue implements Runnable
{
  public RunnableQueue(String name, Executor executor);
  public void run();

  public void execute(Runnable runnable);
}

当用户通过执行调用提交第一个 Runnable 时,RunnableQueue 将自身排队到执行器上。后续对执行的调用会在 RunnableQueue 内的队列中排队。当可运行队列被 ThreadPool 执行(通过其 run 方法)时,它开始通过逐个串行执行可运行队列来“耗尽”内部队列。如果在执行时对 RunnableQueue 调用execute,则新的可运行对象将简单地附加到内部队列中。一旦队列被清空,可运行队列的 run 方法就会完成并“离开”执行器池。重复冲洗。

我们还有其他优化,例如在 RunnableQueue 将自身重新发布到执行程序池之前只让一定数量的可运行程序运行(例如四个)。

里面唯一真正棘手的一点(而且并不那么难)是在它被发布到执行器或不发布到执行器时进行同步,这样它就不会重新发布,或者错过它应该发布的时间。

总的来说,我们发现这个方法运行得很好。我们的“ID”(语义上下文)是可运行队列。我们的需求(即插件)引用了 RunnableQueue 而不是执行器池,因此它被迫通过 RunnableQueue 专门工作。这不仅保证所有访问都是连续的(线程限制),而且让 RunnableQueue “调节”插件的作业加载。此外,它不需要集中管理结构或其他争论点。

Our approach is similar to what is in the update of the original question. We have a wrapper class that is a runnable that contains a queue (LinkedTransferQueue) which we call a RunnableQueue. The runnable queue has the basic API of:

public class RunnableQueue implements Runnable
{
  public RunnableQueue(String name, Executor executor);
  public void run();

  public void execute(Runnable runnable);
}

When the user submits the first Runnable via the execute call the RunnableQueue enqueues itself on the executor. Subsequent calls to execute get queued up on the queue inside the RunnableQueue. When the runnable queue get executed by the ThreadPool (via its run method) it starts to "drain" the internal queue by serially executing the runnables one by one. If execute is called on the RunnableQueue while it is executing, the new runnables simply get appended to the internal queue. Once the queue is drained, the run method of the runnable queue completes and it "leaves" the executor pool. Rinse repeat.

We have other optimizations that do things like only let some number of runnables run (e.g. four) before the RunnableQueue re-posts itself to the executor pool.

The only really tricky bit inside and it isn't that hard) is to synchronize around when it is posted to the executor or not so that it doesn't repost, or miss when it should post.

Overall we find this to work pretty well. The "ID" (semantic context) for us is the runnable queue. The need we have (i.e. a plugin) has a reference to the RunnableQueue and not the executor pool so it is forced to work exclusively through the RunnableQueue. This not only guarantees all accesses are serially sequence (thread confinement) but lets the RunnableQueue "moderate" the plugin's job loading. Additionally, it requires no centralized management structure or other points of contention.

冷心人i 2024-12-23 10:41:57

我必须实现一个类似的解决方案,并且通过 h22 创建一组执行器服务的建议对我来说似乎是最好的方法,但需要注意的是,我将采用 ID 的模数 % (或者是原始数据) ID(假设它是 long/int 或哈希码)相对于某些所需的最大大小,并使用该结果作为新 ID,这样我就可以在最终获得太多执行程序服务对象同时仍然获得大量执行程序服务对象之间取得平衡中的并发数 加工。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceRouter {

    private List<ExecutorService> services;
    private int size;

    public ExecutorServiceRouter(int size) {
        services = new ArrayList<ExecutorService>(size);
        this.size = size;
        for (int i = 0; i < size; i++) {
            services.add(Executors.newSingleThreadExecutor());
        }
    }

    public void route(long id, Runnable r) {
        services.get((int) (id % size)).execute(r);
    }

    public void shutdown() {
        for (ExecutorService service : services) {
            service.shutdown();
        }
    }

}

I have to implement a similar solution and the suggestion of creating an array of executor services by h22 seems the best approach to me with one caveat that I will be taking the modulus % of the ID (either the raw ID assuming it is long/int or the hash code) relative to some desired max size and using that result as the new ID so that way I can have a balance between ending up with way too many executor service objects while still getting a good amount of concurrency in the processing.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceRouter {

    private List<ExecutorService> services;
    private int size;

    public ExecutorServiceRouter(int size) {
        services = new ArrayList<ExecutorService>(size);
        this.size = size;
        for (int i = 0; i < size; i++) {
            services.add(Executors.newSingleThreadExecutor());
        }
    }

    public void route(long id, Runnable r) {
        services.get((int) (id % size)).execute(r);
    }

    public void shutdown() {
        for (ExecutorService service : services) {
            service.shutdown();
        }
    }

}
睫毛上残留的泪 2024-12-23 10:41:57

扩展 ThreadPoolExecutor 会非常困难。我建议你采用生产者-消费者系统。这就是我的建议。

  1. 您可以创建典型的生产者消费者系统。查看此问题中提到的代码。
  2. 现在,每个系统都会有一个队列和一个单一消费者线程,它将串行处理队列中的任务。
  3. 现在,创建一个此类单独系统的池
  4. 当您提交相关 ID 的任务时,请查看是否已经有一个为该相关 ID 标记的系统当前正在处理该任务,如果是,则提交任务,
  5. 如果它没有处理任何任务,则用这个新的相关 ID 标记该系统ID并提交任务。
  6. 这样,单个系统将只满足一个逻辑相关的 ID。

在这里,我假设相关 ID 是单个 ID 的逻辑集合,并且将为相关 ID 而不是单个 ID 创建生产者消费者系统。

Extending ThreadPoolExecutor would be quite difficult. I would suggest you to go for a producer-consumer system. Here is what I am suggesting.

  1. You can create typical producer consumer systems . Check out the code mentioned in this question.
  2. Now each of these system will have a queue and a Single Consumer thread,which will process the tasks in the queue serially
  3. Now, create a pool of such individual systems.
  4. When you submit a task for a related ID , see if there is already a system marked for that related ID which is currently processing the tasks, if yes then submit the tasks,
  5. If its not processing any tasks then mark that system with this new related ID and submit the task.
  6. This way a single system will cater only for one logical related IDs .

Here I am assuming that a related ID is logical bunch of individual IDs and the producer consumer systems will be created for related IDs and NOT individual IDs.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文