并发缓存共享模式

发布于 2024-10-06 17:06:23 字数 935 浏览 1 评论 0原文

好吧,我有点不确定如何最好地命名这个问题:)但是假设这种情况,你是 出去获取一些网页(带有各种网址)并将其缓存在本地。即使使用多个线程,缓存部分也很容易解决。

然而,想象一下一个线程开始获取一个 url,几毫秒后另一个线程想要获取相同的 url。是否有任何好的模式可以使秒线程的方法等待第一个线程来获取页面,将其插入缓存并返回它,这样您就不必执行多个请求。即使对于需要大约 300-700 毫秒的请求,开销也足够小,是否值得这样做?并且无需锁定其他网址的请求

基本上,当对相同网址的请求紧密相连时,我希望第二个请求“搭载”第一个请求

我有一个松散的想法,即拥有一个字典,在其中插入一个以键为网址的对象当您开始获取页面并锁定它时。如果已经有任何匹配的键,它就会获取该对象,锁定它,然后尝试获取实际缓存的 url。

我有点不确定细节,但是要使其真正线程安全,使用 ConcurrentDictionary 可能是其中的一部分......

对于这样的场景有没有通用的模式和解决方案?

细分错误行为:

线程 1:检查缓存,它不存在,因此开始获取 url

线程 2:开始获取相同的 url,因为它在缓存中仍然不存在

线程 1:完成并插入到缓存中,返回页面

线程2:完成并插入缓存(或丢弃它),返回页面

细分正确行为:

线程 1:检查缓存,它不存在,因此开始获取 url

线程 2:想要相同的 url,但看到当前正在获取它所以等待线程 1

线程 1:完成并插入到缓存中,返回页面

线程 2:注意到线程 1 已完成并返回它获取的页面线程 1

编辑

到目前为止,大多数解决方案似乎都误解了问题并且只解决缓存问题,正如我所说,这不是问题,问题是在进行外部 Web 获取时进行第二次获取在第一个获取缓存之前完成以使用结果从第一个开始,然后再做第二个

Ok I was a little unsure on how best name this problem :) But assume this scenarion, you're
going out and fetching some webpage (with various urls) and caching it locally. The cache part is pretty easy to solve even with multiple threads.

However, imagine that one thread starts fetching an url, and a couple of milliseconds later another want to get the same url. Is there any good pattern for making the seconds thread's method wait on the first one to fetch the page , insert it into the cache and return it so you don't have to do multiple requests. With little enough overhead that it's worth doing even for requests that take about 300-700 ms? And without locking requests for other urls

Basically when requests for identical urls comes in tightly after each other I want the second request to "piggyback" the first request

I had some loose idea of having a dictionary where you insert an object with the key as url when you start fetching a page and lock on it. If there's any matching the key already it get's the object, locks on it and then tries to fetch the url for the actual cache.

I'm a little unsure of the particulars however to make it really thread-safe, using ConcurrentDictionary might be one part of it...

Is there any common pattern and solutions for scenarios like this?

Breakdown wrong behavior:

Thread 1: Checks the cache, it doesnt exists so starts fetching the url

Thread 2: Starts fetching the same url since it still doesn't exist in Cache

Thread 1: finished and inserts into the cache, returns the page

Thread 2: Finishes and also inserts into cache (or discards it), returns the page

Breakdown correct behavior:

Thread 1: Checks the cache, it doesnt exists so starts fetching the url

Thread 2: Wants the same url, but sees it's currently being fetched so waits on thread 1

Thread 1: finished and inserts into the cache, returns the page

Thread 2: Notices that thread 1 is finished and returns the page thread 1 it fetched

EDIT

Most solutions sofar seem to misunderstand the problem and only addressing the caching, as I said that isnt the problem, the problem is when doing an external web fetch to make the second fetch that is done before the first one has cached it to use the result from the first rather then doing a second

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

焚却相思 2024-10-13 17:06:23

您可以使用 ConcurrentDictionary双重检查锁定的变体:

public static string GetUrlContent(string url)
{
    object value1 = _cache.GetOrAdd(url, new object());

    if (value1 == null)    // null check only required if content
        return null;       // could legitimately be a null string

    var urlContent = value1 as string;
    if (urlContent != null)
        return urlContent;    // got the content

    // value1 isn't a string which means that it's an object to lock against
    lock (value1)
    {
        object value2 = _cache[url];

        // at this point value2 will *either* be the url content
        // *or* the object that we already hold a lock against
        if (value2 != value1)
            return (string)value2;    // got the content

        urlContent = FetchContentFromTheWeb(url);    // todo
        _cache[url] = urlContent;
        return urlContent;
    }
}

private static readonly ConcurrentDictionary<string, object> _cache =
                                  new ConcurrentDictionary<string, object>();

You could use a ConcurrentDictionary<K,V> and a variant of double-checked locking:

public static string GetUrlContent(string url)
{
    object value1 = _cache.GetOrAdd(url, new object());

    if (value1 == null)    // null check only required if content
        return null;       // could legitimately be a null string

    var urlContent = value1 as string;
    if (urlContent != null)
        return urlContent;    // got the content

    // value1 isn't a string which means that it's an object to lock against
    lock (value1)
    {
        object value2 = _cache[url];

        // at this point value2 will *either* be the url content
        // *or* the object that we already hold a lock against
        if (value2 != value1)
            return (string)value2;    // got the content

        urlContent = FetchContentFromTheWeb(url);    // todo
        _cache[url] = urlContent;
        return urlContent;
    }
}

private static readonly ConcurrentDictionary<string, object> _cache =
                                  new ConcurrentDictionary<string, object>();
对你而言 2024-10-13 17:06:23

编辑:我的代码现在有点丑陋,但每个 URL 使用单独的锁。这允许异步获取不同的 URL,但每个 URL 只会获取一次。

public class UrlFetcher
{
    static Hashtable cache = Hashtable.Synchronized(new Hashtable());

    public static String GetCachedUrl(String url)
    {
        // exactly 1 fetcher is created per URL
        InternalFetcher fetcher = (InternalFetcher)cache[url];
        if( fetcher == null )
        {
            lock( cache.SyncRoot )
            {
                fetcher = (InternalFetcher)cache[url];
                if( fetcher == null )
                {
                    fetcher = new InternalFetcher(url);
                    cache[url] = fetcher;
                }
            }
        }
        // blocks all threads requesting the same URL
        return fetcher.Contents;
    }

    /// <summary>Each fetcher locks on itself and is initilized with null contents.
    /// The first thread to call fetcher.Contents will cause the fetch to occur, and
    /// block until completion.</summary>
    private class InternalFetcher
    {
        private String url;
        private String contents;

        public InternalFetcher(String url)
        {
            this.url = url;
            this.contents = null;
        }

        public String Contents
        {
            get
            {
                if( contents == null )
                {
                    lock( this ) // "this" is an instance of InternalFetcher...
                    {
                        if( contents == null )
                        {
                            contents = FetchFromWeb(url);
                        }
                    }
                }
                return contents;
            }
        }
    }
}

EDIT: My code is quite a bit uglier now, but uses a separate lock per URL. This allows different URLs to be fetched asynchronously, however each URL will only be fetched once.

public class UrlFetcher
{
    static Hashtable cache = Hashtable.Synchronized(new Hashtable());

    public static String GetCachedUrl(String url)
    {
        // exactly 1 fetcher is created per URL
        InternalFetcher fetcher = (InternalFetcher)cache[url];
        if( fetcher == null )
        {
            lock( cache.SyncRoot )
            {
                fetcher = (InternalFetcher)cache[url];
                if( fetcher == null )
                {
                    fetcher = new InternalFetcher(url);
                    cache[url] = fetcher;
                }
            }
        }
        // blocks all threads requesting the same URL
        return fetcher.Contents;
    }

    /// <summary>Each fetcher locks on itself and is initilized with null contents.
    /// The first thread to call fetcher.Contents will cause the fetch to occur, and
    /// block until completion.</summary>
    private class InternalFetcher
    {
        private String url;
        private String contents;

        public InternalFetcher(String url)
        {
            this.url = url;
            this.contents = null;
        }

        public String Contents
        {
            get
            {
                if( contents == null )
                {
                    lock( this ) // "this" is an instance of InternalFetcher...
                    {
                        if( contents == null )
                        {
                            contents = FetchFromWeb(url);
                        }
                    }
                }
                return contents;
            }
        }
    }
}
◇流星雨 2024-10-13 17:06:23

Semaphore 请起立!起来!起来!

使用Semaphore您可以轻松地与它同步您的线程。
的两种情况下,

  1. 在尝试加载当前正在缓存的页面
  2. 您都将缓存保存到从中加载页面的文件中。

在这两种情况下你都会遇到麻烦。

就像作家和读者问题一样,这是操作系统竞赛问题中的常见问题。仅当线程想要重建缓存或开始缓存页面时,任何线程都不应从中读取。如果一个线程正在读取它,它应该等到读取完成并替换缓存,任何两个线程都不应该将相同的页面缓存到同一个文件中。因此,所有读取器都可以随时从缓存中读取,因为没有写入器在其上写入。

你应该阅读msdn上的一些信号量使用示例,它非常容易使用。只有想要做某事的线程才会调用信号量,如果资源可以授予它,它就会执行工作,否则会休眠并等待资源准备好时被唤醒。

Will the Semaphore please stand up! stand up! stand up!

use Semaphore you can easily synchronize your threads with it.
on both cases where

  1. you are trying to load a page that is currently being cached
  2. you are saving cache to a file where a page is loading from it.

in both scenarios you will face troubles.

it is just like writers and readers problem that is a common problem in Operating System Racing Issues. just when a thread wants to rebuild a cache or start caching a page no thread should read from it. if a thread is reading it it should wait until reading finished and replace the cache, no 2 threads should cache same page in to a same file. hence it is possible for all readers to read from a cache at anytime since no writer is writing on it.

you should read some semaphore using samples on msdn, it is very easy to use. just the thread that wants to do something is call the semaphore and if the resource can granted it do the works otherwise sleeps and wait to be woken up when the resource is ready.

把回忆走一遍 2024-10-13 17:06:23

免责声明:这可能是一个n00bish 答案。如果是的话请原谅我。

我建议使用一些带锁的共享字典对象来跟踪当前正在获取或已经获取的 url。

  • 在每次请求时,根据此对象检查 url。

  • 如果存在该 url 条目,请检查缓存。 (这意味着其中一个线程已获取它或当前正在获取它)

  • 如果它在缓存中可用,请使用它,否则使当前线程休眠一段时间并再次检查。 (如果不在缓存中,某些线程仍在获取它,因此请等待其完成)

  • 如果在字典对象中找不到该条目,请将 url 添加到其中并发送请求。一旦获得响应,将其添加到缓存中。

此逻辑应该有效,但是,您需要处理缓存过期以及从字典对象中删除条目。

Disclaimer: This might be a n00bish answer. Please pardon me, if it is.

I'd recommend using some shared dictionary object with locks to keep a track of the url being currently fetched or have already been fetched.

  • At every request, check the url against this object.

  • If an entry for the url is present, check the cache. (this means one of the threads has either fetched it or is currently fetching it)

  • If its available in the cache, use it, else put the current thread to sleep for a while and check back again. (if not in cache, some thread is still fetching it, so wait while its done)

  • If the entry is not found in the dictionary object, add the url to it and send the request. Once it obtains a response, add it to cache.

This logic should work, however, you would need to take care of cache expiration and removal of the entry from the dictionary object.

柠北森屋 2024-10-13 17:06:23

我的解决方案是当缓存超时或不存在时,使用atomicBoolean来控制访问数据库;

在同一时刻,只有一个线程(我称之为read-th)可以访问数据库,其他线程旋转直到read-th返回数据并将其写入缓存;

这里的代码;通过java实现;

public class CacheBreakDownDefender<K, R> {

/**
 * false = do not write null to cache when get null value from database;
 */
private final boolean writeNullToCache;

/**
 * cache different query key
 */
private final ConcurrentHashMap<K, AtomicBoolean> selectingDBTagMap = new ConcurrentHashMap<>();


public static <K, R> CacheBreakDownDefender<K, R> getInstance(Class<K> keyType, Class<R> resultType) {
    return Singleton.get(keyType.getName() + resultType.getName(), () -> new CacheBreakDownDefender<>(false));
}

public static <K, R> CacheBreakDownDefender<K, R> getInstance(Class<K> keyType, Class<R> resultType, boolean writeNullToCache) {
    return Singleton.get(keyType.getName() + resultType.getName(), () -> new CacheBreakDownDefender<>(writeNullToCache));
}

private CacheBreakDownDefender(boolean writeNullToCache) {
    this.writeNullToCache = writeNullToCache;
}

public R readFromCache(K key, Function<K, ? extends R> getFromCache, Function<K, ? extends R> getFromDB, BiConsumer<K, R> writeCache) throws InterruptedException {
    R result = getFromCache.apply(key);
    if (result == null) {
        final AtomicBoolean selectingDB = selectingDBTagMap.computeIfAbsent(key, x -> new AtomicBoolean(false));
        if (selectingDB.compareAndSet(false, true)) { 
            try { 
                result = getFromDB.apply(key);
                if (result != null || writeNullToCache) {
                    writeCache.accept(key, result);
                }
            } finally {
                selectingDB.getAndSet(false);
                selectingDBTagMap.remove(key);
            }
        } else {
            
            while (selectingDB.get()) {
                TimeUnit.MILLISECONDS.sleep(0L);
                //do nothing...  
            }
            return getFromCache.apply(key);
        }
    }
    return result;
}

public static void main(String[] args) throws InterruptedException {

    Map<String, String> map = new ConcurrentHashMap<>();
    CacheBreakDownDefender<String, String> instance = CacheBreakDownDefender.getInstance(String.class, String.class, true);

    for (int i = 0; i < 9; i++) {
        int finalI = i;
        new Thread(() -> {
            String kele = null;
            try {
                if (finalI == 6) {
                    kele = instance.readFromCache("kele2", map::get, key -> "helloword2", map::put);
                } else
                    kele = instance.readFromCache("kele", map::get, key -> "helloword", map::put);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.info("resut= {}", kele);
        }).start();
    }
    TimeUnit.SECONDS.sleep(2L);
}

}

my solution is use atomicBoolean to control access database when cache is timeout or unexist;

at the same moment, only one thread(i call it read-th) can access database, the other threads spin until the read-th return data and write it into cache;

here codes; implement by java;

public class CacheBreakDownDefender<K, R> {

/**
 * false = do not write null to cache when get null value from database;
 */
private final boolean writeNullToCache;

/**
 * cache different query key
 */
private final ConcurrentHashMap<K, AtomicBoolean> selectingDBTagMap = new ConcurrentHashMap<>();


public static <K, R> CacheBreakDownDefender<K, R> getInstance(Class<K> keyType, Class<R> resultType) {
    return Singleton.get(keyType.getName() + resultType.getName(), () -> new CacheBreakDownDefender<>(false));
}

public static <K, R> CacheBreakDownDefender<K, R> getInstance(Class<K> keyType, Class<R> resultType, boolean writeNullToCache) {
    return Singleton.get(keyType.getName() + resultType.getName(), () -> new CacheBreakDownDefender<>(writeNullToCache));
}

private CacheBreakDownDefender(boolean writeNullToCache) {
    this.writeNullToCache = writeNullToCache;
}

public R readFromCache(K key, Function<K, ? extends R> getFromCache, Function<K, ? extends R> getFromDB, BiConsumer<K, R> writeCache) throws InterruptedException {
    R result = getFromCache.apply(key);
    if (result == null) {
        final AtomicBoolean selectingDB = selectingDBTagMap.computeIfAbsent(key, x -> new AtomicBoolean(false));
        if (selectingDB.compareAndSet(false, true)) { 
            try { 
                result = getFromDB.apply(key);
                if (result != null || writeNullToCache) {
                    writeCache.accept(key, result);
                }
            } finally {
                selectingDB.getAndSet(false);
                selectingDBTagMap.remove(key);
            }
        } else {
            
            while (selectingDB.get()) {
                TimeUnit.MILLISECONDS.sleep(0L);
                //do nothing...  
            }
            return getFromCache.apply(key);
        }
    }
    return result;
}

public static void main(String[] args) throws InterruptedException {

    Map<String, String> map = new ConcurrentHashMap<>();
    CacheBreakDownDefender<String, String> instance = CacheBreakDownDefender.getInstance(String.class, String.class, true);

    for (int i = 0; i < 9; i++) {
        int finalI = i;
        new Thread(() -> {
            String kele = null;
            try {
                if (finalI == 6) {
                    kele = instance.readFromCache("kele2", map::get, key -> "helloword2", map::put);
                } else
                    kele = instance.readFromCache("kele", map::get, key -> "helloword", map::put);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.info("resut= {}", kele);
        }).start();
    }
    TimeUnit.SECONDS.sleep(2L);
}

}

墨离汐 2024-10-13 17:06:23

这并不完全适用于并发缓存,而是适用于所有缓存:

< strong>“具有错误策略的缓存是内存泄漏的另一个名称”(Raymond Chen)

This is not exactly for concurrent caches but for all caches:

"A cache with a bad policy is another name for a memory leak" (Raymond Chen)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文