ExecutorService with backpressure

Posted on 2025-02-05 07:17:08


I have the following requirements for an ExecutorService:

  • limited number of concurrently processing tasks (threads), preferably configurable
  • when all threads are occupied, subsequently submitted tasks get queued.
  • the queue should apply backpressure: if too many tasks are submitted and the queue is full, the submission should block.
  • the blocking submission can have a timeout: if a task is not submitted within a given time, an exception should be thrown.

Reason: the task producers may overwhelm the executor service or its queue, and too many queued tasks increase memory usage. Backpressure blocks the producers in that situation until processing catches up. The producers (some reading from Kafka, others from JDBC) may hold open transactions that can time out, so a blocking submission must be able to time out as well.

Since executor service implementations such as ThreadPoolExecutor use a BlockingQueue to enqueue tasks, I expected the queue to provide exactly this: accept up to a defined number of tasks, then block until worker threads take tasks from it. Instead, ThreadPoolExecutor enqueues with the queue's non-blocking offer(), so when the queue is full (and the pool has already reached maximumPoolSize threads), additional tasks are rejected on submission with a RejectedExecutionException. That is certainly not what I want.
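A minimal sketch reproducing this behaviour, assuming one worker thread and an ArrayBlockingQueue of capacity 1. The sizes and the names RejectionDemo and thirdSubmissionRejected are illustrative only. The first task goes straight to the newly created worker, the second fills the queue, and the third is rejected immediately by the default AbortPolicy instead of blocking the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectionDemo {

    /** Returns true if the third submission was rejected instead of blocking. */
    static boolean thirdSubmissionRejected() {
        CountDownLatch release = new CountDownLatch(1);
        // One worker thread (core = max = 1) and room for exactly one queued task.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // handed directly to the new worker thread
            pool.execute(blocker); // offer() succeeds: the queue is now full
            try {
                // offer() fails at once and no extra thread may be created,
                // so the handler (AbortPolicy by default) throws.
                pool.execute(blocker);
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + thirdSubmissionRejected());
    }
}
```

Running main prints `rejected: true`: the caller is never made to wait.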

I came up with a wrapper around the ExecutorService that uses a Semaphore to limit how many tasks are accepted concurrently. This also lets me block (with a timeout) when all of the Semaphore's permits have been acquired:

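import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Pipeline {

    private final ExecutorService executorService;
    // One permit per task that has been accepted but has not finished yet.
    private final Semaphore semaphore;

    private final long queueTimeout;
    private final TimeUnit queueTimeoutUnit;

    public Pipeline(int maxConcurrent, long queueTimeout, TimeUnit queueTimeoutUnit) {
        semaphore = new Semaphore(maxConcurrent);
        executorService = Executors.newCachedThreadPool();

        this.queueTimeout = queueTimeout;
        this.queueTimeoutUnit = queueTimeoutUnit;
    }

    public <T> Future<T> submit(Callable<T> task) {
        try {
            // Block the producer until a permit is free, or give up after the timeout.
            boolean acquired = semaphore.tryAcquire(queueTimeout, queueTimeoutUnit);
            if (!acquired) {
                throw new RuntimeException("Timeout accepting task");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
        try {
            return executorService.submit(() -> {
                try {
                    return task.call();
                } finally {
                    semaphore.release(); // free the permit once the task has finished
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release(); // the task never ran (e.g. after shutdown), so return its permit
            throw e;
        }
    }

    public void shutdown() {
        executorService.shutdown();
    }
}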

This works, but it seems like a common enough use case that the Java API might already cover it. Am I missing some built-in functionality?
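In the Pipeline wrapper, each accepted task is handed to a cached thread pool, which starts a thread for it if none is idle. A single permit count therefore roughly bounds the running threads, and there is no separate waiting queue. Below is a minimal sketch of the same Semaphore technique that keeps the two limits from the requirements apart. It assumes a fixed pool of nThreads workers and a semaphore with nThreads + queueCapacity permits. The fixed pool's own queue is unbounded, so the semaphore is what enforces the bound. BoundedExecutor and its parameters are hypothetical names, and signalling a timeout with the checked TimeoutException is a design choice, not anything the JDK prescribes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical wrapper: nThreads workers plus at most queueCapacity waiting tasks. */
final class BoundedExecutor {
    private final ExecutorService pool;
    private final Semaphore slots; // one permit per running or queued task

    BoundedExecutor(int nThreads, int queueCapacity) {
        // The fixed pool's internal queue is unbounded; the semaphore provides the bound.
        this.pool = Executors.newFixedThreadPool(nThreads);
        this.slots = new Semaphore(nThreads + queueCapacity);
    }

    /** Blocks while all slots are taken; throws TimeoutException if none frees up in time. */
    <T> Future<T> submit(Callable<T> task, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (!slots.tryAcquire(timeout, unit)) {
            throw new TimeoutException("no free slot within " + timeout + " " + unit);
        }
        try {
            return pool.submit(() -> {
                try {
                    return task.call();
                } finally {
                    slots.release(); // the slot frees up as soon as the task ends
                }
            });
        } catch (RejectedExecutionException e) {
            slots.release(); // e.g. after shutdown: the task never ran
            throw e;
        }
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

With `new BoundedExecutor(1, 1)`, one task runs, one waits in the queue, and a third `submit` blocks until one of them finishes or the timeout expires.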


Comments (2)

月光色 2025-02-12 07:17:08


What if you create a ThreadPoolExecutor with a fixed-capacity ArrayBlockingQueue and your own custom RejectedExecutionHandler?

NOTE! I have not tried this myself. It's just an idea.

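import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class MyHandler implements RejectedExecutionHandler {
    final BlockingQueue<Runnable> task_queue;
    final long timeout;
    final TimeUnit unit;

    public MyHandler(
        BlockingQueue<Runnable> task_queue,
        long timeout,
        TimeUnit unit
    ) {
        this.task_queue = task_queue;
        this.timeout = timeout;
        this.unit = unit;
    }

    @Override
    public void rejectedExecution(
        Runnable task,
        ThreadPoolExecutor pool
    ) {
        // A shut-down pool must keep rejecting: a task queued now might never run.
        if (pool.isShutdown()) {
            throw new RejectedExecutionException("executor has been shut down");
        }
        boolean timed_out;
        try {
            // Block the submitting thread until space frees up or the timeout expires.
            timed_out = !task_queue.offer(task, timeout, unit);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RejectedExecutionException("interrupted while waiting for queue space", ex);
        }
        if (timed_out) {
            throw new RejectedExecutionException("queue is full");
        }
    }
}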

Then, create the executor:

BlockingQueue<Runnable> task_queue = new ArrayBlockingQueue<>(...);
ExecutorService pool = new ThreadPoolExecutor(
    corePoolSize,  maximumPoolSize,
    keepAliveTime, keepAliveTimeUnit,
    task_queue,
    new MyHandler(task_queue, submitTimeout, submitTimeoutUnit)
);

It's a bit hacky because it manipulates the queue behind the executor's back. My expectation is this: if the queue is full (and the pool is already running maximumPoolSize threads) when your code calls pool.submit(task), the handler's rejectedExecution() method will be called, and it will wait up to the specified timeout to add the task to the queue.

If it succeeds, then the task is added to the queue, and everybody's happy. If it fails, then the RejectedExecutionException will be propagated back for the caller of pool.submit(task) to catch and handle however you see fit.
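A compact, self-contained variant of the same handler idea, written as a lambda. Instead of passing the queue in separately, it reaches the pool's own queue through ThreadPoolExecutor.getQueue(). The JDK documents that method as intended mainly for debugging and monitoring, which is part of why the approach is hacky. Core and maximum pool size are set equal here, so the handler only runs once every thread is busy and the queue is full. BlockingSubmit and newPool are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BlockingSubmit {

    /** Hypothetical factory: execute()/submit() wait up to the timeout for queue space. */
    static ThreadPoolExecutor newPool(int threads, int queueCapacity,
                                      long timeout, TimeUnit unit) {
        RejectedExecutionHandler waitForSpace = (task, pool) -> {
            if (pool.isShutdown()) {
                throw new RejectedExecutionException("executor has been shut down");
            }
            try {
                // Reached only when the queue is full and all threads are busy.
                if (!pool.getQueue().offer(task, timeout, unit)) {
                    throw new RejectedExecutionException(
                            "queue still full after " + timeout + " " + unit);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        };
        // Fixed-size pool (core == max) backed by a bounded queue.
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), waitForSpace);
    }
}
```

With one thread and a queue capacity of 1, a third execute() issued while the first task is still running waits up to the timeout. If a slot opens in time, the task is queued; otherwise the caller gets a RejectedExecutionException. The isShutdown() check narrows, but does not fully close, the race with a concurrent shutdown().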

花开雨落又逢春i 2025-02-12 07:17:08


This is something that's missing from the standard library and why the LMAX Disruptor library exists:

https://lmax-exchange.github.io/disruptor/

In my experience it makes concurrent message queues a breeze, with no threading code required at all. The caveat is that the lowest-latency wait strategies consume significant CPU resources.
