使用 Node.js 的 AMQP,如何发布/订阅?
我正在创建一个操作 Node-AMQP 模块的类: https://github.com/postwait/node -amqp
但我无法使用它发布/订阅:
var Queue = require('./Queue.js');
var queue = new Queue();
queue.addTaskToQueue('salut', 5);
queue.subscribeTaskQueue('salut');
这是我正在使用的类(我在 CoffeeScript 中给出代码,对于那些不了解 CoffeeScript 的人则在 Node.js 中给出代码):
谢谢为了你的帮助。
在 CoffeeScript 中:
amqp = require('amqp')
class Queue
constructor: (ip = 'localhost') ->
@ip = ip
@receivedObject
@connection = amqp.createConnection({ host: @ip })
@queue
subscribeTaskQueue: (queueToSubscribe) ->
self = @
self.connection.on('ready', ->
q = self.connection.queue(queueToSubscribe)
q.bind('#')
q.subscribe({ ack: true }, (message) ->
self.receivedObject = message
console.log(self.receivedObject)
)
)
addTaskToQueue: (queue, objectToSend) ->
@queue = @connection.queue("salut", { durable: true })
@connection.publish(queue, objectToSend)
module.exports = Queue
在 Node.js 中,
(function() {
var Queue, amqp;
amqp = require('amqp');
Queue = (function() {
function Queue(ip) {
if (ip == null) {
ip = 'localhost';
}
this.ip = ip;
this.receivedObject;
this.connection = amqp.createConnection({
host: this.ip
});
this.queue;
}
Queue.prototype.subscribeTaskQueue = function(queueToSubscribe) {
var self;
self = this;
return self.connection.on('ready', function() {
var q;
q = self.connection.queue(queueToSubscribe);
q.bind('#');
return q.subscribe({
ack: true
}, function(message) {
self.receivedObject = message;
return console.log(self.receivedObject);
});
});
};
Queue.prototype.addTaskToQueue = function(queue, objectToSend) {
this.queue = this.connection.queue("salut", {
durable: true
});
return this.connection.publish(queue, objectToSend);
};
return Queue;
})();
module.exports = Queue;
}).call(this);
我做了你所说的+我做了一些修改。
这是我的 Queue.coffee 类:
amqp = require('amqp')
class Queue
constructor: (ip = "localhost", queueName = "salut") ->
@ip = ip
@receivedObject = "test"
@connection = amqp.createConnection({ host: 'localhost' })
@queueName = queueName
subscribeTaskQueue: () ->
@connection.on('ready', ->
q = @connection.queue(@queueName)
q.bind('#')
q.subscribe({ ack: true }, (message) ->
@receivedObject = message
console.log(@receivedObject)
)
)
addTaskToQueue: (objectToSend = "hello") ->
@connection.publish(@queueName, objectToSend)
queue = new Queue("localhost", "salut")
queue.connection.on 'ready', ->
queue.addTaskToQueue 'salut', 5
queue.subscribeTaskQueue 'salut'
这是rabbituser.coffee:
Queue = require('./Queue.js')
queue = new Queue("localhost", "salut")
queue.addTaskToQueue("hello")
queue.subscribeTaskQueue()
当我执行命令:noderabbituser.js 时,
我得到:
node.js:134
throw e; // process.nextTick error, or 'error' event on first tick
^
TypeError: Cannot read property '' of undefined
at Connection.exchange (/home/armand/node_modules/amqp/amqp.js:1242:21)
at Connection.publish (/home/armand/node_modules/amqp/amqp.js:1258:60)
at Queue.addTaskToQueue (/home/armand/Desktop/RockSolidProject/coucheAMQP/Queue.js:36:30)
at Object.<anonymous> (/home/armand/Desktop/RockSolidProject/coucheAMQP/rabbituser.js:5:9)
at Object.<anonymous> (/home/armand/Desktop/RockSolidProject/coucheAMQP/rabbituser.js:7:4)
at Module._compile (module.js:402:26)
at Object..js (module.js:408:10)
at Module.load (module.js:334:31)
at Function._load (module.js:293:12)
at Array.<anonymous> (module.js:421:10)
I am making a class manipulating the Node-AMQP module available here : https://github.com/postwait/node-amqp
But I am not able to publish/subscribe using this :
var Queue = require('./Queue.js');
var queue = new Queue();
queue.addTaskToQueue('salut', 5);
queue.subscribeTaskQueue('salut');
Here is the class that I am using (I give the Code in CoffeeScript, and in Node.js for those who doesn't know CoffeeScript) :
Thanks for your help.
In CoffeeScript :
amqp = require('amqp')
class Queue
constructor: (ip = 'localhost') ->
@ip = ip
@receivedObject
@connection = amqp.createConnection({ host: @ip })
@queue
subscribeTaskQueue: (queueToSubscribe) ->
self = @
self.connection.on('ready', ->
q = self.connection.queue(queueToSubscribe)
q.bind('#')
q.subscribe({ ack: true }, (message) ->
self.receivedObject = message
console.log(self.receivedObject)
)
)
addTaskToQueue: (queue, objectToSend) ->
@queue = @connection.queue("salut", { durable: true })
@connection.publish(queue, objectToSend)
module.exports = Queue
In Node.js
(function() {
var Queue, amqp;
amqp = require('amqp');
Queue = (function() {
function Queue(ip) {
if (ip == null) {
ip = 'localhost';
}
this.ip = ip;
this.receivedObject;
this.connection = amqp.createConnection({
host: this.ip
});
this.queue;
}
Queue.prototype.subscribeTaskQueue = function(queueToSubscribe) {
var self;
self = this;
return self.connection.on('ready', function() {
var q;
q = self.connection.queue(queueToSubscribe);
q.bind('#');
return q.subscribe({
ack: true
}, function(message) {
self.receivedObject = message;
return console.log(self.receivedObject);
});
});
};
Queue.prototype.addTaskToQueue = function(queue, objectToSend) {
this.queue = this.connection.queue("salut", {
durable: true
});
return this.connection.publish(queue, objectToSend);
};
return Queue;
})();
module.exports = Queue;
}).call(this);
I did what you say + I made little modifications.
Here is my class Queue.coffee :
amqp = require('amqp')
class Queue
constructor: (ip = "localhost", queueName = "salut") ->
@ip = ip
@receivedObject = "test"
@connection = amqp.createConnection({ host: 'localhost' })
@queueName = queueName
subscribeTaskQueue: () ->
@connection.on('ready', ->
q = @connection.queue(@queueName)
q.bind('#')
q.subscribe({ ack: true }, (message) ->
@receivedObject = message
console.log(@receivedObject)
)
)
addTaskToQueue: (objectToSend = "hello") ->
@connection.publish(@queueName, objectToSend)
queue = new Queue("localhost", "salut")
queue.connection.on 'ready', ->
queue.addTaskToQueue 'salut', 5
queue.subscribeTaskQueue 'salut'
And here is rabbituser.coffee :
Queue = require('./Queue.js')
queue = new Queue("localhost", "salut")
queue.addTaskToQueue("hello")
queue.subscribeTaskQueue()
When I do the command : node rabbituser.js
I get :
node.js:134
throw e; // process.nextTick error, or 'error' event on first tick
^
TypeError: Cannot read property '' of undefined
at Connection.exchange (/home/armand/node_modules/amqp/amqp.js:1242:21)
at Connection.publish (/home/armand/node_modules/amqp/amqp.js:1258:60)
at Queue.addTaskToQueue (/home/armand/Desktop/RockSolidProject/coucheAMQP/Queue.js:36:30)
at Object.<anonymous> (/home/armand/Desktop/RockSolidProject/coucheAMQP/rabbituser.js:5:9)
at Object.<anonymous> (/home/armand/Desktop/RockSolidProject/coucheAMQP/rabbituser.js:7:4)
at Module._compile (module.js:402:26)
at Object..js (module.js:408:10)
at Module.load (module.js:334:31)
at Function._load (module.js:293:12)
at Array.<anonymous> (module.js:421:10)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
我不确定您遇到的错误是什么,但我会指出一些事情:
在构造函数中,当您编写
@receivedObject
和@queue——没有任何作用。在 JavaScript 中,每个对象都是一个哈希,您可以随时附加属性;如果
@receivedObject
和@queue
最初在Queue
实例中未定义,则不必在构造函数中定义它们。问题是否可能是您在连接存在之前(即在
self.queue 之前)在
回调)?addTaskToQueue
中调用@connection.queue
。 connection.on 'ready'也许如果您将代码更改为“
如果这不能解决您的问题,您能否描述一下您遇到的确切错误以及遇到该错误的位置?”
I'm not sure what the error you're encountering is, but I will point out a few things:
In your constructor, when you write
@receivedObject
and@queue
—that doesn't do anything. In JavaScript, every object is a hash, and you can attach properties at any time; if@receivedObject
and@queue
are initially undefined in aQueue
instance, then you don't have to define them in the constructor.Is it possible that the problem is that you're calling
@connection.queue
inaddTaskToQueue
before the connection exists (that is, before theself.connection.on 'ready'
callback)?Perhaps if you changed your code to
If that doesn't solve your problem, could you please describe the precise error you're encountering and where you're encountering it?
为了能够使用 amqp 作为发布/订阅者,现在可以使用 rabbitmq-nodejs-client< /a>
To afford to use amqp as publish/subscriber, right now it is available rabbitmq-nodejs-client