- 1.2 EMQ简介
- 1.3 快速开始
- 1.3.1 集群信息
- 1.3.2 控制台管理示例
- 1.3.3 代码示例
- 1.3.4 SDK
- 1.4 基础知识
- 1.4.1 基本概念
- 1.4.2 消息状态和接收模型
- 1.5 常见问题
- 1.5.1 Queue
- 1.5.2 Message
- 1.5.3 常见异常
- 1.5.4 其他
- 1.6 Queue API
- 1.6.1 创建/删除/清理 Queue
- 1.6.2 设置 Queue 属性/配额
- 1.6.3 获取 Queue 信息
- 1.6.4 Queue 权限操作
- 1.6.5 拷贝 Queue 元数据
- 1.7 Message API
- 1.7.1 发送 Message
- 1.7.2 接收 Message
- 1.7.3 更改 Message 不可见时间
- 1.7.4 删除(ACK) Message
- 1.7.5 重新驱动 Message
- 1.7.6 查询 Message
- 1.7.7 直接删除 Message
- 1.8 报警/统计 API
- 1.8.1 设置/获取 用户联系/配额信息
- 1.8.2 添加/移除/获取 Queue 报警策略
- 1.8.3 设置/获取 Queue 每日消息统计
- 1.9 高级队列功能
- 1.9.1 多读者/Tag
- 1.9.1.1 模型介绍
- 1.9.1.2 相关API
- 1.9.2 死信队列
- 1.9.3 Topic Queue
- 1.9.4 Priority Queue
- 1.10 计费规则
- 1.11 新旧版认证迁移
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
1.3.3 代码示例
代码示例 (Java)
初始化:
String endpoint = "http://staging.emq.api.xiaomi.com";
// 设置ak sk
Credential credential = new Credential().setSecretKeyId(secretKeyId).
setSecretKey(secretKey).setType(UserType.APP_SECRET);
EMQClientFactory clientFactory = new EMQClientFactory(credential,
generateHttpClient(10, 10));
// properties配置
Properties properties = new Properties();
properties.setProperty("galaxy.emq.service.endpoint", endpoint); // endpoint
properties.setProperty("galaxy.emq.client.timeout", String.valueOf(30000)); // 可选 socket 超时时间
properties.setProperty("galaxy.emq.client.conn.timeout", String.valueOf(60000)); // 可选 客户端超时时间
properties.setProperty("galaxy.emq.client.auto.retry", String.valueOf(true)); // 可选 是否重试
properties.setProperty("galaxy.emq.client.retry.number", String.valueOf(3)); // 可选 重试次数
EMQClientConfig config = new EMQClientConfig(properties);
QueueService.Iface queueClient = clientFactory.newQueueClient(config);
MessageService.Iface messageClient = clientFactory.newMessageClient(config);
创建队列:
CreateQueueRequest createQueueRequest = new CreateQueueRequest(name);
CreateQueueResponse createQueueResponse = queueClient.createQueue(
createQueueRequest);
// 之后的操作都需要使用这里的queueName,而不是第一行中的name
String queueName = createQueueResponse.getQueueName();
发送者:
while(true) {
SendMessageRequest sendMessageRequest =
new SendMessageRequest(queueName, messageBody);
SendMessageResponse sendMessageResponse =
messageClient.sendMessage(sendMessageRequest);
}
接收者:
while(true) {
ReceiveMessageRequest receiveMessageRequest =
new ReceiveMessageRequest(queueName);
List<ReceiveMessageResponse> receiveMessageResponse =
messageClient.receiveMessage(receiveMessageRequest);
if (!receiveMessageResponse.isEmpty()) {
// process receiveMessageResponse
DeleteMessageBatchRequest deleteMessageBatchRequest =
new DeleteMessageBatchRequest();
deleteMessageBatchRequest.setQueueName(queueName);
for (ReceiveMessageResponse response : receiveMessageResponse) {
deleteMessageBatchRequest.addToDeleteMessageBatchRequestEntryList(
new DeleteMessageBatchRequestEntry(response.getReceiptHandle()));
}
messageClient.deleteMessageBatch(deleteMessageBatchRequest);
}
}
删除队列:
DeleteQueueRequest deleteQueueRequest = new DeleteQueueRequest(queueName);
queueClient.deleteQueue(deleteQueueRequest);
更多更详细的示例请参考各语言SDK下的Example, 或直接参考:SDK Example
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论