- 1.2 EMQ简介
- 1.3 快速开始
- 1.3.1 集群信息
- 1.3.2 控制台管理示例
- 1.3.3 代码示例
- 1.3.4 SDK
- 1.4 基础知识
- 1.4.1 基本概念
- 1.4.2 消息状态和接收模型
- 1.5 常见问题
- 1.5.1 Queue
- 1.5.2 Message
- 1.5.3 常见异常
- 1.5.4 其他
- 1.6 Queue API
- 1.6.1 创建/删除/清理 Queue
- 1.6.2 设置 Queue 属性/配额
- 1.6.3 获取 Queue 信息
- 1.6.4 Queue 权限操作
- 1.6.5 拷贝 Queue 元数据
- 1.7 Message API
- 1.7.1 发送 Message
- 1.7.2 接收 Message
- 1.7.3 更改 Message 不可见时间
- 1.7.4 删除(ACK) Message
- 1.7.5 重新驱动 Message
- 1.7.6 查询 Message
- 1.7.7 直接删除 Message
- 1.8 报警/统计 API
- 1.8.1 设置/获取 用户联系/配额信息
- 1.8.2 添加/移除/获取 Queue 报警策略
- 1.8.3 设置/获取 Queue 每日消息统计
- 1.9 高级队列功能
- 1.9.1 多读者/Tag
- 1.9.1.1 模型介绍
- 1.9.1.2 相关API
- 1.9.2 死信队列
- 1.9.3 Topic Queue
- 1.9.4 Priority Queue
- 1.10 计费规则
- 1.11 新旧版认证迁移
1.9.3 Topic Queue
TopicQueue是为了保证Queue中Message的局部有序性(同一Topic内有序)而引入的概念。
用户在创建Queue时,将Queue的topicQueue属性设置为True,得到的Queue即是TopicQueue。
之后,向TopicQueue中发送Message时,用户可以为Message设置String类型的Topic属性(也可以不指定,即Topic为空)。 EMQ将保证用户接收Message时,相同Topic的Message严格按照MessageId的顺序送达; 并且,如果用户未对MessageId较小的Message进行deleteMessage()
, 同一Topic下MessageId较大Message不会被任何一个Client接收到.
注
MessageId的大小依据sendTime+delayTime.
空Topic和任何Topic都不相同,即Topic为空的Message没有“有序性”,其行为与nonTopic中的Message一样.
TopicQueue中的Message同样具有“超时重发”的特性,以保证消息队列at-lease once的送达语义; 重发现象出现后,可能会导致客户端的某两个并发(线程)收到同样的消息,从而打乱消息的顺序。详细举例如下:
相同Topic的Message,依messageId的大小为m1, m2, m3
用户启动了两个线程来处理Message, t1, t2;超时设置为30秒
t1收到m1,进行处理;处理过程中,m2, m3被EMQ锁定,无法被任何线程接收;
t1很快处理完m1,并使用`deleteMessage()`进行ACK。ACK后,Message解锁,m2被t2接收;m3被锁定
t2收到m2后,尚未开始处理,即被操作系统挂起(如Java GC)。挂起时间超过30秒
超时后,m2再次变为visible,从而被t1接收处理。t1对m2 deleteMessage()后,m3被解锁,从而被t1接收并处理
t2挂起结束后,并未感知m2已被t1处理,于是继续处理m2
在上述过程时,由于“超时重发”的特性,从EMQ Server看来,Message的receive顺序是m1, m2, m2, m3
但从用户整体的处理来看,处理顺序为m1, m2, m3, m2
在这种情况下,Message的有序性遭到了破坏。
需要指出,具有auto-recovery功能的消息系统可能都会存在类似的问题。
如果需要严格有序性,需要用户在Client端配合。
比如Client记录处理的最后一条Message的messageId,然后简单丢弃receive到的messageId更大的Message。 这样,不但保证了Message的严格有序,同时也实现了at-most once的语义,即保证了Message处理不丢不重。
TopicQueue在实现方法上,是通过HASH算法将相同Topic的Message放入Queue中的同一Partition,因此可能会有如下的一些性能损失。如果一个队列中的Message没有有序性的需求,请尽量将其创建为nonTopicQueue:
- Partition的数量不可调整
对于nonTopicQueue,用户可以在流量增大时增加Partition数量,以提高Queue的并发性; 但在目前的实现中(以后可能会改进),TopicQueue的Partition数量一经确认,不可再次调整。 用户需要在createQueue()
时预估流量,设置合适的Partition数量。对于size在1 KB左右的Message,可以认为一个Partition的通过能力在500 Message/sec. - 不同Topic Message之间的相互影响
一个Partition中包含一到多个Topic;
如果某个Topic的Message长时间一直未被成功处理,可能会影响同一Partition内所有Message. - Message在Partition之间分布不均匀
指定了Topic的Message只能被send至Queue中某一确定的Partition,如果此Partition对应的底层结构暂时不可用,将导致sendMessage()
失败;
未指定Topic的Message(包括nonTopicQueue的Message和TopicQueue中未指定Topic的Message)将随机地进入某个Partition,并在此Partition不可用时,选择另外的Partition重试.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论