Thread-safe double-buffered cache in Java (not for graphics)?

Posted on 2024-08-05 03:25:13

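I was recently looking for a way to implement a double-buffered, thread-safe cache for regular objects.

The need arose because we have some cached data structures that are hit numerous times per request and have to be reloaded from a very large document (unmarshalling takes over a second), and we can't afford to have all requests delayed by that long every minute.

Since I couldn't find a good thread-safe implementation, I wrote my own, and now I am wondering whether it is correct and whether it can be made smaller. Here it is: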

package nl.trimpe.michiel;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Abstract class implementing a double buffered cache for a single object.
 * 
 * Implementing classes can load the object to be cached by implementing the
 * {@link #retrieve()} method.
 * 
 * @param <T>
 *            The type of the object to be cached.
 */
public abstract class DoublyBufferedCache<T> {

    private static final Log log = LogFactory.getLog(DoublyBufferedCache.class);

    private Long timeToLive;

    private long lastRetrieval;

    private T cachedObject;

    private Object lock = new Object();

    private volatile Boolean isLoading = false;

    public T getCachedObject() {
        checkForReload();
        return cachedObject;
    }

    private void checkForReload() {
        if (cachedObject == null || isExpired()) {
            if (!isReloading()) {
                synchronized (lock) {
                    // Recheck expiration because another thread might have
                    // refreshed the cache before we were allowed into the
                    // synchronized block.
                    if (isExpired()) {
                        isLoading = true;
                        try {
                            cachedObject = retrieve();
                            lastRetrieval = System.currentTimeMillis();
                        } catch (Exception e) {
                            log.error("Exception occurred retrieving cached object", e);
                        } finally {
                            isLoading = false;
                        }
                    }
                }
            }
        }
    }

    protected abstract T retrieve() throws Exception;

    private boolean isExpired() {
        return (timeToLive > 0) ? ((System.currentTimeMillis() - lastRetrieval) > (timeToLive * 1000)) : true;
    }

    private boolean isReloading() {
        return cachedObject != null && isLoading;
    }

    public void setTimeToLive(Long timeToLive) {
        this.timeToLive = timeToLive;
    }

}

Comments (4)

私野 2024-08-12 03:25:13

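What you've written isn't thread-safe. In fact, you've stumbled onto a common fallacy that is quite a famous problem: double-checked locking. Many solutions like yours (and there are several variations on this theme) have issues. In your code, cachedObject and lastRetrieval are read outside the synchronized block without being volatile, so a reader has no guarantee of seeing the values written by the reloading thread.

There are a few potential solutions, but IMHO the easiest is simply to use a ScheduledExecutorService and reload what you need every minute, or however often you need to. When the reload finishes, store it as the cached result, and callers just get the latest version. This is thread-safe and easy to implement. Sure, it's not loaded on demand, but apart from the initial value you'll never take a performance hit while retrieving the value. I'd call this over-eager loading rather than lazy loading.

For example: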

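import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Cache<T> {
  private final ScheduledExecutorService executor =
    Executors.newSingleThreadScheduledExecutor();
  // volatile so that readers always see the most recently published result
  private volatile Future<T> result;

  public Cache(final Callable<T> method, final long ttl) {
    if (method == null) {
      throw new NullPointerException("method cannot be null");
    }
    if (ttl <= 0) {
      throw new IllegalArgumentException("ttl must be positive");
    }

    // initial hits may result in a delay until we've loaded
    // the result once, after which there will never be another
    // delay because we will only refresh with complete results
    result = executor.submit(method);

    // schedule the refresh process
    Runnable refresh = new Runnable() {
      public void run() {
        // Run the load on this (the executor's only) thread. Submitting it
        // back to the same single-threaded executor and waiting for it here
        // would deadlock.
        FutureTask<T> future = new FutureTask<T>(method);
        future.run();
        try {
          future.get();    // throws if the load failed
          result = future; // publish only complete results
        } catch (Exception e) {
          // the load failed: keep serving the previous result
        }
        // the task resubmits itself once the refresh is over
        executor.schedule(this, ttl, TimeUnit.MILLISECONDS);
      }
    };
    executor.schedule(refresh, ttl, TimeUnit.MILLISECONDS);
  }

  public T getResult() throws InterruptedException, ExecutionException {
    return result.get();
  }
}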

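That takes a little explanation. Basically, you're creating a generic wrapper that caches the result of a Callable, which will be your document load. Submitting a Callable (or Runnable) to an executor returns a Future. Calling Future.get() blocks until the task completes.

So this implements getResult() in terms of a Future, so that initial queries won't fail (they will block). After that, the refresh task runs every 'ttl' milliseconds. It loads a fresh value and waits for the load to complete; only then does it replace the 'result' member. Subsequent getResult() calls return the new value.

There is a scheduleAtFixedRate() method on ScheduledExecutorService, but I avoid it because if the Callable takes longer than the scheduled period, the following runs start late and can queue up back to back, and then you have to worry about throttling. It's easier for the task to reschedule itself at the end of a refresh.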
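As a point of comparison, here is a minimal sketch of the same idea using scheduleWithFixedDelay(), which measures the delay from the end of one run to the start of the next, so runs cannot bunch up. The class name FixedDelayCache and its methods are hypothetical and not part of the answer above; it also assumes the initial load may block the constructor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical variant of the Cache above; all names are illustrative. */
final class FixedDelayCache<T> {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  private final Callable<T> loader;
  // volatile: readers on other threads see the latest published value
  private volatile T value;

  FixedDelayCache(Callable<T> loader, long ttlMillis) {
    this.loader = loader;
    try {
      // Assumption: blocking the constructor for the first load is acceptable.
      this.value = loader.call();
    } catch (Exception e) {
      throw new IllegalStateException("initial load failed", e);
    }
    // The next run starts ttlMillis after the previous one has finished.
    executor.scheduleWithFixedDelay(
        this::refresh, ttlMillis, ttlMillis, TimeUnit.MILLISECONDS);
  }

  /** Loads a fresh value; on failure the old value stays in place. */
  void refresh() {
    try {
      value = loader.call();
    } catch (Exception e) {
      // Swallow the exception: an exception escaping a periodic task
      // would cancel all of its future runs.
    }
  }

  /** Never blocks after construction; returns the latest loaded value. */
  T get() {
    return value;
  }

  void shutdown() {
    executor.shutdownNow();
  }
}
```

The trade-off is that the value is held directly instead of in a Future, so callers never block, at the cost of a slower constructor.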

巾帼英雄 2024-08-12 03:25:13

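I'm not sure I understand your need. Do you need faster loading (and reloading) of the cache for a portion of the values?

If so, I would suggest breaking your data structure into smaller pieces and loading only the piece you need at the time. If you divide the size by 10, the loading time should drop by a comparable factor.

If possible, this could apply to the original document you are reading. Otherwise, it would apply to the way you read it: skip most of it and load only the relevant part.

I believe most data can be broken down into pieces. Choose whichever split fits best; here are some examples:

  • by starting letter: A*, B* ...
  • split your id into two parts: the first part is a category; look it up in the cache, load it if needed, then look up the second part inside it.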
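The category-based split could be sketched roughly as follows. PartitionedCache and its partitionLoader function are hypothetical names; the sketch assumes that one partition can be loaded on its own from the source document.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical cache that loads one partition (category) at a time. */
final class PartitionedCache<K, V> {
    private final ConcurrentMap<String, Map<K, V>> partitions =
        new ConcurrentHashMap<>();
    // Assumption: loads only the part of the document for one category.
    private final Function<String, Map<K, V>> partitionLoader;

    PartitionedCache(Function<String, Map<K, V>> partitionLoader) {
        this.partitionLoader = partitionLoader;
    }

    /** Looks up the category first, loading it if needed, then the key. */
    V get(String category, K key) {
        // computeIfAbsent loads each missing partition at most once,
        // even when several threads ask for it at the same time.
        Map<K, V> partition =
            partitions.computeIfAbsent(category, partitionLoader);
        return partition == null ? null : partition.get(key);
    }

    /** Drops one partition so that the next lookup reloads it. */
    void invalidate(String category) {
        partitions.remove(category);
    }
}
```

Only the partitions that are actually requested get loaded, so a reload delays a request by the load time of one partition rather than of the whole document.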

殤城〤 2024-08-12 03:25:13

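If your concern is not the initial loading time but the reloading, maybe you don't mind how long the reload actually takes, but want to keep using the old version while the new one loads?

If that is your need, I suggest making your cache an instance (as opposed to static) that is available through a field.

  1. Trigger the reload every minute from a dedicated thread (or at least not from the regular request threads), so that you don't delay your regular threads.

  2. Reloading creates a new instance, loads it with data (this takes 1 second), and then simply replaces the old instance with the new one. (The old one will be garbage-collected.) Replacing one object reference with another is an atomic operation; declaring the field volatile additionally guarantees that other threads see the new instance.

Analysis: in that case any other thread can keep accessing the old cache until the last instant.
In the worst case, just after a thread has fetched the old cache instance, another thread replaces it with the new one. But this doesn't make your code faulty: asking the old cache instance still gives a value that was correct just before, which is acceptable under the requirement I gave in the first sentence.

To make your code more robust, you can make your cache instance immutable (no setters, no way to modify internal state). This makes it clearer that it is correct to use it in a multi-threaded context.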
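A minimal sketch of this approach, assuming the cached data can be represented as a map of strings. SnapshotHolder, Snapshot and the loader supplier are hypothetical names chosen for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical holder that swaps immutable cache instances. */
final class SnapshotHolder {

    /** Immutable cache instance: no setters, state fixed at construction. */
    static final class Snapshot {
        private final Map<String, String> data;

        Snapshot(Map<String, String> source) {
            // Defensive copy, so later changes to source cannot leak in.
            this.data = Collections.unmodifiableMap(new HashMap<>(source));
        }

        String lookup(String key) {
            return data.get(key);
        }
    }

    // volatile: the reference swap is atomic and visible to all threads
    private volatile Snapshot current;
    private final Supplier<Map<String, String>> loader;

    SnapshotHolder(Supplier<Map<String, String>> loader) {
        this.loader = loader;
        this.current = new Snapshot(loader.get());
    }

    /** Request threads call this; it never blocks on a reload. */
    Snapshot current() {
        return current;
    }

    /** Called from the dedicated reload thread (for example once a minute). */
    void reload() {
        Snapshot fresh = new Snapshot(loader.get()); // the slow part
        current = fresh;                             // the atomic swap
    }
}
```

A request that fetched the old Snapshot just before the swap keeps using it until it is done, which matches the worst case described in the analysis.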

一瞬间的火花 2024-08-12 03:25:13

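You appear to be locking more than is required: in your good case (cache full and valid) every request acquires a lock. You can get away with locking only when the cache has expired.

If we are reloading, do nothing.
If we are not reloading, check whether the cache has expired; if it hasn't, go ahead.
If we are not reloading and the cache has expired, take the lock and check expiry again to make sure it hasn't been successfully loaded since the last check.

Also note that you may wish to reload the cache in a background thread, so that not even the one request is held up waiting for the cache to fill.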


    private void checkForReload() {
        if (cachedObject == null || isExpired()) {
            if (!isReloading()) {
                // Unsynchronized check first, so that the lock is only
                // taken when the cache looks expired.
                if (isExpired()) {
                    synchronized (lock) {
                        // Recheck expiration because another thread might have
                        // refreshed the cache before we were allowed into the
                        // synchronized block.
                        if (isExpired()) {
                            isLoading = true;
                            try {
                                cachedObject = retrieve();
                                lastRetrieval = System.currentTimeMillis();
                            } catch (Exception e) {
                                log.error("Exception occurred retrieving cached object", e);
                            } finally {
                                isLoading = false;
                            }
                        }
                    }
                }
            }
        }
    }
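One possible way to express the three rules above without the visibility problems of plain fields is a volatile value plus an AtomicBoolean flag, sketched below. StaleWhileReloadingCache and its members are hypothetical names, and the sketch assumes that serving the old value during a reload is acceptable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical cache: one thread reloads, the others keep the old value. */
final class StaleWhileReloadingCache<T> {
    private final Callable<T> loader;
    private final long ttlNanos;
    private final AtomicBoolean reloading = new AtomicBoolean(false);
    private volatile T cached;
    private volatile long lastLoadNanos;

    StaleWhileReloadingCache(Callable<T> loader, long ttlMillis) {
        this.loader = loader;
        this.ttlNanos = TimeUnit.MILLISECONDS.toNanos(ttlMillis);
    }

    T get() {
        T snapshot = cached;
        if (snapshot == null) {
            // Nothing to serve yet: the first callers have to wait.
            synchronized (this) {
                if (cached == null) {
                    reloadNow();
                }
                return cached;
            }
        }
        // Expired and nobody else is reloading: this thread does the work.
        if (isExpired() && reloading.compareAndSet(false, true)) {
            try {
                // Recheck: another thread may have just finished a reload.
                if (isExpired()) {
                    reloadNow();
                }
            } finally {
                reloading.set(false);
            }
            return cached;
        }
        // Valid, or someone else is reloading: return without blocking.
        return snapshot;
    }

    private boolean isExpired() {
        return System.nanoTime() - lastLoadNanos >= ttlNanos;
    }

    private void reloadNow() {
        try {
            T fresh = loader.call();
            cached = fresh;
            lastLoadNanos = System.nanoTime();
        } catch (Exception e) {
            // Load failed: keep the previous value and retry on a later call.
        }
    }
}
```

The thread that wins the compareAndSet still pays the reload time itself; handing reloadNow() to a background executor instead would remove that last delay, as the note about a background thread suggests.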
