@abeai/job-consumer 中文文档教程
Overview
该模块可用于实例化能够从多种类型的代理后端接收作业的作业消费者。 每个消费者连接到一个队列以从中接收作业。 “队列”是一个通用术语,用于表示作业管道(Pub/Sub 通道、AMQP 通道/队列、SQS 队列)。
当前支持的后端:
- AMQP protocol via RabbitMQ
- AWS Lambda via AWS SQS - Also supports AWS SQS directly
- Redis via Bull.js
Broker Configuration
所有代理后端配置都由环境变量处理。 您可以在下面找到每个代理后端类型的所有环境变量和预期环境变量。
General
NODE_ENV
NODE_PATH
JOBCONSUMERNAME
JOBBROKERQUEUE
要订阅的队列。
JOBBROKERAPI_URL
@abeai/job-broker API 本身使用的 URL。 这用于重试作业。 有关详细信息,请参阅下面的“选项,重试策略”。
JOBBROKERHOST
支持所需代理后端的主机。
JOBBROKERAUTOGENERATEQUEUE
设置为 true
以自动生成任何已尝试但不存在的队列。 目前这仅适用于 SQS 代理后端。
JOBBROKERUSER
如果支持身份验证,代理后端使用的用户名。
JOBBROKERPASSWORD
如果支持身份验证,代理后端使用的密码。
Required
JOBBROKERBACKEND
所需的经纪人后端。 目前支持以下字符串。
- AMQP
- SQS
- LAMBDA
- REDIS
AMQP
JOBBROKERHOST
amqp://{YOUR AMQP HOST}
SQS
JOBBROKERHOST
https://sqs.{YOUR AWS REGION}.amazonaws.com/{YOUR AWS ACCOUNT ID}/
JOBBROKERUSER
AWS 共享凭证配置文件。
AWS_REGION
AWS 区域。
LAMBDA
Lambda 处理程序称为 _lambdaExecute
。 不需要环境变量。
REDIS
JOBBROKERHOST
redis://{YOUR REDIS HOST}
Interface
Constructor
const consumerFactory = require('@abeai/job-consumer');
consumerFactory(options);
Options
consume(job) (function)
该函数将在收到作业时执行。 完成后,作业将从其各自的队列中删除。
job
任意解析的 JSON 数据传递到消费者各自的队列中。
error(error) (function)
此函数为失败的作业提供自定义错误处理。 通常这将只包括日志记录,因为重试逻辑由下面的 retryStrategy
选项处理。
removeOnError (boolean)
如果设置为 true
,任何失败的作业都将得到代理的确认,并随后从其各自的队列中删除。 默认为 false
。
retryStrategy(options) (function)
如果发生错误(并且 removeOnError
未设置为 true
),此方法将用作作业的重试策略。 您必须返回一个定义重试作业之前等待的时间量的整数,或者返回 null
以取消当前重试并允许作业被其代理确认。
如果未设置此选项且 removeOnError
未设置为 true
,则将重试作业,两次重试之间的等待时间逐渐延长。
options (object)
- attempts: Current number of attempts for the job being retried.
- error: The error which caused the retry attempts.
Overview
This module can be used to instantiate a job-consumer that is able to receive jobs from multiple types of broker backends. Each consumer connects to a single queue to receive jobs from. "queue" is a general term used to represent a job pipeline (Pub/Sub channel, AMQP channel/queue, SQS queue).
Currently supported backends:
- AMQP protocol via RabbitMQ
- AWS Lambda via AWS SQS - Also supports AWS SQS directly
- Redis via Bull.js
Broker Configuration
All broker backend configurations are handled by environment variables. Below you can find all environment variables and the expected environment variables for each broker backend type.
General
NODE_ENV
NODE_PATH
JOBCONSUMERNAME
JOBBROKERQUEUE
The queue to subscribe to.
JOBBROKERAPI_URL
URL used by @abeai/job-broker API itself. This is used to retry jobs. See 'Options, retryStrategy' below for more information.
JOBBROKERHOST
The host that is supporting your desired broker backend.
JOBBROKERAUTOGENERATEQUEUE
Set to true
to automatically generate any queues that are attempted but do not exist. Currently this only applies to SQS broker backends.
JOBBROKERUSER
Username used by broker backend if authentication is supported.
JOBBROKERPASSWORD
Password used by broker backend if authentication is supported.
Required
JOBBROKERBACKEND
Desired broker backend. Currently supports the following strings.
- AMQP
- SQS
- LAMBDA
- REDIS
AMQP
JOBBROKERHOST
amqp://{YOUR AMQP HOST}
SQS
JOBBROKERHOST
https://sqs.{YOUR AWS REGION}.amazonaws.com/{YOUR AWS ACCOUNT ID}/
JOBBROKERUSER
AWS shared credentials profile.
AWS_REGION
AWS region.
LAMBDA
Lambda handler is called _lambdaExecute
. No required environment variables.
REDIS
JOBBROKERHOST
redis://{YOUR REDIS HOST}
Interface
Constructor
const consumerFactory = require('@abeai/job-consumer');
consumerFactory(options);
Options
consume(job) (function)
This function will be executed when a job is received. Upon completion the job will be removed from its respective queue.
job
Arbitrary parsed JSON data passed into the consumer's respective queue.
error(error) (function)
This function provides custom error handling for a failed job. Generally this will only include logging as retry logic is handled by the below retryStrategy
option.
removeOnError (boolean)
If set to true
any failed jobs will be acknowledge with the broker and subsequently removed from its respective queue. Defaults to false
.
retryStrategy(options) (function)
This method will be used as a job's retry strategy in the event of an error (and with removeOnError
not set to true
). You must return either an integer defining the amount of time to wait before retrying the job or null
to cancel the current retry and allow the job to be acknowledged with its broker.
If this option is not set and removeOnError
is not set to true
the job will be retried with a progressively longer wait time between retries.
options (object)
- attempts: Current number of attempts for the job being retried.
- error: The error which caused the retry attempts.