返回介绍

7. 分布式集合

发布于 2025-02-21 21:07:12 字数 28769 浏览 0 评论 0 收藏 0

7.1. 映射(Map)

基于 Redis 的 Redisson 的分布式映射结构的 RMap Java 对象实现了 java.util.concurrent.ConcurrentMap 接口和 java.util.Map 接口。与 HashMap 不同的是,RMap 保持了元素的插入顺序。该对象的最大容量受 Redis 限制,最大元素数量是 4 294 967 295 个。

除了同步接口外,还提供了 异步(Async)反射式(Reactive)RxJava2 标准 的接口。如果你想用 Redis Map 来保存你的 POJO 的话,可以考虑使用 分布式实时对象(Live Object) 服务。

在特定的场景下,映射缓存(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,可以使用 Redisson 提供的带有 本地缓存 功能的映射。

RMap<String, SomeObject> map = redisson.getMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");

map.fastPut("321", new SomeObject());
map.fastRemove("321");

RFuture<SomeObject> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");

map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");

映射的字段锁的用法:

RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
MyKey k = new MyKey();
RLock keyLock = map.getLock(k);
keyLock.lock();
try {
   MyValue v = map.get(k);
   // 其他业务逻辑
} finally {
   keyLock.unlock();
}

RReadWriteLock rwLock = map.getReadWriteLock(k);
rwLock.readLock().lock();
try {
   MyValue v = map.get(k);
   // 其他业务逻辑
} finally {
   keyLock.readLock().unlock();
}

7.1.1. 映射(Map)的元素淘汰(Eviction),本地缓存(LocalCache)和数据分片(Sharding)

Redisson 提供了一系列的映射类型的数据结构,这些结构按特性主要分为三大类:

  • 元素淘汰(Eviction) 类 -- 带有元素淘汰(Eviction)机制的映射类允许针对一个映射中每个元素单独设定 有效时间最长闲置时间
  • 本地缓存(LocalCache) 类 -- 本地缓存(Local Cache)也叫就近缓存(Near Cache)。这类映射的使用主要用于在特定的场景下,映射缓存(MapCache)上的高度频繁的读取操作,使网络通信都被视为瓶颈的情况。Redisson 与 Redis 通信的同时,还将部分数据保存在本地内存里。这样的设计的好处是它能将读取速度提高最多 45 倍 。 所有同名的本地缓存共用一个订阅发布话题,所有更新和过期消息都将通过该话题共享。
  • 数据分片(Sharding) 类 -- 数据分片(Sharding)类仅适用于 Redis 集群环境下,因此带有数据分片(Sharding)功能的映射也叫集群分布式映射。它利用 分库的原理 ,将单一一个映射结构切分为若干个小的映射,并均匀的分布在集群中的各个槽里。这样的设计能使一个单一映射结构突破 Redis 自身的容量限制,让其容量随集群的扩大而增长。在扩容的同时,还能够使读写性能和元素淘汰处理能力随之成线性增长。

以下列表是 Redisson 提供的所有映射的名称及其特性:

接口名称
中文名称
RedissonClient
对应的构造方法
本地缓存功能
Local Cache
数据分片功能
Sharding
元素淘汰功能
Eviction
RMap
映射
getMap()NoNoNo
RMapCache
映射缓存
getMapCache()NoNoYes
RLocalCachedMap
本地缓存映射
getLocalCachedMap()YesNoNo
RLocalCachedMap
Cache
本地缓存映射缓存
仅限于 Redisson PRO 版本
getLocalCachedMapCache()YesNoYes
RClusteredMap
集群分布式映射存
仅限于 Redisson PRO 版本
getClusteredMap()NoYesNo
RClusteredMapCache
集群分布式映射缓存存
仅限于 Redisson PRO 版本
getClusteredMapCache()NoYesYes
RClusteredLocal
CachedMap
集群分布式本地缓存映射存
仅限于 Redisson PRO 版本
getClusteredLocal
CachedMap()
YesYesNo
RClusteredLocal
CachedMapCache
集群分布式本地缓存映射缓存存
仅限于 Redisson PRO 版本
getClusteredLocal
CachedMapCache()
YesYesYes

除此以外,Redisson 还提供了 Spring CacheJCache 的实现。

元素淘汰功能(Eviction)

Redisson 的分布式的 RMapCache Java 对象在基于 RMap 的前提下实现了针对单个元素的淘汰机制。同时仍然保留了元素的插入顺序。由于 RMapCache 是基于 RMap 实现的,使它同时继承了 java.util.concurrent.ConcurrentMap 接口和 java.util.Map 接口。Redisson 提供的 Spring Cache 整合 以及 JCache 正是基于这样的功能来实现的。

目前的 Redis 自身并不支持散列(Hash)当中的元素淘汰,因此所有过期元素都是通过 org.redisson.EvictionScheduler 实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理 300 个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于 1 秒到 1 小时之间。比如该次清理时删除了 300 条元素,那么下次执行清理的时间将在 1 秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加 1.5 倍。

RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// 有效时间 ttl = 10 分钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// 有效时间 ttl = 10 分钟,最长闲置时间 maxIdleTime = 10 秒钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);

// 有效时间 = 3 秒钟
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// 有效时间 ttl = 40 秒钟,最长闲置时间 maxIdleTime = 10 秒钟
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);

本地缓存功能(Local Cache)

在特定的场景下,映射(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,使用 Redisson 提供的带有本地缓存功能的分布式本地缓存映射 RLocalCachedMap Java 对象会是一个很好的选择。它同时实现了 java.util.concurrent.ConcurrentMapjava.util.Map 两个接口。本地缓存功能充分的利用了 JVM 的自身内存空间,对部分常用的元素实行就地缓存,这样的设计让读取操作的性能较 分布式映射 相比提高最多 45 倍 。以下配置参数可以用来创建这个实例:

LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
      // 用于淘汰清除本地缓存内的元素
      // 共有以下几种选择:
      // LFU - 统计元素的使用频率,淘汰用得最少(最不常用)的。
      // LRU - 按元素使用时间排序比较,淘汰最早(最久远)的。
      // SOFT - 元素用 Java 的 WeakReference 来保存,缓存元素通过 GC 过程清除。
      // WEAK - 元素用 Java 的 SoftReference 来保存,缓存元素通过 GC 过程清除。
      // NONE - 永不淘汰清除缓存元素。
     .evictionPolicy(EvictionPolicy.NONE)
     // 如果缓存容量值为 0 表示不限制本地缓存容量大小
     .cacheSize(1000)
      // 以下选项适用于断线原因造成了未收到本地缓存更新消息的情况。
      // 断线重连的策略有以下几种:
      // CLEAR - 如果断线一段时间以后则在重新建立连接以后清空本地缓存
      // LOAD - 在服务端保存一份 10 分钟的作废日志
      //        如果 10 分钟内重新建立连接,则按照作废日志内的记录清空本地缓存的元素
      //        如果断线时间超过了这个时间,则将清空本地缓存中所有的内容
      // NONE - 默认值。断线重连时不做处理。
     .reconnectionStrategy(ReconnectionStrategy.NONE)
      // 以下选项适用于不同本地缓存之间相互保持同步的情况
      // 缓存同步策略有以下几种:
      // INVALIDATE - 默认值。当本地缓存映射的某条元素发生变动时,同时驱逐所有相同本地缓存映射内的该元素
      // UPDATE - 当本地缓存映射的某条元素发生变动时,同时更新所有相同本地缓存映射内的该元素
      // NONE - 不做任何同步处理
     .syncStrategy(SyncStrategy.INVALIDATE)
      // 每个 Map 本地缓存里元素的有效时间,默认毫秒为单位
     .timeToLive(10000)
      // 或者
     .timeToLive(10, TimeUnit.SECONDS)
      // 每个 Map 本地缓存里元素的最长闲置时间,默认毫秒为单位
     .maxIdle(10000)
      // 或者
     .maxIdle(10, TimeUnit.SECONDS);
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", options);

String prevObject = map.put("123", 1);
String currentObject = map.putIfAbsent("323", 2);
String obj = map.remove("123");

// 在不需要旧值的情况下可以使用 fast 为前缀的类似方法
map.fastPut("a", 1);
map.fastPutIfAbsent("d", 32);
map.fastRemove("b");

RFuture<String> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");

map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");

当不再使用 Map 本地缓存对象的时候应该手动销毁,如果 Redisson 对象被关闭(shutdown)了,则不用手动销毁。

RLocalCachedMap<String, Integer> map = ...
map.destroy();

如何通过加载数据的方式来降低过期淘汰事件发布信息对网络的影响

代码范例:

public void loadData(String cacheName, Map<String, String> data) {
    RLocalCachedMap<String, String> clearMap = redisson.getLocalCachedMap(cacheName, 
            LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.INVALIDATE));
    RLocalCachedMap<String, String> loadMap = redisson.getLocalCachedMap(cacheName, 
            LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.NONE));

    loadMap.putAll(data);
    clearMap.clearLocalCache();
}

数据分片功能(Sharding)

Map 数据分片是 Redis 集群模式下的一个功能。Redisson 提供的分布式集群映射 RClusteredMap Java 对象也是基于 RMap 实现的。它同时实现了 java.util.concurrent.ConcurrentMapjava.util.Map 两个接口。在 这里 可以获取更多的内部信息。

RClusteredMap<String, SomeObject> map = redisson.getClusteredMap("anyMap");

SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");

map.fastPut("321", new SomeObject());
map.fastRemove("321");

7.1.2. 映射持久化方式(缓存策略)

Redisson 供了将映射中的数据持久化到外部储存服务的功能。主要场景有一下几种:

  1. 将 Redisson 的分布式映射类型作为业务和外部储存媒介之间的缓存。
  2. 或是用来增加 Redisson 映射类型中数据的持久性,或是用来增加已被驱逐的数据的寿命。
  3. 或是用来缓存数据库,Web 服务或其他数据源的数据。

Read-through 策略

通俗的讲,如果一个被请求的数据不存在于 Redisson 的映射中的时候,Redisson 将通过预先配置好的 MapLoader 对象加载数据。

Write-through(数据同步写入)策略

在遇到映射中某条数据被更改时,Redisson 会首先通过预先配置好的 MapWriter 对象写入到外部储存系统,然后再更新 Redis 内的数据。

Write-behind(数据异步写入)策略

对映射的数据的更改会首先写入到 Redis,然后再使用异步的方式,通过 MapWriter 对象写入到外部储存系统。在并发环境下可以通过 writeBehindThreads 参数来控制写入线程的数量,已达到对外部储存系统写入并发量的控制。

以上策略适用于所有实现了 RMapRMapCacheRLocalCachedMapRLocalCachedMapCache 接口的对象。

配置范例:

MapOptions<K, V> options = MapOptions.<K, V>defaults()
                              .writer(myWriter)
                              .loader(myLoader);

RMap<K, V> map = redisson.getMap("test", options);
// 或
RMapCache<K, V> map = redisson.getMapCache("test", options);
// 或
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// 或
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);

7.1.3. 映射监听器(Map Listener)

Redisson 为所有实现了 RMapCacheRLocalCachedMapCache 接口的对象提供了监听以下事件的监听器:

事件 | 监听器 元素 添加 事件 | org.redisson.api.map.event.EntryCreatedListener
元素 过期 事件 | org.redisson.api.map.event.EntryExpiredListener
元素 删除 事件 | org.redisson.api.map.event.EntryRemovedListener
元素 更新 事件 | org.redisson.api.map.event.EntryUpdatedListener

使用范例:

RMapCache<String, Integer> map = redisson.getMapCache("myMap");
// 或
RLocalCachedMapCache<String, Integer> map = redisson.getLocalCachedMapCache("myMap", options);

int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() {
     @Override
     public void onUpdated(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 新值
          event.getOldValue() // 旧值
          // ...
     }
});

int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() {
     @Override
     public void onCreated(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 值
          // ...
     }
});

int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() {
     @Override
     public void onExpired(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 值
          // ...
     }
});

int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() {
     @Override
     public void onRemoved(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 值
          // ...
     }
});

map.removeListener(updateListener);
map.removeListener(createListener);
map.removeListener(expireListener);
map.removeListener(removeListener);

7.1.4. LRU 有界映射

Redisson 提供了基于 Redis 的以 LRU 为驱逐策略的分布式 LRU 有界映射对象。顾名思义,分布式 LRU 有界映射允许通过对其中元素按使用时间排序处理的方式,主动移除超过规定容量限制的元素。

RMapCache<String, String> map = redisson.getMapCache("map");
// 尝试将该映射的最大容量限制设定为 10
map.trySetMaxSize(10);

// 将该映射的最大容量限制设定或更改为 10
map.setMaxSize(10);

map.put("1", "2");
map.put("3", "3", 1, TimeUnit.SECONDS);

7.2. 多值映射(Multimap)

基于 Redis 的 Redisson 的分布式 RMultimap Java 对象允许 Map 中的一个字段值包含多个元素。 字段总数受 Redis 限制,每个 Multimap 最多允许有 4 294 967 295 个不同字段。

7.2.1. 基于集(Set)的多值映射(Multimap)

基于 Set 的 Multimap 不允许一个字段值包含有重复的元素。

RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));

Set<SimpleValue> allValues = map.get(new SimpleKey("0"));

List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);

Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));

7.2.2. 基于列表(List)的多值映射(Multimap)

基于 List 的 Multimap 在保持插入顺序的同时允许一个字段下包含重复的元素。

RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));

List<SimpleValue> allValues = map.get(new SimpleKey("0"));

Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);

List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));

7.2.3. 多值映射(Multimap)淘汰机制(Eviction)

Multimap 对象的淘汰机制是通过不同的接口来实现的。它们是 RSetMultimapCache 接口和 RListMultimapCache 接口,分别对应的是 Set 和 List 的 Multimaps。

所有过期元素都是通过 org.redisson.EvictionScheduler 实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理 100 个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于 1 秒到 2 小时之间。比如该次清理时删除了 100 条元素,那么下次执行清理的时间将在 1 秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加 1.5 倍。

RSetMultimapCache 的使用范例:

RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap");
multimap.put("1", "a");
multimap.put("1", "b");
multimap.put("1", "c");

multimap.put("2", "e");
multimap.put("2", "f");

multimap.expireKey("2", 10, TimeUnit.MINUTES);

7.3. 集(Set)

基于 Redis 的 Redisson 的分布式 Set 结构的 RSet Java 对象实现了 java.util.Set 接口。通过元素的相互状态比较保证了每个元素的唯一性。该对象的最大容量受 Redis 限制,最大元素数量是 4 294 967 295 个。

RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

Redisson PRO 版本中的 Set 对象还可以在 Redis 集群环境下支持 单集合数据分片

7.3.1. 集(Set)淘汰机制(Eviction)

基于 Redis 的 Redisson 的分布式 RSetCache Java 对象在基于 RSet 的前提下实现了针对单个元素的淘汰机制。由于 RSetCache 是基于 RSet 实现的,使它还集成了 java.util.Set 接口。

目前的 Redis 自身并不支持 Set 当中的元素淘汰,因此所有过期元素都是通过 org.redisson.EvictionScheduler 实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理 100 个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于 1 秒到 2 小时之间。比如该次清理时删除了 100 条元素,那么下次执行清理的时间将在 1 秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加 1.5 倍。

RSetCache<SomeObject> set = redisson.getSetCache("anySet");
// ttl = 10 seconds
set.add(new SomeObject(), 10, TimeUnit.SECONDS);

7.3.2. 集(Set)数据分片(Sharding)

Set 数据分片是 Redis 集群模式下的一个功能。Redisson 提供的分布式 RClusteredSet Java 对象也是基于 RSet 实现的。在 这里 可以获取更多的信息。

RClusteredSet<SomeObject> set = redisson.getClusteredSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

除了 RClusteredSet 以外,Redisson 还提供了另一种集群模式下的分布式集(Set),它不仅提供了透明的数据分片功能,还为每个元素提供了淘汰机制。 RClusteredSetCache 类分别同时提供了 RClusteredSetRSetCache 这两个接口的实现。当然这些都是基于 java.util.Set 的接口实现上的。

该功能仅限于 Redisson PRO 版本。

7.4. 有序集(SortedSet)

基于 Redis 的 Redisson 的分布式 RSortedSet Java 对象实现了 java.util.SortedSet 接口。在保证元素唯一性的前提下,通过比较器(Comparator)接口实现了对元素的排序。

RSortedSet<Integer> set = redisson.getSortedSet("anySet");
set.trySetComparator(new MyComparator()); // 配置元素比较器
set.add(3);
set.add(1);
set.add(2);

set.removeAsync(0);
set.addAsync(5);

7.5. 计分排序集(ScoredSortedSet)

基于 Redis 的 Redisson 的分布式 RScoredSortedSet Java 对象是一个可以按插入时指定的元素评分排序的集合。它同时还保证了元素的唯一性。

RScoredSortedSet<SomeObject> set = redisson.getScoredSortedSet("simple");

set.add(0.13, new SomeObject(a, b));
set.addAsync(0.251, new SomeObject(c, d));
set.add(0.302, new SomeObject(g, d));

set.pollFirst();
set.pollLast();

int index = set.rank(new SomeObject(g, d)); // 获取元素在集合中的位置
Double score = set.getScore(new SomeObject(g, d)); // 获取元素的评分

7.6. 字典排序集(LexSortedSet)

基于 Redis 的 Redisson 的分布式 RLexSortedSet Java 对象在实现了 java.util.Set<String> 接口的同时,将其中的所有字符串元素按照字典顺序排列。它公式还保证了字符串元素的唯一性。

RLexSortedSet set = redisson.getLexSortedSet("simple");
set.add("d");
set.addAsync("e");
set.add("f");

set.lexRangeTail("d", false);
set.lexCountHead("e");
set.lexRange("d", true, "z", false);

7.7. 列表(List)

基于 Redis 的 Redisson 分布式列表(List)结构的 RList Java 对象在实现了 java.util.List 接口的同时,确保了元素插入时的顺序。该对象的最大容量受 Redis 限制,最大元素数量是 4 294 967 295 个。

RList<SomeObject> list = redisson.getList("anyList");
list.add(new SomeObject());
list.get(0);
list.remove(new SomeObject());

7.8. 队列(Queue)

基于 Redis 的 Redisson 分布式无界队列(Queue)结构的 RQueue Java 对象实现了 java.util.Queue 接口。尽管 RQueue 对象无初始大小(边界)限制,但对象的最大容量受 Redis 限制,最大元素数量是 4 294 967 295 个。

RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();

7.9. 双端队列(Deque)

基于 Redis 的 Redisson 分布式无界双端队列(Deque)结构的 RDeque Java 对象实现了 java.util.Deque 接口。尽管 RDeque 对象无初始大小(边界)限制,但对象的最大容量受 Redis 限制,最大元素数量是 4 294 967 295 个。

RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();

7.10. 阻塞队列(Blocking Queue)

基于 Redis 的 Redisson 分布式无界阻塞队列(Blocking Queue)结构的 RBlockingQueue Java 对象实现了 java.util.concurrent.BlockingQueue 接口。尽管 RBlockingQueue 对象无初始大小(边界)限制,但对象的最大容量受 Redis 限制,最大元素数量是 4 294 967 295 个。

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());

SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

poll , pollFromAny , pollLastAndOfferFirstTotake 方法内部采用话题订阅发布实现,在 Redis 节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。

7.11. 有界阻塞队列(Bounded Blocking Queue)

基于 Redis 的 Redisson 分布式有界阻塞队列(Bounded Blocking Queue)结构的 RBoundedBlockingQueue Java 对象实现了 java.util.concurrent.BlockingQueue 接口。该对象的最大容量受 Redis 限制,最大元素数量是 4 294 967 295 个。队列的初始容量(边界)必须在使用前设定好。

RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
// 如果初始容量(边界)设定成功则返回`真(true)`,
// 如果初始容量(边界)已近存在则返回`假(false)`。
queue.trySetCapacity(2);

queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// 此时容量已满,下面代码将会被阻塞,直到有空闲为止。
queue.put(new SomeObject());

SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

poll , pollFromAny , pollLastAndOfferFirstTotake 方法内部采用话题订阅发布实现,在 Redis 节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。

7.12. 阻塞双端队列(Blocking Deque)

基于 Redis 的 Redisson 分布式无界阻塞双端队列(Blocking Deque)结构的 RBlockingDeque Java 对象实现了 java.util.concurrent.BlockingDeque 接口。尽管 RBlockingDeque 对象无初始大小(边界)限制,但对象的最大容量受 Redis 限制,最大元素数量是 4 294 967 295 个。

RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = queue.takeFirst();
Integer lastValue = queue.takeLast();
Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES);
Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES);

poll , pollFromAny , pollLastAndOfferFirstTotake 方法内部采用话题订阅发布实现,在 Redis 节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。

7.13. 阻塞公平队列(Blocking Fair Queue)

基于 Redis 的 Redisson 分布式无界阻塞公平队列(Blocking Fair Queue)结构的 RBlockingFairQueue Java 对象在实现 Redisson 分布式无界阻塞队列(Blocking Queue)结构 RBlockingQueue 接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。

以分布式无界阻塞队列为基础,采用公平获取消息的机制,不仅保证了 pollpollFromAnypollLastAndOfferFirstTotake 方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。

RBlockingFairQueue queue = redisson.getBlockingFairQueue("myQueue");
queue.offer(new SomeObject());

SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

该功能仅限于 Redisson PRO 版本。

7.14. 阻塞公平双端队列(Blocking Fair Deque)

基于 Redis 的 Redisson 分布式无界阻塞公平双端队列(Blocking Fair Deque)结构的 RBlockingFairDeque Java 对象在实现 Redisson 分布式无界阻塞双端队列(Blocking Deque)结构 RBlockingDeque 接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。

以分布式无界阻塞双端队列为基础,采用公平获取消息的机制,不仅保证了 polltakepollFirsttakeFirstpollLasttakeLast 方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。

RBlockingFairDeque deque = redisson.getBlockingFairDeque("myDeque");
deque.offer(new SomeObject());

SomeObject firstElement = queue.peekFirst();
SomeObject firstElement = queue.pollFirst();
SomeObject firstElement = queue.pollFirst(10, TimeUnit.MINUTES);
SomeObject firstElement = queue.takeFirst();

SomeObject lastElement = queue.peekLast();
SomeObject lastElement = queue.pollLast();
SomeObject lastElement = queue.pollLast(10, TimeUnit.MINUTES);
SomeObject lastElement = queue.takeLast();

该功能仅限于 Redisson PRO 版本。

7.15. 延迟队列(Delayed Queue)

基于 Redis 的 Redisson 分布式延迟队列(Delayed Queue)结构的 RDelayedQueue Java 对象在实现了 RQueue 接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。

RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// 10 秒钟以后将消息发送到指定队列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分钟以后将消息发送到指定队列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在该对象不再需要的情况下,应该主动销毁。仅在相关的 Redisson 对象也需要关闭的时候可以不用主动销毁。

RDelayedQueue<String> delayedQueue = ...
delayedQueue.destroy();

7.16. 优先队列(Priority Queue)

基于 Redis 的 Redisson 分布式优先队列(Priority Queue)Java 对象实现了 java.util.Queue 的接口。可以通过比较器(Comparator)接口来对元素排序。

RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(3);
queue.add(1);
queue.add(2);

queue.removeAsync(0);
queue.addAsync(5);

queue.poll();

7.17. 优先双端队列(Priority Deque)

基于 Redis 的 Redisson 分布式优先双端队列(Priority Deque)Java 对象实现了 java.util.Deque 的接口。可以通过比较器(Comparator)接口来对元素排序。

RPriorityDeque<Integer> queue = redisson.getPriorityDeque("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.addLast(3);
queue.addFirst(1);
queue.add(2);

queue.removeAsync(0);
queue.addAsync(5);

queue.pollFirst();
queue.pollLast();

7.18. 优先阻塞队列(Priority Blocking Queue)

基于 Redis 的分布式无界优先阻塞队列(Priority Blocking Queue)Java 对象的结构与 java.util.concurrent.PriorityBlockingQueue 类似。可以通过比较器(Comparator)接口来对元素排序。 PriorityBlockingQueue 的最大容量是 4 294 967 295 个元素。

RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(3);
queue.add(1);
queue.add(2);

queue.removeAsync(0);
queue.addAsync(5);

queue.take();

当 Redis 服务端断线重连以后,或 Redis 服务端发生主从切换,并重新建立连接后,断线时正在使用 pollpollLastAndOfferFirstTotake 方法的对象 Redisson 将自动再次为其订阅相关的话题。

7.19. 优先阻塞双端队列(Priority Blocking Deque)

基于 Redis 的分布式无界优先阻塞双端队列(Priority Blocking Deque)Java 对象实现了 java.util.Deque 的接口。 addLastaddFirstpush 方法不能再这个对里使用。 PriorityBlockingDeque 的最大容量是 4 294 967 295 个元素。

RPriorityBlockingDeque<Integer> queue = redisson.getPriorityBlockingDeque("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(2);

queue.removeAsync(0);
queue.addAsync(5);

queue.pollFirst();
queue.pollLast();
queue.takeFirst();
queue.takeLast();

当 Redis 服务端断线重连以后,或 Redis 服务端发生主从切换,并重新建立连接后,断线时正在使用 pollpollLastAndOfferFirstTotake 方法的对象 Redisson 将自动再次为其订阅相关的话题。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文