352-wascally 中文文档教程
Wascally
这是对 amqplib 的一种非常自以为是的抽象,有助于简化某些常见任务,并(希望)减少在节点服务中使用 RabbitMQ 所需的工作量。
Features:
- Gracefully handle re-connections
- Automatically re-define all topology on re-connection
- Automatically re-send any unconfirmed messages on re-connection
- Support the majority of RabbitMQ's extensions
- Handle batching of acknowledgements and rejections
- Topology & configuration via the JSON configuration method (thanks to @JohnDMathis!)
Assumptions & Defaults:
- Fault-tolerance/resilience over throughput
- Default to publish confirmation
- Default to ack mode on consumers
- Heterogenous services that include statically typed languages
- JSON as the only serialization provider
Demos
API Reference
这个库通过 when.js 为许多调用实现了承诺。
Sending & Receiving Messages
publish( exchangeName, options, [connectionName] )
此语法使用选项对象而不是参数,下面是一个显示所有可用属性的示例:
rabbit.publish( 'exchange.name', {
routingKey: 'hi',
type: 'company.project.messages.textMessage',
correlationId: 'one',
body: { text: 'hello!' },
messageId: '100',
expiresAfter: 1000 // TTL in ms, in this example 1 second
timestamp: // posix timestamp (long)
headers: {
'random': 'application specific value'
}
},
connectionName: '' // another optional way to provide connection name if needed
);
publish( exchangeName, typeName, messageBody, [routingKey], [correlationId], [connectionName] )
消息体是简单的对象。 将用于设置 AMQP 的 properties.type 的消息需要类型说明符。 如果没有提供路由键,将使用类型说明符。 '' 的路由键将阻止使用类型说明符。
// the first 3 arguments are required
// routing key is optional and defaults to the value of typeName
// connectionName is only needed if you have multiple connections to different servers or vhosts
rabbit.publish( 'log.entries', 'company.project.messages.logEntry', {
date: Date.now(),
level: logLevel,
message: message
}, 'log.' + logLevel, someValueToCorrelateBy );
request( exchangeName, options, [connectionName] )
这就像发布一样工作,除了返回的承诺提供来自另一方的响应(或响应)。
// when multiple responses are provided, all but the last will be provided via the .progress callback.
// the last/only reply will always be provided to the .then callback
rabbit.request( 'request.exchange', {
// see publish example to see options for the outgoing message
} )
.progress( function( reply ) {
// if multiple replies are provided, all but the last will be sent via the progress callback
} )
.then( function( final ) {
// the last message in a series OR the only reply will be sent to this callback
} );
handle( typeName, handler, [context] )
处理呼叫应该发生在之前开始订阅。
注册消息处理程序以处理基于 typeName 的消息。 调用 handle 将返回对处理程序的引用,稍后可以将其删除。 传递给处理程序的消息是原始 Rabbit 负载。 body 属性包含发布的消息正文。 消息有ack
, nack
(requeue the message) and reject
(don't requeue the message) 方法控制 Rabbit 对消息的处理.
Explicit Error Handling
在这个例子中,任何可能的错误都在显式的 try/catch 中被捕获:
var handler = rabbit.handle( 'company.project.messages.logEntry', function( message ) {
try {
// do something meaningful?
console.log( message.body );
message.ack();
} catch( err ) {
message.nack();
}
} );
handler.remove();
Automatically Nack On Error
这个例子展示了如何用一个 try catch wascally 包装所有处理程序:
- nacks the message on error
- console.log that an error has occurred in a handle
// after this call, any new callbacks attached via handle will be wrapped in a try/catch
// that nacks the message on an error
rabbit.nackOnError();
var handler = rabbit.handle( 'company.project.messages.logEntry', function( message ) {
console.log( message.body );
message.ack();
} );
handler.remove();
// after this call, new callbacks attached via handle will *not* be wrapped in a try/catch
rabbit.ignoreHandlerErrors();
Late-bound Error Handling
提供一个策略来处理多个句柄的错误或在事后附加一个错误处理程序。
var handler = rabbit.handle( 'company.project.messages.logEntry', function( message ) {
console.log( message.body );
message.ack();
} );
handler.catch( function( err, msg ) {
// do something with the error & message
msg.nack();
} );
!!! IMPORTANT !!!
未能处理错误将导致静默失败和消息丢失。
Unhandled Messages
在以前的版本中,如果订阅以 ack 模式(默认)启动而没有处理消息的处理程序,则消息将在 limbo 中丢失,直到连接(或通道)关闭,然后消息将返回到队列. 这是非常令人困惑和不受欢迎的行为。 为了帮助防止这种情况,新的默认行为是任何收到的没有任何合格处理程序的消息将被 nack
并立即发送回队列。
这仍然有问题,因为它会在客户端和服务器上造成混乱,因为消息将无限期地重新传送。
要更改此行为,请使用以下调用之一:
注意:一次只能激活其中一种策略 将
onUnhandled( handler )
rabbit.onUnhandled( function( message ) {
// handle the message here
} );
nackUnhandled() - default
所有未处理的消息发送回队列。
rabbit.nackUnhandled();
rejectUnhandled()
拒绝未处理的消息,以便不会重新排队。 不要使用它,除非所有队列都存在死信交换。
rabbit.rejectUnhandled();
startSubscription( queueName, [connectionName] )
建议:在开始订阅之前为预期类型设置处理程序。
在指定的队列上启动一个消费者。 connectionName
是可选的,仅当订阅非默认连接上的队列时才需要。
Message API
Wascally 默认(并假定)队列处于确认模式。 它批处理 ack 和 nack 操作以提高总吞吐量。 Ack/Nack 调用不会立即生效。
message.ack()
将消息排队等待确认。
message.nack()
将消息排入队列以供拒绝。 这将重新排队消息。
message.reject()
拒绝消息而不重新排队。 请谨慎使用并考虑在使用此功能之前将死信交换分配给队列。
message.reply( message, [more], [replyType] )
确认消息并将消息发送回请求者。 消息
只是回复的正文。 为 more
提供 true 将导致消息发送到请求承诺的 .progress 回调,以便您可以发送多个回复。 replyType
参数设置回复消息的类型。 (在使用静态类型语言进行消息传递时很重要)
Queues in noBatch
mode
Wascally 现在支持将队列放入非批处理行为的能力。 这会导致立即对通道进行 ack、nack 和 reject 调用。 当处理消息长时间运行并且消费者限制到位时,此功能非常理想。 请注意,此功能确实会对消息吞吐量产生重大影响。
Reply Queues
默认情况下,wascally 为每个连接创建一个唯一的回复队列,该队列会在连接关闭时自动订阅和删除。 这可以修改或完全关闭。
更改行为是通过将三个值之一传递给连接哈希上的 replyQueue
属性来完成的:
!!! 重要的 !!! 使用前两个选项时,wascally 无法防止跨服务实例或连接的队列命名冲突。
Custom Name
仅更改 wascally 创建的回复队列的名称 - autoDelete
和 subscribe
将设置为 true
。
rabbit.addConnection( {
name: 'default',
replyQueue: 'myOwnQueue',
user: 'guest',
pass: 'guest',
server: '127.0.0.1',
port: 5672,
timeout: 2000,
vhost: '%2f'
} );
Custom Behavior
要完全控制队列名称和行为,请提供队列定义来代替名称。
wascally 不提供默认值 - 它只会使用提供的定义
rabbit.addConnection( {
name: 'default',
replyQueue: {
name: 'myOwnQueue',
subscribe: 'true',
durable: true
},
user: 'guest',
pass: 'guest',
server: '127.0.0.1',
port: 5672,
timeout: 2000,
vhost: '%2f'
} );
No Automatic Reply Queue
只有在未使用请求/响应或提供自定义总体策略时才选择此选项
rabbit.addConnection( {
name: 'default',
replyQueue: false,
user: 'guest',
pass: 'guest',
server: '127.0.0.1',
port: 5672,
timeout: 2000,
vhost: '%2f'
} );
Managing Topology
addExchange( exchangeName, exchangeType, [options], [connectionName] )
该调用返回一个承诺,可用于确定何时创建交换服务器。
有效的 exchangeTypes:
- 'direct'
- 'fanout'
- 'topic'
Options 是一个散列,可以包含以下内容:
- autoDelete true|false delete when consumer count goes to 0
- durable true|false survive broker restarts
- persistent true|false a.k.a. persistent delivery, messages saved to disk
- alternate 'alt.exchange' define an alternate exchange
addQueue( queueName, [options], [connectionName] )
调用返回一个承诺,可用于确定何时在服务器上创建队列。
Options 是一个散列,可以包含以下内容:
- autoDelete true|false delete when consumer count goes to 0
- durable true|false survive broker restarts
- exclusive true|false limits queue to the current connection only (danger)
- subscribe true|false auto-start the subscription
- limit 2^16 max number of unacked messages allowed for consumer
- noAck true|false the server will remove messages from the queue as soon as they are delivered
- noBatch true|false causes ack, nack & reject to take place immediately
- queueLimit 2^32 max number of ready messages a queue can hold
- messageTtl 2^32 time in ms before a message expires on the queue
- expires 2^32 time in ms before a queue with 0 consumers expires
- deadLetter 'dlx.exchange' the exchange to dead-letter messages to
bindExchange( sourceExchange, targetExchange, [routingKeys], [connectionName] )
将目标交换绑定到源交换。 消息从源流向目标。
bindQueue( sourceExchange, targetQueue, [routingKeys], [connectionName] )
将目标队列绑定到源交换。 消息从源流向目标。
Configuration via JSON
注意:将 subscribe 设置为 true 将导致订阅在队列创建后立即开始。
此示例显示了上述大部分可用选项。
var settings = {
connection: {
user: 'guest',
pass: 'guest',
server: '127.0.0.1',
port: 5672,
timeout: 2000,
vhost: '%2fmyhost'
},
exchanges:[
{ name: 'config-ex.1', type: 'fanout' },
{ name: 'config-ex.2', type: 'topic', alternate: 'alternate-ex.2', persistent: true },
{ name: 'dead-letter-ex.2', type: 'fanout' }
],
queues:[
{ name:'config-q.1', limit: 100, queueLimit: 1000 },
{ name:'config-q.2', subscribe: true, deadLetter: 'dead-letter-ex.2' }
],
bindings:[
{ exchange: 'config-ex.1', target: 'config-q.1', keys: [ 'bob','fred' ] },
{ exchange: 'config-ex.2', target: 'config-q.2', keys: 'test1' }
]
};
要建立所有设置都已就绪并准备就绪的连接,请调用配置:
var rabbit = require( 'wascally' );
rabbit.configure( settings ).done( function() {
// ready to go!
} );
Closing Connections
Wascally 将在关闭通道和连接之前尝试解决所有未完成的发布和接收消息(ack/nack/reject)。 如果您想将某些操作推迟到所有事情都已安全解决之后,请使用从任一关闭调用返回的承诺。
!!! 注意! - 使用重置是危险的。 与连接关联的所有拓扑都将被删除,这意味着如果您决定重新连接,wascly 将无法重新建立它。
close( [connectionName], [reset] )
关闭连接,可选地重置所有先前为连接定义的拓扑。 如果未提供 connectionName
,则使用 default
。
closeAll( [reset] )
关闭所有 连接,可选择重置所有连接的拓扑。
AMQPS, SSL/TLS Support
提供以下设置相关环境变量的配置选项将导致 wascally 尝试通过 AMQPS 进行连接。 有关哪些设置执行什么角色的更多详细信息,请参阅 SSL 上的 amqplib 页面。
connection: { // sample connection hash
caPath: '', // comma delimited paths to CA files. RABBIT_CA
certPath: '', // path to cert file. RABBIT_CERT
keyPath: '', // path to key file. RABBIT_KEY
passphrase: '', // passphrase associated with cert/pfx. RABBIT_PASSPHRASE
pfxPath: '' // path to pfx file. RABBIT_PFX
}
Channel Prefetch Limits
Wascally 大多在幕后隐藏了通道的概念,但仍然允许您指定通道选项,例如通道预取限制。 而不是指定 this 在通道对象上,但是,它在队列定义中被指定为 limit
。
queues: [{
// ...
limit: 5
}]
// or
rabbit.addQueue("some.q", {
// ...
limit: 5
});
此队列配置将在用于使用此队列的通道上设置预取限制为 5。
注意: 队列 limit
与 queueLimit
选项不同 - 后者设置队列中允许的最大消息数。
Additional Learning Resources
Watch Me Code
感谢 Derick Bailey 的投入,wascally 的 API 和文档有了很大的改进。 您可以在他的 Watch Me Code 系列中学习 Derick 的实践经验。
RabbitMQ In Action
Alvaro Vidella 和 Jason Williams 在 RabbitMQ 上写了这本书。
Enterprise Integration Patterns
Gregor Hophe 和 Bobby Woolf 在消息传递方面的权威著作。 站点 提供了模式的基本描述,book 有很多细节。
我怎么推荐这本书都不为过; 理解模式将为您提供成功所需的概念工具。
Contributing
覆盖不足、测试失败或偏离风格的 PR 将不被接受。
Behavior & Integration Tests
PR 应该在集成和行为规范中包括修改的或附加的测试范围。 集成测试假设 RabbitMQ 在本地主机上运行,并启用了来宾/来宾凭据和一致的哈希交换插件。 您可以使用以下命令启用该插件:
rabbit-plugins enable rabbitmq_consistent_hash_exchange
运行 gulp 将在每次文件更改后运行这两个集合并显示覆盖率摘要。 要查看详细报告,请运行一次 gulp coverage 以调出浏览器。
Style
该项目同时具有 .editorconfig
和 .esformatter
文件,以帮助保持对样式的坚持。 还请利用 .jshintrc
文件并避免 linter 警告。
Roadmap
- additional test coverage
- support RabbitMQ backpressure mechanisms
- (configurable) limits & behavior when publishing during connectivity issues
- ability to capture/log unpublished messages on shutdown
- add support for Rabbit's HTTP API
- enable better cluster utilization by spreading connections out over all nodes in cluster
Wascally
This is a very opinionated abstraction over amqplib to help simplify certain common tasks and (hopefully) reduce the effort required to use RabbitMQ in your Node services.
Features:
- Gracefully handle re-connections
- Automatically re-define all topology on re-connection
- Automatically re-send any unconfirmed messages on re-connection
- Support the majority of RabbitMQ's extensions
- Handle batching of acknowledgements and rejections
- Topology & configuration via the JSON configuration method (thanks to @JohnDMathis!)
Assumptions & Defaults:
- Fault-tolerance/resilience over throughput
- Default to publish confirmation
- Default to ack mode on consumers
- Heterogenous services that include statically typed languages
- JSON as the only serialization provider
Demos
API Reference
This library implements promises for many of the calls via when.js.
Sending & Receiving Messages
publish( exchangeName, options, [connectionName] )
This syntax uses an options object rather than arguments, here's an example showing all of the available properties:
rabbit.publish( 'exchange.name', {
routingKey: 'hi',
type: 'company.project.messages.textMessage',
correlationId: 'one',
body: { text: 'hello!' },
messageId: '100',
expiresAfter: 1000 // TTL in ms, in this example 1 second
timestamp: // posix timestamp (long)
headers: {
'random': 'application specific value'
}
},
connectionName: '' // another optional way to provide connection name if needed
);
publish( exchangeName, typeName, messageBody, [routingKey], [correlationId], [connectionName] )
Messages bodies are simple objects. A type specifier is required for the message which will be used to set AMQP's properties.type. If no routing key is provided, the type specifier will be used. A routing key of '' will prevent the type specifier from being used.
// the first 3 arguments are required
// routing key is optional and defaults to the value of typeName
// connectionName is only needed if you have multiple connections to different servers or vhosts
rabbit.publish( 'log.entries', 'company.project.messages.logEntry', {
date: Date.now(),
level: logLevel,
message: message
}, 'log.' + logLevel, someValueToCorrelateBy );
request( exchangeName, options, [connectionName] )
This works just like a publish except that the promise returned provides the response (or responses) from the other side.
// when multiple responses are provided, all but the last will be provided via the .progress callback.
// the last/only reply will always be provided to the .then callback
rabbit.request( 'request.exchange', {
// see publish example to see options for the outgoing message
} )
.progress( function( reply ) {
// if multiple replies are provided, all but the last will be sent via the progress callback
} )
.then( function( final ) {
// the last message in a series OR the only reply will be sent to this callback
} );
handle( typeName, handler, [context] )
Handle calls should happen before starting subscriptions.
Message handlers are registered to handle a message based on the typeName. Calling handle will return a reference to the handler that can later be removed. The message that is passed to the handler is the raw Rabbit payload. The body property contains the message body published. The message has ack
, nack
(requeue the message) and reject
(don't requeue the message) methods control what Rabbit does with the message.
Explicit Error Handling
In this example, any possible error is caught in an explicit try/catch:
var handler = rabbit.handle( 'company.project.messages.logEntry', function( message ) {
try {
// do something meaningful?
console.log( message.body );
message.ack();
} catch( err ) {
message.nack();
}
} );
handler.remove();
Automatically Nack On Error
This example shows how to have wascally wrap all handlers with a try catch that:
- nacks the message on error
- console.log that an error has occurred in a handle
// after this call, any new callbacks attached via handle will be wrapped in a try/catch
// that nacks the message on an error
rabbit.nackOnError();
var handler = rabbit.handle( 'company.project.messages.logEntry', function( message ) {
console.log( message.body );
message.ack();
} );
handler.remove();
// after this call, new callbacks attached via handle will *not* be wrapped in a try/catch
rabbit.ignoreHandlerErrors();
Late-bound Error Handling
Provide a strategy for handling errors to multiple handles or attach an error handler after the fact.
var handler = rabbit.handle( 'company.project.messages.logEntry', function( message ) {
console.log( message.body );
message.ack();
} );
handler.catch( function( err, msg ) {
// do something with the error & message
msg.nack();
} );
!!! IMPORTANT !!!
Failure to handle errors will result in silent failures and lost messages.
Unhandled Messages
In previous versions, if a subscription was started in ack mode (the default) without a handler to process the message, the message would get lost in limbo until the connection (or channel) was closed and then the messages would be returned to the queue. This is very confusing and undesirable behavior. To help protect against this, the new default behavior is that any message received that doesn't have any elligible handlers will get nack
'd and sent back to the queue immediately.
This is still problematic because it can create churn on the client and server as the message will be redelivered indefinitely.
To change this behavior, use one of the following calls:
Note: only one of these strategies can be activated at a time
onUnhandled( handler )
rabbit.onUnhandled( function( message ) {
// handle the message here
} );
nackUnhandled() - default
Sends all unhandled messages back to the queue.
rabbit.nackUnhandled();
rejectUnhandled()
Rejects unhandled messages so that will will not be requeued. DO NOT use this unless there are dead letter exchanges for all queues.
rabbit.rejectUnhandled();
startSubscription( queueName, [connectionName] )
Recommendation: set handlers for anticipated types up before starting subscriptions.
Starts a consumer on the queue specified. connectionName
is optional and only required if subscribing to a queue on a connection other than the default one.
Message API
Wascally defaults to (and assumes) queues are in ack mode. It batches ack and nack operations in order to improve total throughput. Ack/Nack calls do not take effect immediately.
message.ack()
Enqueues the message for acknowledgement.
message.nack()
Enqueues the message for rejection. This will re-enqueue the message.
message.reject()
Rejects the message without re-queueing it. Please use with caution and consider having a dead-letter-exchange assigned to the queue before using this feature.
message.reply( message, [more], [replyType] )
Acknowledges the messages and sends the message back to the requestor. The message
is only the body of the reply. Providing true to more
will cause the message to get sent to the .progress callback of the request promise so that you can send multiple replies. The replyType
argument sets the type of the reply message. (important when messaging with statically typed languages)
Queues in noBatch
mode
Wascally now supports the ability to put queues into non-batching behavior. This causes ack, nack and reject calls to take place against the channel immediately. This feature is ideal when processing messages are long-running and consumer limits are in place. Be aware that this feature does have a significant impact on message throughput.
Reply Queues
By default, wascally creates a unique reply queue for each connection which is automatically subscribed to and deleted on connection close. This can be modified or turned off altogether.
Changing the behavior is done by passing one of three values to the replyQueue
property on the connection hash:
!!! IMPORTANT !!! wascally cannot prevent queue naming collisions across services instances or connections when using the first two options.
Custom Name
Only changes the name of the reply queue that wascally creates - autoDelete
and subscribe
will be set to true
.
rabbit.addConnection( {
name: 'default',
replyQueue: 'myOwnQueue',
user: 'guest',
pass: 'guest',
server: '127.0.0.1',
port: 5672,
timeout: 2000,
vhost: '%2f'
} );
Custom Behavior
To take full control of the queue name and behavior, provide a queue definition in place of the name.
wascally provides no defaults - it will only use the definition provided
rabbit.addConnection( {
name: 'default',
replyQueue: {
name: 'myOwnQueue',
subscribe: 'true',
durable: true
},
user: 'guest',
pass: 'guest',
server: '127.0.0.1',
port: 5672,
timeout: 2000,
vhost: '%2f'
} );
No Automatic Reply Queue
Only pick this option if request/response isn't in use or when providing a custom overall strategy
rabbit.addConnection( {
name: 'default',
replyQueue: false,
user: 'guest',
pass: 'guest',
server: '127.0.0.1',
port: 5672,
timeout: 2000,
vhost: '%2f'
} );
Managing Topology
addExchange( exchangeName, exchangeType, [options], [connectionName] )
The call returns a promise that can be used to determine when the exchange has been created on the server.
Valid exchangeTypes:
- 'direct'
- 'fanout'
- 'topic'
Options is a hash that can contain the following:
- autoDelete true|false delete when consumer count goes to 0
- durable true|false survive broker restarts
- persistent true|false a.k.a. persistent delivery, messages saved to disk
- alternate 'alt.exchange' define an alternate exchange
addQueue( queueName, [options], [connectionName] )
The call returns a promise that can be used to determine when the queue has been created on the server.
Options is a hash that can contain the following:
- autoDelete true|false delete when consumer count goes to 0
- durable true|false survive broker restarts
- exclusive true|false limits queue to the current connection only (danger)
- subscribe true|false auto-start the subscription
- limit 2^16 max number of unacked messages allowed for consumer
- noAck true|false the server will remove messages from the queue as soon as they are delivered
- noBatch true|false causes ack, nack & reject to take place immediately
- queueLimit 2^32 max number of ready messages a queue can hold
- messageTtl 2^32 time in ms before a message expires on the queue
- expires 2^32 time in ms before a queue with 0 consumers expires
- deadLetter 'dlx.exchange' the exchange to dead-letter messages to
bindExchange( sourceExchange, targetExchange, [routingKeys], [connectionName] )
Binds the target exchange to the source exchange. Messages flow from source to target.
bindQueue( sourceExchange, targetQueue, [routingKeys], [connectionName] )
Binds the target queue to the source exchange. Messages flow from source to target.
Configuration via JSON
Note: setting subscribe to true will result in subscriptions starting immediately upon queue creation.
This example shows most of the available options described above.
var settings = {
connection: {
user: 'guest',
pass: 'guest',
server: '127.0.0.1',
port: 5672,
timeout: 2000,
vhost: '%2fmyhost'
},
exchanges:[
{ name: 'config-ex.1', type: 'fanout' },
{ name: 'config-ex.2', type: 'topic', alternate: 'alternate-ex.2', persistent: true },
{ name: 'dead-letter-ex.2', type: 'fanout' }
],
queues:[
{ name:'config-q.1', limit: 100, queueLimit: 1000 },
{ name:'config-q.2', subscribe: true, deadLetter: 'dead-letter-ex.2' }
],
bindings:[
{ exchange: 'config-ex.1', target: 'config-q.1', keys: [ 'bob','fred' ] },
{ exchange: 'config-ex.2', target: 'config-q.2', keys: 'test1' }
]
};
To establish a connection with all settings in place and ready to go call configure:
var rabbit = require( 'wascally' );
rabbit.configure( settings ).done( function() {
// ready to go!
} );
Closing Connections
Wascally will attempt to resolve all outstanding publishes and recieved messages (ack/nack/reject) before closing the channels and connection. If you would like to defer certain actions until after everything has been safely resolved, then use the promise returned from either close call.
!!! CAUTION !!! - using reset is dangerous. All topology associated with the connection will be removed meaning wasclly will not be able to re-establish it all should you decide to reconnect.
close( [connectionName], [reset] )
Closes the connection, optionall resetting all previously defined topology for the connection. The connectionName
uses default
if one is not provided.
closeAll( [reset] )
Closes all connections, optionally resetting the topology for all of them.
AMQPS, SSL/TLS Support
Providing the following configuration options setting the related environment varibles will cause wascally to attempt connecting via AMQPS. For more details about which settings perform what role, refer to the amqplib's page on SSL.
connection: { // sample connection hash
caPath: '', // comma delimited paths to CA files. RABBIT_CA
certPath: '', // path to cert file. RABBIT_CERT
keyPath: '', // path to key file. RABBIT_KEY
passphrase: '', // passphrase associated with cert/pfx. RABBIT_PASSPHRASE
pfxPath: '' // path to pfx file. RABBIT_PFX
}
Channel Prefetch Limits
Wascally mostly hides the notion of a channel behind the scenes, but still allows you to specify channel options such as the channel prefetch limit. Rather than specifying this on a channel object, however, it is specified as a limit
on a queue defintion.
queues: [{
// ...
limit: 5
}]
// or
rabbit.addQueue("some.q", {
// ...
limit: 5
});
This queue configuration will set a prefetch limit of 5 on the channel that is used for consuming this queue.
Note: The queue limit
is not the same as the queueLimit
option - the latter of which sets the maximum number of messages allowed in the queue.
Additional Learning Resources
Watch Me Code
Thanks to Derick Bailey's input, the API and documentation for wascally have improved a lot. You can learn from Derick's hands-on experience in his Watch Me Code series.
RabbitMQ In Action
Alvaro Vidella and Jason Williams literally wrote the book on RabbitMQ.
Enterprise Integration Patterns
Gregor Hophe and Bobby Woolf's definitive work on messaging. The site provides basic descriptions of the patterns and the book goes into a lot of detail.
I can't recommend this book highly enough; understanding the patterns will provide you with the conceptual tools need to be successful.
Contributing
PRs with insufficient coverage, broken tests or deviation from the style will not be accepted.
Behavior & Integration Tests
PRs should include modified or additional test coverage in both integration and behavioral specs. Integration tests assume RabbitMQ is running on localhost with guest/guest credentials and the consistent hash exchange plugin enabled. You can enable the plugin with the following command:
rabbit-plugins enable rabbitmq_consistent_hash_exchange
Running gulp will run both sets after every file change and display a coverage summary. To view a detailed report, run gulp coverage once to bring up the browser.
Style
This project has both an .editorconfig
and .esformatter
file to help keep adherance to style simple. Please also take advantage of the .jshintrc
file and avoid linter warnings.
Roadmap
- additional test coverage
- support RabbitMQ backpressure mechanisms
- (configurable) limits & behavior when publishing during connectivity issues
- ability to capture/log unpublished messages on shutdown
- add support for Rabbit's HTTP API
- enable better cluster utilization by spreading connections out over all nodes in cluster