@a-mehrabi/aranode-kafka-client
Kafka Client Module
Kafka client implementation for the aranode flow engine.
Installation
To install this module in your aranode project, just add the package using the following command:
$ yarn add @a-mehrabi/aranode-kafka-client
After that, when you want to use it, you must include it in the custom modules, as follows:
path: .env
ARANODE_CUSTOM_MODULES=@a-mehrabi/aranode-kafka-client
Usage
To use the Kafka client, you need three descriptions:
Kafka client config: a config description (kafkaClientConfig)
Kafka producer: an adapter description (kafkaProducer)
Kafka consumer: an adapter description (kafkaConsumer)
Kafka client config
The Kafka client config description enables you to define the configuration of the Kafka client, including the clientId, brokers, etc.
Producer and consumer configs are not defined in this description.
version: 1
kind: config
name: config-name
config:
  kafka:
    clientId: client-id
    brokers:
      - localhost:9092
    retry:
      retries: 8
      initialRetryTime: 100
      maxRetryTime: 2
      factor: 3
      multiplier: 4
Kafka Producer
To publish messages to a Kafka server, you have to create an adapter description. The Kafka producer adapter description sets a handler for the specified Kafka client.
Flow example:
version: 1
kind: flow
name: flow-name
entryPoint: start
flow:
  start:
    transform:
      path: 'dist/producer-transformer.js'
    nextNode: kafka
  kafka:
    port:
      name: kafka-client-out-port
      type: outbound
    terminal: true
Config examples:
path: adapter.yml
version: 1
kind: adapter
name: adapter-name
adapter:
  producer:
    client: kafka-client-config-name
    topic: topic-name
path: bind.yml
version: 1
kind: bind
name: bind-name
bind:
  flow-name:
    - type: outbound
      port: port-name
      adapter: adapter-name
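The bind's port field presumably has to match the name of the port declared in the flow; the sketch below wires the producer flow, adapter, and bind together under that assumption (the port name kafka-client-out-port comes from the flow example above, while binding by matching names is inferred, not stated by this document):
path: bind.yml
version: 1
kind: bind
name: bind-name
bind:
  flow-name:
    - type: outbound
      port: kafka-client-out-port   # assumed to match flow.kafka.port.name from the flow example
      adapter: adapter-name         # the producer adapter defined in adapter.yml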
Kafka Consumer
The Kafka consumer allows a group of machines or processes to coordinate access to a list of topics, distributing the load among the consumers.
Flow example:
version: 1
kind: flow
name: flow-name
entryPoint: start
flow:
  start:
    nextNode: consumeData
  consumeData:
    transform:
      path: 'dist/consumer-transformer.js'
    terminal: true
Config examples:
path: adapter.yml
version: 1
kind: adapter
name: consumer-adapter-name
adapter:
  consumer:
    client: kafka-client-config-name
    groupId: group-id
    topic: topic-name
    fromBeginning: true
path: bind.yml
version: 1
kind: bind
name: bind-name
bind:
  flow-name:
    - type: inbound
      port: port-name
      adapter: consumer-adapter-name
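The consumer flow example above does not declare a port node. Assuming inbound ports are declared on the entry node the same way the producer flow declares its outbound port (an assumption; the module's flow schema is not spelled out here), a fully wired consumer flow and bind might look like this sketch:
version: 1
kind: flow
name: flow-name
entryPoint: start
flow:
  start:
    port:
      name: kafka-client-in-port    # hypothetical inbound port, mirroring the producer example
      type: inbound
    nextNode: consumeData
  consumeData:
    transform:
      path: 'dist/consumer-transformer.js'
    terminal: true
path: bind.yml
version: 1
kind: bind
name: bind-name
bind:
  flow-name:
    - type: inbound
      port: kafka-client-in-port    # assumed to match the port name declared in the flow
      adapter: consumer-adapter-name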
API
kafkaClientConfig
Type: config description
Options:
clientId (string), required
Kafka client id
brokers (string[]), required
The Kafka client must be configured with at least one broker. The brokers on the list are considered seed brokers and are only used to bootstrap the client and load the initial metadata
retry ([k: string]: number), optional
The retry option can be used to set the configuration of the retry mechanism, which is used to retry connections and API calls to Kafka (when using producers or consumers)
retries (number), optional, default = 5
Max number of retries per call
initialRetryTime (number), optional, default = 300
Initial value used to calculate the retry in milliseconds (this is still randomized following the randomization factor)
maxRetryTime (number), optional, default = 30000
Maximum wait time for a retry in milliseconds
factor (number), optional, default = 0.2
Randomization factor
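Putting these options together, a client config that spells out the documented retry defaults might look like the following sketch (the broker address is a placeholder, and the multiplier key from the earlier example is omitted here because it is only documented under kafkaProducer):
version: 1
kind: config
name: kafka-client-config-name
config:
  kafka:
    clientId: client-id
    brokers:
      - localhost:9092        # at least one seed broker is required
    retry:
      retries: 5              # max retries per call (documented default)
      initialRetryTime: 300   # ms, randomized by the factor below
      maxRetryTime: 30000     # ms, upper bound for a single retry wait
      factor: 0.2             # randomization factor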
kafkaProducer
Type: adapter description
Options:
client (string), required
Kafka client config name
topic (string), required, default = null
Topic name
retry ([k: string]: number), optional
The retry option can be used to set the configuration of the retry mechanism, which is used to retry connections and API calls to Kafka (when using producers or consumers)
retries (number), optional, default = 5
Max number of retries per call
initialRetryTime (number), optional, default = 300
Initial value used to calculate the retry in milliseconds (this is still randomized following the randomization factor)
maxRetryTime (number), optional, default = 30000
Maximum wait time for a retry in milliseconds
factor (number), optional, default = 0.2
Randomization factor
multiplier (number), optional, default = 2
Exponential factor
metadataMaxAge (number), optional, default = 300000
The period of time in milliseconds after which we force a refresh of metadata, even if we haven't seen any partition leadership changes, to proactively discover any new brokers or partitions
allowAutoTopicCreation (boolean), optional, default = true
Allow topic creation when querying metadata for non-existent topics
idempotent (boolean), optional, default = false
Experimental. If enabled, the producer will ensure each message is written exactly once. Acks must be set to -1 ("all"). Retries will default to MAX_SAFE_INTEGER.
transactionalId (string), optional
transactionTimeout (number), optional, default = 60000
The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with an InvalidTransactionTimeout error
maxInFlightRequests (number), optional, default = null
Max number of requests that may be in progress at any time. If falsey, no limit applies.
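For reference, a producer adapter description that sets several of these options might look like the sketch below; the exact placement of the optional keys under producer is an assumption based on the option list above, since the examples in this document only show client and topic:
path: adapter.yml
version: 1
kind: adapter
name: adapter-name
adapter:
  producer:
    client: kafka-client-config-name
    topic: topic-name
    allowAutoTopicCreation: false    # default is true
    metadataMaxAge: 300000           # ms, documented default
    maxInFlightRequests: 5           # default is null (no limit)
    retry:
      retries: 5
      initialRetryTime: 300
      maxRetryTime: 30000
      factor: 0.2
      multiplier: 2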
kafkaConsumer
Type: adapter description
Options:
client (string), required
Kafka client config name
groupId (string), required
Group id
topic (string), required
Topic name
fromBeginning (boolean), required
The consumer group will use the latest committed offset when starting to fetch messages. If the offset is invalid or not defined, fromBeginning defines the behavior of the consumer group.
sessionTimeout (number), optional, default = 30000
The consumer session timeout, in milliseconds
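For reference, a consumer adapter description using all of the options above could look like this sketch (the placement of sessionTimeout under consumer is an assumption based on the option list, mirroring the required keys shown in the config example):
path: adapter.yml
version: 1
kind: adapter
name: consumer-adapter-name
adapter:
  consumer:
    client: kafka-client-config-name
    groupId: group-id
    topic: topic-name
    fromBeginning: true     # consume from the beginning when there is no valid committed offset
    sessionTimeout: 30000   # ms, documented default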