@a-mehrabi/aranode-kafka-client 中文文档教程

发布于 3年前 浏览 27 更新于 3年前

Kafka Client Module

aranode 流引擎的 kafka 客户端实现。

Table of Contents

Installation

要在您的 aranode 项目中安装此模块, 你只需要使用以下命令添加包:

$ yarn add @a-mehrabi/aranode-kafka-client

之后,当你想使用它时, 您必须将其包含在自定义模块中,如下所示:

path: .env

ARANODE_CUSTOM_MODULES=@a-mehrabi/aranode-kafka-client

Usage

要使用 Kafka 客户端,您需要三个描述:

  1. Kafka 客户端配置 配置描述 (kafkaClientConfig)

  2. Kafka 生产者 适配器描述 (kafkaProducer)

  3. Kafka consumer 适配器描述 (kafkaConsumer)

Kafka client config

Kafka 客户端配置description 使您能够定义kafka客户端的配置,包括clientId、代理配置等。

您不在该配置中定义生产者和消费者的配置。

version: 1
kind: config
name: config-name

config:
  kafka:
    clientId: client-id
    brokers:
      - localhost:9092
    retry:
      retries: 8
      initialRetryTime: 100
      maxRetryTime: 2
      factor: 3
      multiplier: 4

Kafka Producer

要将消息发布到 Kafka 服务器,您必须创建一个适配器描述。 kafka 生产者适配器描述,为指定的 kafka 客户端设置一个处理程序。

流程示例:

version: 1
kind: flow
name: flow-name
entryPoint: start

flow:
  start:
    transform:
      path: 'dist/producer-transformer.js'
    nextNode: kaka

  kafka:
    port:
      name: kafka-client-out-port
      type: outbound
    terminal: true

配置示例:

path: adapter.yml

version: 1
kind: adapter
name: adapter-name

adapter:
  producer:
    client: kafka-client-config-name
    topic: topic-name

path: bind.yml

version: 1
kind: bindPro
name: bind-name

bind:
  flow-name:
    - type: outbound
      port: port-name
      adapter: adapter-name

Kafka Consumer

kafka consumer 允许一组机器或进程来协调对主题列表的访问,在消费者之间分配负载。

流程示例:

version: 1
kind: flow
name: flow-name
entryPoint: start

flow:
  start:
    nextNode: consumeData

  consumeData:
    transform:
      path: 'dist/consumer-transformer.js'
    terminal: true

配置示例:

path: adapter.yml

version: 1
kind: adapter
name: consumer-adapter-name

adapter:
  consumer:
    client: kafka-client-config-name
    groupId: group-id
    topic: topic-name
    fromBeginning: true

path: bind.yml

version: 1
kind: bind
name: bind-name

bind:
  flow-name:
    - type: inbound
      port: port-name
      adapter: consumer-adapter-name

API

kafkaClientConfig

类型:配置描述

选项:

  • clientId (string), 必需的

    Kafka客户端id

  • brokers (string[]), 必需

    的Kafka客户端必须配置至少一个broker。 代理被认为是种子代理,仅用于引导客户端和加载初始元

  • 列表中的

    数据重试机制,用于重试对Kafka的连接和API调用(当使用生产者或消费者时)

  • retries (number),可选,默认= 5

    >每次调用重试

  • initialRetryTime(数字),可选,默认值 = 300

    用于计算以毫秒为单位的重试的初始值(这仍然是按照随机化因子随机化的)

  • ma​​xRetryTime (数字),可选,默认值 = 30000

    以毫秒为单位的重试最长等待时间

  • 因子(数字),可选,默认值 = 0.2

    Randomization factor 要加载和使用的服务的名称

kafkaProducer

类型:适配器描述

选项:

  • client(字符串),必需的

    Kafka 客户端配置名称

  • topic (string), required, default = null

    Topic name

  • retry ([k: string]: number), optional

    重试option 可用于设置重试机制的配置,用于重试对 Kafka 的连接和 API 调用(当使用生产者或消费者时)

  • retries (number), optional, default = 5

    每次调用的最大重试次数

  • initialRetryTime (number), optional, default = 300

    用于计算重试的初始值,以毫秒为单位(这仍然是随机的随机因子)

  • ma​​xRetryTime(数字),可选,默认 = 30000

    重试的最大等待时间,以毫秒为单位

  • factor(数字),可选, default = 0.2

    随机化因子

  • multiplier (number), optional, default = 2

    Exponential factor

  • metadataMaxAge (number),可选,默认 = 300000

    以毫秒为单位的时间段,之后我们强制刷新元数据,即使我们没有看到任何分区领导更改以主动发现任何新代理或分区

  • allowAutoTopicCreation ( boolean), optional, default = true

    查询不存在主题的元数据时允许创建主题

  • idempotent (boolean), optional, default = false

    实验性的。 如果启用,生产者将确保每条消息只写入一次。 Acks 必须设置为 -1(“全部”)。 重试将默认为 MAXSAFEINTEGER。

  • transactionalId(字符串),可选

  • transactionTimeout(数字),可选,默认值 = 60000

    事务协调器将等待的最长时间(以毫秒为单位)在主动中止正在进行的交易之前,从生产者那里获得交易状态更新。
    如果此值大于 broker 中设置的事务最大超时毫秒数,请求将失败并出现 InvalidTransactionTimeout 错误

  • ma​​xInFlightRequests (number),可选,默认 = null

    Max number随时可能正在进行的请求。 如果为假则没有限制

kafkaConsumer

类型:适配器描述

选项:

  • client(字符串),必需的

    Kafka 客户端配置名称

  • groupId(字符串),必需的

    组 id

  • topic (string), required

    Topic name

  • fromBeginning (boolean), required

    消费者组在开始获取消息时将使用最新提交的偏移量。 如果偏移量无效或未定义,则 fromBeginning 定义消费者组的行为。

  • sessionTimeout (number), optional, default = 30000

    以毫秒为单位描述的消费者超时时间

Kafka Client Module

kafka client implementation for aranode flow engine.

Table of Contents

Installation

To install this module in your aranode project, you just need to add the package using following command:

$ yarn add @a-mehrabi/aranode-kafka-client

After that, when you want to use it, you must include it in custom modules like the following:

path: .env

ARANODE_CUSTOM_MODULES=@a-mehrabi/aranode-kafka-client

Usage

For using Kafka client, you need three descriptions:

  1. Kafka client config config description (kafkaClientConfig)

  2. Kafka producer adapter description (kafkaProducer)

  3. Kafka consumer adapter description (kafkaConsumer)

Kafka client config

Kafka Client config description enables you to define the config of the kafka client, including clientId, brokers config, etc.

You don't define configs of the producer and consumer in this config.

version: 1
kind: config
name: config-name

config:
  kafka:
    clientId: client-id
    brokers:
      - localhost:9092
    retry:
      retries: 8
      initialRetryTime: 100
      maxRetryTime: 2
      factor: 3
      multiplier: 4

Kafka Producer

to publish messages to Kafka server you have to create a adapter description. kafka producer adapter description, sets a handler for specified kafka client.

flow example:

version: 1
kind: flow
name: flow-name
entryPoint: start

flow:
  start:
    transform:
      path: 'dist/producer-transformer.js'
    nextNode: kaka

  kafka:
    port:
      name: kafka-client-out-port
      type: outbound
    terminal: true

Configs example:

path: adapter.yml

version: 1
kind: adapter
name: adapter-name

adapter:
  producer:
    client: kafka-client-config-name
    topic: topic-name

path: bind.yml

version: 1
kind: bindPro
name: bind-name

bind:
  flow-name:
    - type: outbound
      port: port-name
      adapter: adapter-name

Kafka Consumer

kafka consumer allow a group of machines or processes to coordinate access to a list of topics, distributing the load among the consumers.

flow example:

version: 1
kind: flow
name: flow-name
entryPoint: start

flow:
  start:
    nextNode: consumeData

  consumeData:
    transform:
      path: 'dist/consumer-transformer.js'
    terminal: true

Configs example:

path: adapter.yml

version: 1
kind: adapter
name: consumer-adapter-name

adapter:
  consumer:
    client: kafka-client-config-name
    groupId: group-id
    topic: topic-name
    fromBeginning: true

path: bind.yml

version: 1
kind: bind
name: bind-name

bind:
  flow-name:
    - type: inbound
      port: port-name
      adapter: consumer-adapter-name

API

kafkaClientConfig

Type: config description

Options:

  • clientId (string), required

    Kafka client id

  • brokers (string[]), required

    Kafka client must be configured with at least one broker. The brokers on the list are considered seed brokers and are only used to bootstrap the client and load initial metadata

  • retry ([k: string]: number), optional

    The retry option can be used to set the configuration of the retry mechanism, which is used to retry connections and API calls to Kafka (when using producers or consumers)

  • retries (number), optional, default = 5

    Max number of retries per call

  • initialRetryTime (number), optional, default = 300

    Initial value used to calculate the retry in milliseconds (This is still randomized following the randomization factor)

  • maxRetryTime (number), optional, default = 30000

    Maximum wait time for a retry in milliseconds

  • factor (number), optional, default = 0.2

    Randomization factor Name of the service that you want to load and use

kafkaProducer

Type: adapter description

Options:

  • client (string), required

    Kafka client config name

  • topic (string), required, default = null

    Topic name

  • retry ([k: string]: number), optional

    The retry option can be used to set the configuration of the retry mechanism, which is used to retry connections and API calls to Kafka (when using producers or consumers)

  • retries (number), optional, default = 5

    Max number of retries per call

  • initialRetryTime (number), optional, default = 300

    Initial value used to calculate the retry in milliseconds (This is still randomized following the randomization factor)

  • maxRetryTime (number), optional, default = 30000

    Maximum wait time for a retry in milliseconds

  • factor (number), optional, default = 0.2

    Randomization factor

  • multiplier (number), optional, default = 2

    Exponential factor

  • metadataMaxAge (number), optional, default = 300000

    The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions

  • allowAutoTopicCreation (boolean), optional, default = true

    Allow topic creation when querying metadata for non-existent topics

  • idempotent (boolean), optional, default = false

    Experimental. If enabled producer will ensure each message is written exactly once. Acks must be set to -1 ("all"). Retries will default to MAXSAFEINTEGER.

  • transactionalId (string), optional

  • transactionTimeout (number), optional, default = 60000

    The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
    If this value is larger than the transaction max timeout ms setting in the broker, the request will fail with a InvalidTransactionTimeout error

  • maxInFlightRequests (number), optional, default = null

    Max number of requests that may be in progress at any time. If falsey then no limit

kafkaConsumer

Type: adapter description

Options:

  • client (string), required

    Kafka client config name

  • groupId (string), required

    Group id

  • topic (string), required

    Topic name

  • fromBeginning (boolean), required

    The consumer group will use the latest committed offset when starting to fetch messages. If the offset is invalid or not defined, fromBeginning defines the behavior of the consumer group.

  • sessionTimeout (number), optional, default = 30000

    The consumer timeout time described in milliseconds

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文