使用 Node.js 的 AMQP,如何发布/订阅?

发布于 2024-12-01 19:23:33 字数 4447 浏览 1 评论 0原文

我正在创建一个操作 Node-AMQP 模块的类: https://github.com/postwait/node -amqp

但我无法使用它发布/订阅:

  var Queue = require('./Queue.js');
  var queue = new Queue();
  queue.addTaskToQueue('salut', 5);
  queue.subscribeTaskQueue('salut');

这是我正在使用的类(我在 CoffeeScript 中给出代码,对于那些不了解 CoffeeScript 的人则在 Node.js 中给出代码):

谢谢为了你的帮助。

在 CoffeeScript 中:

amqp = require('amqp')

class Queue

    constructor: (ip = 'localhost') ->
            @ip = ip
            @receivedObject
            @connection = amqp.createConnection({ host: @ip })
            @queue

    subscribeTaskQueue: (queueToSubscribe) ->
            self = @
            self.connection.on('ready', ->
                    q = self.connection.queue(queueToSubscribe)
                    q.bind('#')

                    q.subscribe({ ack: true }, (message) ->
                            self.receivedObject = message
                            console.log(self.receivedObject)
                    )
            )

    addTaskToQueue: (queue, objectToSend) ->
            @queue = @connection.queue("salut", { durable: true })
            @connection.publish(queue, objectToSend)

module.exports = Queue

在 Node.js 中,

(function() {
  var Queue, amqp;
  amqp = require('amqp');
  Queue = (function() {
    function Queue(ip) {
      if (ip == null) {
        ip = 'localhost';
      }
      this.ip = ip;
      this.receivedObject;
      this.connection = amqp.createConnection({
        host: this.ip
      });
      this.queue;
    }
    Queue.prototype.subscribeTaskQueue = function(queueToSubscribe) {
      var self;
      self = this;
      return self.connection.on('ready', function() {
        var q;
        q = self.connection.queue(queueToSubscribe);
        q.bind('#');
        return q.subscribe({
          ack: true
        }, function(message) {
          self.receivedObject = message;
          return console.log(self.receivedObject);
        });
      });
    };
    Queue.prototype.addTaskToQueue = function(queue, objectToSend) {
      this.queue = this.connection.queue("salut", {
        durable: true
      });
      return this.connection.publish(queue, objectToSend);
    };
    return Queue;
  })();
  module.exports = Queue;
}).call(this);

我做了你所说的+我做了一些修改。

这是我的 Queue.coffee 类:

amqp = require('amqp')

class Queue

    constructor: (ip = "localhost", queueName = "salut") ->
            @ip = ip
            @receivedObject = "test"
            @connection = amqp.createConnection({ host: 'localhost' })
            @queueName = queueName

    subscribeTaskQueue: () ->
            @connection.on('ready', ->
                    q = @connection.queue(@queueName)
                    q.bind('#')

                    q.subscribe({ ack: true }, (message) ->
                            @receivedObject = message
                            console.log(@receivedObject)
                    )
            )

    addTaskToQueue: (objectToSend = "hello") ->
            @connection.publish(@queueName, objectToSend)


queue = new Queue("localhost", "salut")

queue.connection.on 'ready', ->
  queue.addTaskToQueue 'salut', 5
  queue.subscribeTaskQueue 'salut'

这是rabbituser.coffee:

Queue = require('./Queue.js')
queue = new Queue("localhost", "salut")

queue.addTaskToQueue("hello")
queue.subscribeTaskQueue()

当我执行命令:noderabbituser.js 时,

我得到:

node.js:134
        throw e; // process.nextTick error, or 'error' event on first tick
        ^
TypeError: Cannot read property '' of undefined
    at Connection.exchange (/home/armand/node_modules/amqp/amqp.js:1242:21)
    at Connection.publish (/home/armand/node_modules/amqp/amqp.js:1258:60)
    at Queue.addTaskToQueue (/home/armand/Desktop/RockSolidProject/coucheAMQP/Queue.js:36:30)
    at Object.<anonymous> (/home/armand/Desktop/RockSolidProject/coucheAMQP/rabbituser.js:5:9)
    at Object.<anonymous> (/home/armand/Desktop/RockSolidProject/coucheAMQP/rabbituser.js:7:4)
    at Module._compile (module.js:402:26)
    at Object..js (module.js:408:10)
    at Module.load (module.js:334:31)
    at Function._load (module.js:293:12)
    at Array.<anonymous> (module.js:421:10)

I am making a class manipulating the Node-AMQP module available here : https://github.com/postwait/node-amqp

But I am not able to publish/subscribe using this :

  var Queue = require('./Queue.js');
  var queue = new Queue();
  queue.addTaskToQueue('salut', 5);
  queue.subscribeTaskQueue('salut');

Here is the class that I am using (I give the Code in CoffeeScript, and in Node.js for those who doesn't know CoffeeScript) :

Thanks for your help.

In CoffeeScript :

amqp = require('amqp')

class Queue

    constructor: (ip = 'localhost') ->
            @ip = ip
            @receivedObject
            @connection = amqp.createConnection({ host: @ip })
            @queue

    subscribeTaskQueue: (queueToSubscribe) ->
            self = @
            self.connection.on('ready', ->
                    q = self.connection.queue(queueToSubscribe)
                    q.bind('#')

                    q.subscribe({ ack: true }, (message) ->
                            self.receivedObject = message
                            console.log(self.receivedObject)
                    )
            )

    addTaskToQueue: (queue, objectToSend) ->
            @queue = @connection.queue("salut", { durable: true })
            @connection.publish(queue, objectToSend)

module.exports = Queue

In Node.js

(function() {
  var Queue, amqp;
  amqp = require('amqp');
  Queue = (function() {
    function Queue(ip) {
      if (ip == null) {
        ip = 'localhost';
      }
      this.ip = ip;
      this.receivedObject;
      this.connection = amqp.createConnection({
        host: this.ip
      });
      this.queue;
    }
    Queue.prototype.subscribeTaskQueue = function(queueToSubscribe) {
      var self;
      self = this;
      return self.connection.on('ready', function() {
        var q;
        q = self.connection.queue(queueToSubscribe);
        q.bind('#');
        return q.subscribe({
          ack: true
        }, function(message) {
          self.receivedObject = message;
          return console.log(self.receivedObject);
        });
      });
    };
    Queue.prototype.addTaskToQueue = function(queue, objectToSend) {
      this.queue = this.connection.queue("salut", {
        durable: true
      });
      return this.connection.publish(queue, objectToSend);
    };
    return Queue;
  })();
  module.exports = Queue;
}).call(this);

I did what you say + I made little modifications.

Here is my class Queue.coffee :

amqp = require('amqp')

class Queue

    constructor: (ip = "localhost", queueName = "salut") ->
            @ip = ip
            @receivedObject = "test"
            @connection = amqp.createConnection({ host: 'localhost' })
            @queueName = queueName

    subscribeTaskQueue: () ->
            @connection.on('ready', ->
                    q = @connection.queue(@queueName)
                    q.bind('#')

                    q.subscribe({ ack: true }, (message) ->
                            @receivedObject = message
                            console.log(@receivedObject)
                    )
            )

    addTaskToQueue: (objectToSend = "hello") ->
            @connection.publish(@queueName, objectToSend)


queue = new Queue("localhost", "salut")

queue.connection.on 'ready', ->
  queue.addTaskToQueue 'salut', 5
  queue.subscribeTaskQueue 'salut'

And here is rabbituser.coffee :

Queue = require('./Queue.js')
queue = new Queue("localhost", "salut")

queue.addTaskToQueue("hello")
queue.subscribeTaskQueue()

When I do the command : node rabbituser.js

I get :

node.js:134
        throw e; // process.nextTick error, or 'error' event on first tick
        ^
TypeError: Cannot read property '' of undefined
    at Connection.exchange (/home/armand/node_modules/amqp/amqp.js:1242:21)
    at Connection.publish (/home/armand/node_modules/amqp/amqp.js:1258:60)
    at Queue.addTaskToQueue (/home/armand/Desktop/RockSolidProject/coucheAMQP/Queue.js:36:30)
    at Object.<anonymous> (/home/armand/Desktop/RockSolidProject/coucheAMQP/rabbituser.js:5:9)
    at Object.<anonymous> (/home/armand/Desktop/RockSolidProject/coucheAMQP/rabbituser.js:7:4)
    at Module._compile (module.js:402:26)
    at Object..js (module.js:408:10)
    at Module.load (module.js:334:31)
    at Function._load (module.js:293:12)
    at Array.<anonymous> (module.js:421:10)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

好久不见√ 2024-12-08 19:23:33

我不确定您遇到的错误是什么,但我会指出一些事情:

  1. 在构造函数中,当您编写 @receivedObject@queue——没有任何作用。在 JavaScript 中,每个对象都是一个哈希,您可以随时附加属性;如果 @receivedObject@queue 最初在 Queue 实例中未定义,则不必在构造函数中定义它们。

  2. 问题是否可能是您在连接存在之前(即在 self.queue 之前)在 addTaskToQueue 中调用 @connection.queue 。 connection.on 'ready' 回调)?

也许如果您将代码更改为“

queue = new Queue()
queue.connection.on 'ready', ->
  queue.addTaskToQueue 'salut', 5
  queue.subscribeTaskQueue 'salut'

如果这不能解决您的问题,您能否描述一下您遇到的确切错误以及遇到该错误的位置?”

I'm not sure what the error you're encountering is, but I will point out a few things:

  1. In your constructor, when you write @receivedObject and @queue—that doesn't do anything. In JavaScript, every object is a hash, and you can attach properties at any time; if @receivedObject and @queue are initially undefined in a Queue instance, then you don't have to define them in the constructor.

  2. Is it possible that the problem is that you're calling @connection.queue in addTaskToQueue before the connection exists (that is, before the self.connection.on 'ready' callback)?

Perhaps if you changed your code to

queue = new Queue()
queue.connection.on 'ready', ->
  queue.addTaskToQueue 'salut', 5
  queue.subscribeTaskQueue 'salut'

If that doesn't solve your problem, could you please describe the precise error you're encountering and where you're encountering it?

荒芜了季节 2024-12-08 19:23:33

为了能够使用 amqp 作为发布/订阅者,现在可以使用 rabbitmq-nodejs-client< /a>

To afford to use amqp as publish/subscriber, right now it is available rabbitmq-nodejs-client

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文