RabbitMQ 中的延迟消息

发布于 2024-10-07 18:47:17 字数 84 浏览 9 评论 0原文

是否可以通过 RabbitMQ 发送消息,但有一定的延迟? 例如,我想在 30 分钟后使客户端会话过期,并且我发送一条消息,该消息将在 30 分钟后处理。

Is it possible to send message via RabbitMQ with some delay?
For example I want to expire client session after 30 minutes, and I send a message which will be processed after 30 minutes.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(8

梦冥 2024-10-14 18:47:18

感谢 Norman 的回答,我可以在 Node.js 中实现它。

从代码中一切都非常清楚。

var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});

// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
      deadLetterExchange: "my_final_delayed_exchange",
      messageTtl: 5000, // 5sec
}, function (err, q) {
      ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
      ch.bindQueue(q.queue, "my_final_delayed_exchange", '');

      ch.consume(q.queue, function (msg) {
          console.log("delayed - [x] %s", msg.content.toString());
      }, {noAck: true});
});

Thanks to Norman's answer, I could implement it in Node.js.

Everything is pretty clear from the code.

var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});

// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
      deadLetterExchange: "my_final_delayed_exchange",
      messageTtl: 5000, // 5sec
}, function (err, q) {
      ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
      ch.bindQueue(q.queue, "my_final_delayed_exchange", '');

      ch.consume(q.queue, function (msg) {
          console.log("delayed - [x] %s", msg.content.toString());
      }, {noAck: true});
});
纵山崖 2024-10-14 18:47:18

由于我没有足够的声誉来添加评论,因此发布了新答案。这只是对 http: //www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

除了在消息上设置 ttl 之外,您还可以在队列级别设置它。此外,您还可以避免仅仅为了将消息重定向到不同的队列而创建新的交换。以下是示例 Java 代码:

生产者:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;

public class DelayedProducer {
    private final static String QUEUE_NAME = "ParkingQueue";
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-message-ttl", 10000);
        arguments.put("x-dead-letter-exchange", "");
        arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME );
        channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);

        for (int i=0; i<5; i++) {
            String message = "This is a sample message " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("message "+i+" got published to the queue!");
            Thread.sleep(3000);
        }

        channel.close();
        connection.close();
    }
}

消费者:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {
   private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        boolean autoAck = false;
        channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

As I don't have enough reputation to add comment, posting a new answer. This is just an addition to what has been already discussed at http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

Except instead of setting ttl on messages, you can set it at queue level. Also you can avoid creating a new exchange just for the sake of redirecting the messages to different Queue. Here is sample Java code:

Producer:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;

public class DelayedProducer {
    private final static String QUEUE_NAME = "ParkingQueue";
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-message-ttl", 10000);
        arguments.put("x-dead-letter-exchange", "");
        arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME );
        channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);

        for (int i=0; i<5; i++) {
            String message = "This is a sample message " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("message "+i+" got published to the queue!");
            Thread.sleep(3000);
        }

        channel.close();
        connection.close();
    }
}

Consumer:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {
   private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        boolean autoAck = false;
        channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
梦魇绽荼蘼 2024-10-14 18:47:18

它看起来像 这篇博文描述了使用死信交换和消息 ttl 来执行类似的操作。

下面的代码使用 CoffeeScript 和 Node.js 访问 Rabbit 并实现类似的功能。

amqp   = require 'amqp'
events = require 'events'
em     = new events.EventEmitter()
conn   = amqp.createConnection()
  
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
  conn.queue key, {
    arguments:{
      "x-dead-letter-exchange":"immediate"
    , "x-message-ttl": 5000
    , "x-expires": 6000
    }
  }, ->
    conn.publish key, {v:1}, {contentType:'application/json'}
  
  conn.exchange 'immediate'
 
  conn.queue 'right.now.queue', {
      autoDelete: false
    , durable: true
  }, (q) ->
    q.bind('immediate', 'right.now.queue')
    q.subscribe (msg, headers, deliveryInfo) ->
      console.log msg
      console.log headers

It looks like this blog post describes using the dead letter exchange and message ttl to do something similar.

The code below uses CoffeeScript and Node.js to access Rabbit and implement something similar.

amqp   = require 'amqp'
events = require 'events'
em     = new events.EventEmitter()
conn   = amqp.createConnection()
  
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
  conn.queue key, {
    arguments:{
      "x-dead-letter-exchange":"immediate"
    , "x-message-ttl": 5000
    , "x-expires": 6000
    }
  }, ->
    conn.publish key, {v:1}, {contentType:'application/json'}
  
  conn.exchange 'immediate'
 
  conn.queue 'right.now.queue', {
      autoDelete: false
    , durable: true
  }, (q) ->
    q.bind('immediate', 'right.now.queue')
    q.subscribe (msg, headers, deliveryInfo) ->
      console.log msg
      console.log headers
北方。的韩爷 2024-10-14 18:47:18

目前这是不可能的。您必须将过期时间戳存储在数据库或类似的东西中,然后有一个帮助程序来读取这些时间戳并将消息排队。

延迟消息是经常需要的功能,因为它们在许多情况下都很有用。但是,如果您需要使客户端会话过期,我相信消息传递并不是您的理想解决方案,另一种方法可能会更好。

That's currently not possible. You have to store your expiration timestamps in a database or something similiar, and then have a helper program that reads those timestamps and queues a message.

Delayed messages are an often requested feature, as they're useful in many situations. However, if your need is to expire client sessions I believe that messaging is not the ideal solution for you, and that another approach might work better.

病毒体 2024-10-14 18:47:18

假设你控制了消费者,你可以像这样实现消费者的延迟?:

如果我们确定队列中的第 n 个消息始终比第 n+1 个消息具有更小的延迟(对于许多用例来说都是如此):生产者在任务中发送 timeInformation,传达该作业需要的时间要执行的时间(当前时间 + 延迟)。消费者:

1) 从任务中读取scheduledTime

2) 如果当前时间>预定时间继续。

<块引用>

否则延迟=预定时间-当前时间

<块引用>

按照延迟指示的时间睡眠


消费者总是配置有并发参数。因此,其他消息将在队列中等待,直到消费者完成作业。因此,这个解决方案可以很好地工作,尽管它看起来很尴尬,特别是对于大的时间延迟。

Suppose you had control over the consumer, you could achieve the delaying on the consumer like this??:

If we are sure that the nth message in the queue always has a smaller delay than the n+1th message (this can true for many use cases): The producer sends timeInformation in the task conveying the time at which this job needs to be executed (currentTime + delay). The consumer:

1) Reads the scheduledTime from the task

2) if currentTime > scheduledTime go ahead.

Else delay = scheduledTime - currentTime

sleep for time indicated by delay

The consumer is always configured with a concurrency parameter. So, the other messages will just wait in the queue until a consumer finishes the job. So, this solution could just work well though it looks awkward especially for big time delays.

傲娇萝莉攻 2024-10-14 18:47:18

AMQP 协议不支持延迟消息传递,但可以使用 Time-To-Live 和 Expiration 和 < a href="https://www.rabbitmq.com/dlx.html" rel="nofollow noreferrer">死信交换扩展延迟消息传递是可能的。该解决方案在此描述 链接。我从该链接复制了以下步骤:

一步一步:

声明延迟队列
    添加 x-dead-letter-exchange 参数属性,并将其设置为默认交换“”。
    添加 x-dead-letter-routing-key 参数属性,并将其设置为目标队列的名称。
    添加 x-message-ttl 参数属性,并将其设置为您想要延迟消息的毫秒数。
订阅目标队列

GitHub 上的 RabbitMQ 存储库

请注意,有一个名为 Celery 的解决方案,它支持延迟任务排队RabbitMQ 代理通过提供名为 apply_async() 的调用 API 来实现。 Celery 支持 Python、node 和 PHP。

AMQP protocol does not support delayed messaging, but by using Time-To-Live and Expiration and Dead Letter Exchanges extensions delayed messaging is possible. The solution is described in this link. I copied the following steps from that link:

Step by step:

Declare the delayed queue
    Add the x-dead-letter-exchange argument property, and set it to the default exchange "".
    Add the x-dead-letter-routing-key argument property, and set it to the name of the destination queue.
    Add the x-message-ttl argument property, and set it to the number of milliseconds you want to delay the message.
Subscribe to the destination queue

There is also a plugin for delayed messaging in RabbitMQ repository on GitHub.

Note that there is a solution called Celery which supports delayed task queuing on RabbitMQ broker by presenting a calling API called apply_async(). Celery supports Python, node and PHP.

明媚如初 2024-10-14 18:47:17

您可以尝试两种方法:

旧方法:在每个消息/队列(策略)中设置 TTL(生存时间)标头,然后引入 DLQ 来处理它。一旦 ttl 过期,您的消息将从 DLQ 移动到主队列,以便您的侦听器可以处理它。

最新方法: 最近 RabbitMQ 推出了 RabbitMQ 延迟消息插件,使用它您可以实现相同的效果,并且自 RabbitMQ-3.5.8 起就提供了此插件支持。

您可以使用 x-delayed-message 类型声明交换,然后使用自定义标头 x-delay 发布消息,以毫秒为单位表示消息的延迟时间。消息将在 x-delay 毫秒后传递到相应的队列

byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new 
AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

更多信息:git

There are two approaches you can try:

Old Approach: Set the TTL(time to live) header in each message/queue(policy) and then introduce a DLQ to handle it. once the ttl expired your messages will move from DLQ to main queue so that your listener can process it.

Latest Approach: Recently RabbitMQ came up with RabbitMQ Delayed Message Plugin , using which you can achieve the same and this plugin support available since RabbitMQ-3.5.8.

You can declare an exchange with the type x-delayed-message and then publish messages with the custom header x-delay expressing in milliseconds a delay time for the message. The message will be delivered to the respective queues after x-delay milliseconds

byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new 
AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

More here: git

怀里藏娇 2024-10-14 18:47:17

随着 RabbitMQ v2.8 的发布,计划交付现已可用,但作为间接功能:http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

With the release of RabbitMQ v2.8, scheduled delivery is now available but as an indirect feature: http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文