制作Java完整的未来缓存原子能

发布于 2025-02-06 01:29:56 字数 4009 浏览 2 评论 0原文

我一直在尝试设置一个简单的原子能完整的未来缓存,该缓存将String键callable类 并缓存执行和结果。我知道我可以使用咖啡因,但仍然想了解如何在没有种族条件的情况下完成此操作。插入和清理。

在这个简单的类中,我有两个简化的缓存:executions缓存,可以使尚未完成运行的可可的跟踪以及结果缓存,以跟踪结果(最终,这将是溢出磁盘的ehcache)

public class AsyncCache {

    ExecutorService executor = Executors.newFixedThreadPool(10);
    ConcurrentHashMap<String, CompletableFuture<Object>> executions = new ConcurrentHashMap<>();
    ConcurrentHashMap<String, Object> results = new ConcurrentHashMap<>();

    public CompletableFuture<Object> get(String key, Callable<Object> callable) {

        Object result = results.get(key);
        if (result != null) {
            return CompletableFuture.completedFuture(result);
        }

        return executions.computeIfAbsent(key, k -> {

            CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
                try {
                    return callable.call();
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, executor);

            return future.whenComplete((Object r, Throwable t) -> {
                if (executions.remove(k) != null) {
                    results.put(k, result);
                }
            });

        });

    }

    public void clear() {
        results.clear();
        executions.clear();
    }

}

我相信该代码有两个问题。首先,行中存在一个同步问题:

if (executions.remove(k) != null) {
    results.put(k, result);
}

之间删除(k) and results.put(k,result)可以有一个新的get < /code>调用Resulte中尚未在executions中删除的相同密钥在结果缓存中。

其次,行中还有另一个同步问题:

results.clear();
executions.clear();

在两个clear()之间,可以有一个新的get调用,无法从结果< /code>映射,但会从executions地图中获得停滞的响应。

关于如何解决此问题的任何想法,而无需天真地同步所有内容。


编辑。

如果我引入每钥匙锁定以防止阅读和写作,该怎么办?这样的事情:

    ConcurrentMap<String, ReadWriteLock> locks = new ConcurrentHashMap<String, ReadWriteLock>();

    public CompletableFuture<Object> get1(String key, Callable<Object> callable) {

        ReadWriteLock reading = locks.computeIfAbsent(key, r -> new ReentrantReadWriteLock());
        reading.readLock().lock();
        try {

            Object result = results.get(key);
            if (result != null) {
                return CompletableFuture.completedFuture(result);
            }

            return executions.computeIfAbsent(key, k -> {

                CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
                    try {
                        return callable.call();
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                }, executor);

                return future.whenComplete((Object r, Throwable t) -> {

                    ReadWriteLock writing = locks.computeIfAbsent(k, w -> new ReentrantReadWriteLock());
                    writing.writeLock().lock();
                    try {
                        if (executions.remove(k) != null) {
                            results.put(k, r);
                        }
                    } finally {
                        writing.writeLock().unlock();
                        locks.remove(k);
                    }

                });

            });

        } finally {
            reading.readLock().unlock();
            locks.remove(key);
        }

    }

这仍然会留下有关如何编写clear()方法的问题。

I've been trying to setup a simple atomic completable future cache that takes a String key and a Callable class and caches the executions and the results. I know I could use caffeine but still want to understand how this can be done without race conditions while inserting and clearing.

In this simple class I have two simplified caches: an executions cache that keeps tracks of the callables that haven't finished running, and a results cache that keeps track of the results (this will eventually be an ehcache that overflows to disk)

public class AsyncCache {

    ExecutorService executor = Executors.newFixedThreadPool(10);
    ConcurrentHashMap<String, CompletableFuture<Object>> executions = new ConcurrentHashMap<>();
    ConcurrentHashMap<String, Object> results = new ConcurrentHashMap<>();

    public CompletableFuture<Object> get(String key, Callable<Object> callable) {

        Object result = results.get(key);
        if (result != null) {
            return CompletableFuture.completedFuture(result);
        }

        return executions.computeIfAbsent(key, k -> {

            CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
                try {
                    return callable.call();
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, executor);

            return future.whenComplete((Object r, Throwable t) -> {
                if (executions.remove(k) != null) {
                    results.put(k, result);
                }
            });

        });

    }

    public void clear() {
        results.clear();
        executions.clear();
    }

}

I believe that this code has two problems. First, there is a synchronization problem in the lines:

if (executions.remove(k) != null) {
    results.put(k, result);
}

In between remove(k) and results.put(k, result) there can be a new get call for the same key that is not already in results and has been removed from executions, thus triggering a new callable execution, when the result was about to be placed in the results cache.

Second, there is another synchronization problem in the lines:

results.clear();
executions.clear();

In between both clear() there can be a new get call that would not get result from the results map but would get a stalled response from the executions map.

Any ideas on how to fix this without naively synchronizing everything.


Edit.

What if I introduce a lock per key to guard against read and writes? Something like this:

    ConcurrentMap<String, ReadWriteLock> locks = new ConcurrentHashMap<String, ReadWriteLock>();

    public CompletableFuture<Object> get1(String key, Callable<Object> callable) {

        ReadWriteLock reading = locks.computeIfAbsent(key, r -> new ReentrantReadWriteLock());
        reading.readLock().lock();
        try {

            Object result = results.get(key);
            if (result != null) {
                return CompletableFuture.completedFuture(result);
            }

            return executions.computeIfAbsent(key, k -> {

                CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
                    try {
                        return callable.call();
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                }, executor);

                return future.whenComplete((Object r, Throwable t) -> {

                    ReadWriteLock writing = locks.computeIfAbsent(k, w -> new ReentrantReadWriteLock());
                    writing.writeLock().lock();
                    try {
                        if (executions.remove(k) != null) {
                            results.put(k, r);
                        }
                    } finally {
                        writing.writeLock().unlock();
                        locks.remove(k);
                    }

                });

            });

        } finally {
            reading.readLock().unlock();
            locks.remove(key);
        }

    }

This would still leave with questions regarding how to write the clear() method.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

雪花飘飘的天空 2025-02-13 01:29:56

这是我最终使用的代码。我经历了许多修订,但找不到任何明显的僵局或比赛条件。我要把它留在这里,以防万一对别人有用。

public class AsyncCache {

    ExecutorService executor = Executors.newFixedThreadPool(10);
    ConcurrentHashMap<String, CompletableFuture<Object>> executions = new ConcurrentHashMap<>();

    ReentrantReadWriteLock guard = new ReentrantReadWriteLock();
    MutableConfiguration<String, Object> configuration = new MutableConfiguration<>();
    Cache<String, Object> results = Caching.getCachingProvider().getCacheManager().createCache("results", configuration);

    public CompletableFuture<Object> get(String key, Callable<Object> callable) {

        guard.readLock().lock();
        try {

            // Attempt to get from the cache first. Only pay attention to
            // the cache eviction policies.

            Object result = results.get(key);
            if (result != null) {
                return CompletableFuture.completedFuture(result);
            }

            // Attempt to get from the current executions second. Make sure
            // that the cache and executions is guarded by a read write lock.

            return executions.computeIfAbsent(key, k -> {

                CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
                    try {
                        return callable.call();
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                }, executor);

                return future.whenComplete((Object r, Throwable t) -> {

                    guard.writeLock().lock();
                    try {
                        if (executions.remove(k) != null) {
                            if (r != null) {
                                results.put(k, r);
                            }
                        }
                    } finally {
                        guard.writeLock().unlock();
                    }

                });

            });

        } finally {
            guard.readLock().unlock();
        }

    }

    public void clear() {

        // Guard the cache and executions with a write lock.

        guard.writeLock().lock();
        try {
            executions.clear();
            results.clear();
        } finally {
            guard.writeLock().unlock();
        }

    }

}

我仍然不确定如何检查这是否过于限制(锁定超出所需的锁定)。对于我的用例,可可的山货量可能需要100到3000毫秒,并且可以每15分钟左右调用清晰的方法。

也许我什至不需要executionsconcurrenthashmap,因为读取和写入受到保护。

This is the code I ended up using. I have gone through many revisions but can't find any obvious deadlocks or race conditions. I'm going leave it here just in case it becomes useful for somebody else.

public class AsyncCache {

    ExecutorService executor = Executors.newFixedThreadPool(10);
    ConcurrentHashMap<String, CompletableFuture<Object>> executions = new ConcurrentHashMap<>();

    ReentrantReadWriteLock guard = new ReentrantReadWriteLock();
    MutableConfiguration<String, Object> configuration = new MutableConfiguration<>();
    Cache<String, Object> results = Caching.getCachingProvider().getCacheManager().createCache("results", configuration);

    public CompletableFuture<Object> get(String key, Callable<Object> callable) {

        guard.readLock().lock();
        try {

            // Attempt to get from the cache first. Only pay attention to
            // the cache eviction policies.

            Object result = results.get(key);
            if (result != null) {
                return CompletableFuture.completedFuture(result);
            }

            // Attempt to get from the current executions second. Make sure
            // that the cache and executions is guarded by a read write lock.

            return executions.computeIfAbsent(key, k -> {

                CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
                    try {
                        return callable.call();
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                }, executor);

                return future.whenComplete((Object r, Throwable t) -> {

                    guard.writeLock().lock();
                    try {
                        if (executions.remove(k) != null) {
                            if (r != null) {
                                results.put(k, r);
                            }
                        }
                    } finally {
                        guard.writeLock().unlock();
                    }

                });

            });

        } finally {
            guard.readLock().unlock();
        }

    }

    public void clear() {

        // Guard the cache and executions with a write lock.

        guard.writeLock().lock();
        try {
            executions.clear();
            results.clear();
        } finally {
            guard.writeLock().unlock();
        }

    }

}

I'm still not sure how to check if this is overly restrictive (locking more than needed). For my use case, the callables can take from 100 to 3000 milliseconds, and the clear method can be called every 15 minutes or so.

Perhaps I don't even need the executions to be a ConcurrentHashMap since the read and writes are protected.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文