Computing map: computing values ahead of time

Posted 2024-09-08 15:58:20

I have a computing map (with soft values) that I am using to cache the results of an expensive computation.

Now I have a situation where I know that a particular key is likely to be looked up within the next few seconds. That key is also more expensive to compute than most.

I would like to compute the value in advance, in a minimum-priority thread, so that when the value is eventually requested it will already be cached, improving the response time.

What is a good way to do this such that:

  1. I have control over the thread (specifically its priority) in which the computation is performed.
  2. Duplicate work is avoided, i.e. the computation is only done once. If the computation task is already running then the calling thread waits for that task instead of computing the value again (FutureTask implements this. With Guava's computing maps this is true if you only call get but not if you mix it with calls to put.)
  3. The "compute value in advance" method is asynchronous and idempotent. If a computation is already in progress it should return immediately without waiting for that computation to finish.
  4. Avoid priority inversion, e.g. if a high-priority thread requests the value while a medium-priority thread is doing something unrelated but the computation task is queued on a low-priority thread, the high-priority thread must not be starved. Maybe this could be achieved by temporarily boosting the priority of the computing thread(s) and/or running the computation on the calling thread.

How could this be coordinated between all the threads involved?
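
For concreteness, here is a rough sketch of the shape I have in mind, covering points 1–3 but not the priority-inversion issue of point 4 (Key, Value, computeValue() and the executor are just placeholders):

private final ConcurrentMap<Key, FutureTask<Value>> cache = ...;
private final ExecutorService lowPriorityExecutor = ...;   // threads created at MIN_PRIORITY

// Blocking lookup: the computation runs at most once; concurrent callers
// wait on the same task.
Value getValue(Key key) throws InterruptedException, ExecutionException {
    return taskFor(key).get();
}

// Asynchronous, idempotent prefetch: returns immediately even if the
// computation is already scheduled or running.
void computeInAdvance(Key key) {
    taskFor(key);
}

private FutureTask<Value> taskFor(final Key key) {
    FutureTask<Value> task = cache.get(key);
    if (task == null) {
        FutureTask<Value> newTask = new FutureTask<Value>(new Callable<Value>() {
            public Value call() {
                return computeValue(key);
            }
        });
        task = cache.putIfAbsent(key, newTask);
        if (task == null) {
            task = newTask;
            lowPriorityExecutor.execute(newTask);   // run on the low-priority pool
        }
    }
    return task;
}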


Additional info
The computations in my application are image filtering operations, which means they are all CPU-bound. These operations include affine transforms (ranging from 50µs to 1ms) and convolutions (up to 10ms.) Of course the effectiveness of varying thread priorities depends on the ability of the OS to preempt the larger tasks.

Comments (4)

享受孤独 2024-09-15 15:58:21

One common way of coordinating this type of situation is to have a map whose values are FutureTask objects. So, stealing as an example some code I wrote from a web server of mine, the essential idea is that for a given parameter, we see if there is already a FutureTask (meaning that the calculation with that parameter has already been scheduled), and if so we wait for it. In this example, we otherwise schedule the lookup, but that could be done elsewhere with a separate call if that was desirable:

  private final ConcurrentMap<WordLookupJob, Future<CharSequence>> cache = ...

  private Future<CharSequence> getOrScheduleLookup(final WordLookupJob word) {
    Future<CharSequence> f = cache.get(word);
    if (f == null) {
      Callable<CharSequence> ex = new Callable<CharSequence>() {
        public CharSequence call() throws Exception {
          return doCalculation(word);
        }
      };
      Future<CharSequence> ft = executor.submit(ex);
      f = cache.putIfAbsent(word, ft);
      if (f != null) {
        // somebody slipped in with the same word -- cancel the
        // lookup we've just started and return the previous one
        ft.cancel(true);
      } else {
        f = ft;
      }
    }
    return f;
  }
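
As a rough illustration (the prewarm/lookup names are mine, not part of the code above), the same method then serves both the fire-and-forget prefetch and the blocking lookup:

  // Fire-and-forget prefetch: schedules the computation if nobody has yet,
  // and returns immediately.
  void prewarm(WordLookupJob word) {
    getOrScheduleLookup(word);
  }

  // Blocking lookup: waits for the (possibly already running) computation.
  CharSequence lookup(WordLookupJob word)
      throws InterruptedException, ExecutionException {
    return getOrScheduleLookup(word).get();
  }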

In terms of thread priorities: I wonder if this will achieve what you think it will? I don't quite understand your point about raising the priority of the lookup above the waiting thread: if the thread is waiting, then it's waiting, whatever the relative priorities of other threads... (You might want to have a look at some articles I've written on thread priorities and thread scheduling, but to cut a long story short, I'm not sure that changing the priority will necessarily buy you what you're expecting.)

嘿哥们儿 2024-09-15 15:58:21

I suspect that you are heading down the wrong path by focusing on thread priorities. Usually the data that a cache holds is expensive to compute due to I/O (out-of-memory data) rather than being CPU bound (logic computation). If you're prefetching to guess a user's future action, such as looking at unread emails, then that indicates to me that your work is likely I/O bound. This means that as long as thread starvation does not occur (which schedulers disallow), playing games with thread priority won't offer much of a performance improvement.

If the cost is an I/O call, then the background thread is blocked waiting for the data to arrive, and processing that data should be fairly cheap (e.g. deserialization). As the change in thread priority won't offer much of a speed-up, performing the work asynchronously on a background thread pool should be sufficient. If the cache miss penalty is too high, then using multiple layers of caching tends to help further reduce the user-perceived latency.
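
A minimal sketch of "asynchronously on a background thread pool", assuming the computing map from the question is called cache and the pool size is arbitrary:

ExecutorService prefetchPool = Executors.newFixedThreadPool(2);   // small pool for speculative work

void prefetch(final Key key) {
    prefetchPool.submit(new Runnable() {
        public void run() {
            cache.get(key);   // populates the computing map; the result is discarded here
        }
    });
}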

╰ゝ天使的微笑 2024-09-15 15:58:21

As an alternative to thread priorities, you could perform a low-priority task only if no high-priority tasks are in progress. Here's a simple way to do that:

AtomicInteger highPriorityCount = new AtomicInteger();

void highPriorityTask() {
  highPriorityCount.incrementAndGet();
  try {
    highPriorityImpl();
  } finally {
    highPriorityCount.decrementAndGet();  
  }
}

void lowPriorityTask() {
  if (highPriorityCount.get() == 0) {
    lowPriorityImpl();
  }
}

In your use case, both Impl() methods would call get() on the computing map, highPriorityImpl() in the same thread and lowPriorityImpl() in a different thread.

You could write a more sophisticated version that defers low-priority tasks until the high-priority tasks complete and limits the number of concurrent low-priority tasks.
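
A rough sketch of what that might look like, with obvious simplifications (for instance, deferred work is only drained when a high-priority task finishes), using a queue and a semaphore that are not part of the code above:

final AtomicInteger highPriorityCount = new AtomicInteger();
final Semaphore lowPrioritySlots = new Semaphore(2);       // at most 2 concurrent low-priority tasks
final Queue<Runnable> deferred = new ConcurrentLinkedQueue<Runnable>();

void highPriorityTask() {
  highPriorityCount.incrementAndGet();
  try {
    highPriorityImpl();
  } finally {
    if (highPriorityCount.decrementAndGet() == 0) {
      drainDeferred();                                     // run work postponed while we were busy
    }
  }
}

void lowPriorityTask(Runnable work) {
  if (highPriorityCount.get() > 0 || !tryRunLowPriority(work)) {
    deferred.add(work);            // defer: high-priority work in progress or no free slot
  }
}

// Runs queued low-priority work while no high-priority task is active and a slot is free.
void drainDeferred() {
  Runnable r;
  while (highPriorityCount.get() == 0 && (r = deferred.poll()) != null) {
    if (!tryRunLowPriority(r)) {
      deferred.add(r);             // no free slot; leave the rest for the next drain
      break;
    }
  }
}

boolean tryRunLowPriority(Runnable work) {
  if (!lowPrioritySlots.tryAcquire()) {
    return false;
  }
  try {
    work.run();
  } finally {
    lowPrioritySlots.release();
  }
  return true;
}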

昇り龍 2024-09-15 15:58:20

You can arrange for "once only" execution of the background computation by using a Future with the ComputedMap. The Future represents the task that computes the value. The future is created by the ComputedMap and at the same time, passed to an ExecutorService for background execution. The executor can be configured with your own ThreadFactory implementation that creates low priority threads, e.g.

class LowPriorityThreadFactory implements ThreadFactory
{
   public Thread newThread(Runnable r) {
     Thread t = new Thread(r);
     t.setPriority(Thread.MIN_PRIORITY);
     return t;
   }
}
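
The factory is then passed in when the executor is created, for example:

ExecutorService executor = Executors.newFixedThreadPool(
      Runtime.getRuntime().availableProcessors(),   // pool sizing discussed below
      new LowPriorityThreadFactory());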

When the value is needed, your high-priority thread then fetches the future from the map, and calls the get() method to retrieve the result, waiting for it to be computed if necessary. To avoid priority inversion you add some additional code to the task:

class HandlePriorityInversionTask extends FutureTask<ResultType>
{
   // ResultType is a placeholder for the actual value type.
   Integer priority;          // non-null once a caller has requested a priority
   Integer originalPriority;  // priority of the worker thread before boosting
   Thread thread;             // the thread currently running the task, if any

   HandlePriorityInversionTask(Callable<ResultType> callable) {
      super(callable);
   }

   public ResultType get() throws InterruptedException, ExecutionException {
      // Boost the computing thread to the caller's priority before blocking.
      if (!isDone())
         setPriority(Thread.currentThread().getPriority());
      return super.get();
   }

   public void run() {
      synchronized (this) {
         thread = Thread.currentThread();
         originalPriority = thread.getPriority();
         if (priority != null) setPriority(priority);
      }
      super.run();
   }

   protected synchronized void done() {
      // Restore the worker thread's original priority, however the task ended.
      if (originalPriority != null) setPriority(originalPriority);
      thread = null;
   }

   synchronized void setPriority(int priority) {
      this.priority = Integer.valueOf(priority);
      if (thread != null)
         thread.setPriority(priority);
   }
}

This takes care of raising the priority of the task to the priority of the thread calling get() if the task has not completed, and returns the priority to the original when the task completes, normally or otherwise. (To keep it brief, the code doesn't check if the priority is indeed greater, but that's easy to add.)

When the high-priority task calls get(), the future may not yet have begun executing. You might be tempted to avoid this by setting a large upper bound on the number of threads used by the executor service, but this may be a bad idea, since each thread could be running at high priority, consuming as much CPU as it can before the OS switches it out. The pool should probably be the same size as the number of hardware threads, e.g. size the pool to Runtime.getRuntime().availableProcessors(). If the task has not started executing, rather than waiting for the executor to schedule it (which is a form of priority inversion, since your high-priority thread is waiting for the low-priority threads to complete), you may choose to cancel it on the current executor and re-submit it to an executor running only high-priority threads.
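
A rough sketch of that cancel-and-resubmit idea (the highPriorityExecutor, the Key type, and the way the original Callable is kept around are assumptions for the example; it also ignores callers already blocked on the cancelled task):

ResultType getUrgently(Key key, Callable<ResultType> computation) throws Exception {
   FutureTask<ResultType> task = cache.get(key);
   if (task == null) {
      // Nothing was prefetched: compute directly on the high-priority pool.
      FutureTask<ResultType> fresh = new FutureTask<ResultType>(computation);
      task = cache.putIfAbsent(key, fresh);
      if (task == null) {
         task = fresh;
         highPriorityExecutor.execute(fresh);
      }
   } else if (task.cancel(false)) {
      // cancel(false) only succeeds if the low-priority executor had not yet
      // started the task; a cancelled FutureTask cannot be re-run, so wrap the
      // same computation in a fresh task and run it on the high-priority pool.
      FutureTask<ResultType> fresh = new FutureTask<ResultType>(computation);
      cache.put(key, fresh);
      highPriorityExecutor.execute(fresh);
      task = fresh;
   }
   return task.get();   // wait for whichever task is now current
}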
