Kafka 的有且仅有一次语义与事务消息

发布于 2024-03-06 09:06:54 字数 13527 浏览 20 评论 0

最近看到 Kafka 官方 wiki 上有一篇关于有且仅有一次语义与事务消息的文档(见 这里 ),里面说的非常详细。对于有且仅有一次语义与事务消息是什么东西,大家可以看我的上一篇博客,或者看 Kafka 的这篇 wiki,这里不做展开。这篇文章主要整理关于该语义和事务消息的 API 接口、数据流和配置。

生产者接口

生产者 API 的改动

生产者新增了 5 个新的方法(initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction),并且发送接口也增加了一个新的异常。见下面:

public interface Producer<K,V> extends Closeable {
/*
Needs to be called before any of the other transaction methods. Assumes that
the transactional.id is specified in the producer configuration.
This method does the following:
Ensures any transactions initiated by previous instances of the producer
are completed. If the previous instance had failed with a transaction in
progress, it will be aborted. If the last transaction had begun completion,
but not yet finished, this method awaits its completion.
Gets the internal producer id and epoch, used in all future transactional
messages issued by the producer.
@throws IllegalStateException if the TransactionalId for the producer is not set
  in the configuration.
*/
void initTransactions() throws IllegalStateException;
/*
Should be called before the start of each new transaction. @throws ProducerFencedException if another producer is with the same
  transactional.id is active.
*/
void beginTransaction() throws ProducerFencedException;
/*
Sends a list of consumed offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered consumed only if the transaction is committed successfully. This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern. @throws ProducerFencedException if another producer is with the same
  transactional.id is active.
*/
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
/*
Commits the ongoing transaction. @throws ProducerFencedException if another producer is with the same
  transactional.id is active.
*/
void commitTransaction() throws ProducerFencedException;
/*
Aborts the ongoing transaction. @throws ProducerFencedException if another producer is with the same
  transactional.id is active.
*/
void abortTransaction() throws ProducerFencedException;
/*
Send the given record asynchronously and return a future which will eventually contain the response information. @param record The record to send @return A future which will eventually contain the response information
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
/*
Send a record and invoke the given callback when the record has been acknowledged by the server
**/
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}​

OutOfOrderSequence 异常

如果 broker 检测出数据丢失,生产者接口会抛出 OutOfOrderSequenceException 异常。换句话说,就是 broker 发现序列号比预期序列号高。异常会在 Future 中返回,并且如果存在 callback 的话会把异常传给 callback。这是一个严重异常,生产者后续调用 send, beginTransaction, commitTransaction 等方法都会抛出一个 IlegalStateException。

应用示例

以下是一个使用上述 API 的简单应用:

public class KafkaTransactionsExample {
public static void main(String args[]) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
// Note that the ‘transactional.id’ configuration _must_ be specified in the
// producer config in order to use transactions.
KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(producerConfig);
// We need to initialize transactions once per producer instance. To use transactions,
// it is assumed that the application id is specified in the config with the key
// transactional.id.
//
// This method will recover or abort transactions initiated by previous instances of a
// producer with the same app id. Any other transactional messages will report an error
// if initialization was not performed.
//
// The response indicates success or failure. Some failures are irrecoverable and will
// require a new producer  instance. See the documentation for TransactionMetadata for a
// list of error codes.
producer.initTransactions();
 
while(true) {
  ConsumerRecords&lt;String, String&gt; records = consumer.poll(CONSUMER_POLL_TIMEOUT);
  if (!records.isEmpty()) {
    // Start a new transaction. This will begin the process of batching the consumed
    // records as well
    // as an records produced as a result of processing the input records.
    //
    // We need to check the response to make sure that this producer is able to initiate
    // a new transaction.
    producer.beginTransaction();
     
    // Process the input records and send them to the output topic(s).
    List&lt;ProducerRecord&lt;String, String&gt;&gt; outputRecords = processRecords(records);
    for (ProducerRecord&lt;String, String&gt; outputRecord : outputRecords) {
      producer.send(outputRecord);
    }
     
    // To ensure that the consumed and produced messages are batched, we need to commit
    // the offsets through
    // the producer and not the consumer.
    //
    // If this returns an error, we should abort the transaction.
     
    sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
     
  
    // Now that we have consumed, processed, and produced a batch of messages, let's
    // commit the results.
    // If this does not report success, then the transaction will be rolled back.
    producer.endTransaction();
  }
}
}
}
​

新的配置

broker 配置

  • transactional.id.timeout.ms:事务协调者超过多长时间没有收到生产者 TransactionalId 的事务状态更新就认为其过期。默认值为 604800000(7 天),这个值使得每星期执行一次的生产者任务可以持续维护其 ID。
  • max.transaction.timeout.ms:事务超时时间。如果 client 请求事务时间超过这个值,那么 broker 会在 InitPidRequest 中返回一个 InvalidTransactionTimeout 异常。这防止 client 出现超时时间太长,这会使得消费者消费事务相关的主题时变慢。默认值为 900000(15 分钟),这是一个保守的上限值。
  • transaction.state.log.replication.facto:事务状态主题的副本个数,默认值为 3。
  • transaction.state.log.num.partitions:事务状态主题的分区个数,默认值为 50。
  • transaction.state.log.min.isr:事务状态主题每个分区拥有多少个 insync 的副本才被视为上线。默认为 2。
  • transaction.state.log.segment.bytes:事务状态主题的日志段大小,默认为 104857600 字节。

生产者配置

  • enable.idempotence:是否使用幂等写(默认为 false)。如果为 false,生产者发送消息请求时不会携带 PID 字段,保持为与之前的语义一样。如果希望使用事务,那么这个值必须置位 true。如果为 true,那么会额外要求 acks=all,retries > 1,和 max.inflight.requests.per.connection=1。因为如果这些条件不满足,那么无法保证幂等性。如果应用没有显示指明这些属性,那么在启用幂等性时生产者会设置 acks=all,retries=Integer.MAX_VALUE,和 max.inflight.requests.per.connection=1。
  • transaction.timeout.ms:生产者超过多久没有更新事务状态,事务协调者会将其进行中的事务回滚。这个值会随着 InitPidRequest 一起发送给事务协调者。如果这个值大于 broker 设置的 max.transaction.timeout.ms,那么请求会抛出 InvalidTransactionTimeout 异常。默认值为 60000,防止下游消费阻塞等待超过 1 分钟。
  • transactional.id:事务投递所使用的 TransactionalId 值。这个可以保证多个生产者会话的可靠性语义,因为这可以保证在使用相同 TransactionalId 的情况下,老的事务必须完成才能开启新的事务。需要注意的是,如果启用这个值,必须先设置 enable.idempotence 为 true。此值默认为空,意味着没有使用事务。

消费者配置

  • isolation.level:以下是可以取的值(默认为 read_uncommitted):1)read_uncommitted:按位移顺序按序消费消息,无论其为提交还是未提交。2)read_committed:按位移顺序按序消费消息,但只消费非事务消息和已提交的事务消息;为了保持位移顺序,read_committed 会使得消费者需要在获取到同一事务中的所有消息前需要缓存消息。

语义保证

生产者幂等性保证

为了实现生产者幂等性语义,我们引入了生产者 ID(也称为 PID)和消息序列号的概念。每一个新的生产者在初始化的时候都会赋予一个 PID。PID 的设置是对使用者透明的,不会在客户端中暴露出来。

对于一个指定的 PID,序列号从 0 开始并且单调递增,每个主题分区都有一个序列号序列。生产者发送消息到 broker 后会增加序列号。broker 则在内存中维护每个 PID 发到主题分区的序列号,一旦发现当前收到的序列号没有比上一次收到的序列号刚好大 1,那么就会拒绝当前的生产者请求。如果消息携带的序列号比预期低而导致重复异常,生产者会忽略掉这个异常;如果消息携带的序列号比预期高而导致乱序异常,这就意味着有一些消息可能丢失了,这个异常是非常严重的。

通过这样的方法,就保证了即便生产者在出现失败的情况下进行重试,每个消息也只会在日志中仅出现一次。由于每个新的生产者实例都会分配一个新的唯一 PID,因此只能保证单个生产者会话中实现幂等性。

这些幂等的生产者语义对于像指标跟踪和审计等应用可能非常有用。

事务保证

事务保证的核心就是,使得应用能够原子性的生产消息到多个分区,写入到这些分区的消息要么都成功要么都失败。

进一步地,由于消费者也是通过写入到位移主题来进行记录的,因此事务的能力可以用来使得应用将消费动作和生产动作原子化,也就是说消息被消费了当且仅当整个“消费-转换-生产”的链条都执行完毕。

另外,有状态的应用也可以实现跨越多个会话的连续性。也就是说,Kafka 可以保证跨越应用边界的生产幂等性和事务性。为了达到这个目标,应用需要提供一个唯一 ID,而且这个唯一 ID 能够跨越应用边界保持稳定不变。在下面的阐述中,会使用 TransactionalId 表示这个 ID。TransactionalId 和 PID 是一一对应的,区别在于 TransactionalId 是用户提供的,至于为什么 TransactionalId 能够保证跨越生产者会话的幂等性的原因下面来分析。

当提供了这样的一个 TransactionalId,Kafka 保证:

  1. 对于一个 TransactionalId,只会有一个活跃的生产者。当具有相同 TransactionalId 的生产者上线时,会把老的生产者强制下线。
  2. 事务恢复跨越应用会话。如果一个应用实例死亡,下一个实例启动时会保证之前进行中的事务会被结束(提交或回滚),这样就保证了新的实例处于一个干净的状态。

需要注意的是,这里提到的事务保证是从生产者的角度来看的。对于消费者,这个保证会稍微弱一点。具体来讲,我们不能保证一个已提交事务的所有消息可以一起被消费。原因如下:

  1. 对于 compact 类型的主题,一个事务中的消息可能被更新的版本所代替。
  2. 事务可能跨越日志段。因此当老的日志段被删除了,可能会损失一个事务的开始部分。
  3. 消费者可以定位到事务中的任意位置开始消费,因此可能会丢失该事务的开始部分消息。
  4. 消费者可能消费不到事务中涉及到的分区。因此不能读取到该事务的所有消息。

核心概念

为了实现事务,也就是保证一组消息可以原子性生产和消费,Kafka 引入了如下概念;

  1. 引入了事务协调者(Transaction Coordinator)的概念。与消费者的组协调者类似,每个生产者会有对应的事务协调者,赋予 PID 和管理事务的逻辑都由事务协调者来完成。
  2. 引入了事务日志(Transaction Log)的内部主题。与消费者位移主题类似,事务日志是每个事务的持久化多副本存储。事务协调者使用事务日志来保存当前活跃事务的最新状态快照。
  3. 引入了控制消息(Control Message)的概念。这些消息是客户端产生的并写入到主题的特殊消息,但对于使用者来说不可见。它们是用来让 broker 告知消费者之前拉取的消息是否被原子性提交。控制消息之前在 这里 被提到过。
  4. 引入了 TransactionalId 的概念,TransactionalId 可以让使用者唯一标识一个生产者。一个生产者被设置了相同的 TransactionalId 的话,那么该生产者的不同实例会恢复或回滚之前实例的未完成事务。
  5. 引入了生产者 epoch 的概念。生产者 epoch 可以保证对于一个指定的 TransactionalId 只会有一个合法的生产者实例,从而保证事务性即便出现故障的情况下。

除了引入了上述概念之外,Kafka 还有新的请求类型、已有请求类型的版本升级和新的消息格式,以支持事务。这些细节在本篇文章中不过多涉及。

数据流

data-flow

在上面这幅图中,尖角框代表不同的机器,圆角框代表 Kafka 的主题分区(TopicPartition),对角线圆角框代表运行在 broker 中的逻辑实体。

每个箭头代表一个 rpc 或者主题的写入。这些操作的先后顺序见旁边的数字,下面按顺序来介绍这些操作。

1. 查询事务协调者(FindCoordinatorRequest 请求)

事务协调者是设置 PID 和管理事务的核心,因此生产者第一件事就是向 broker 发起 FindCoordinatorRequest 请求(之前命名为 GroupCoordinatorRequest,此版本将其重命名)获取其协调者。

2. 获取生产者 ID(InitPidRequest 请求)

在查询到事务协调者之后,生产者下一步就是获取其生产者 ID,这一步是通过向事务协调者发送 InitPidRequest 来实现。

2.1 如果指定了 TransactionalId 的话

如果在配置中指定了 transactional.id,transactional.id 会在 InitPidRequest 请求中传递过来,transactional.id 与生产者 ID 的映射会在步骤 2a 中记录到事务日志。这样未来的生产者如果发送了相同的 transactional.id 则返回这个相同的 PID,从而可以恢复或回滚之前未完成的事务。

在返回 PID 之外,InitPidRequest 还会完成如下任务:

  1. 增加生产者的 epoch 值,这样之前的生产者僵尸实例会被断开,不能继续操作事务。
  2. 恢复(提交或回滚)之前该 PID 对应的生产者实例的未完成事务。

InitPidRequest 请求是同步的,一旦返回,生产者可以发送数据和开启新的事务。

2.2 如果 TransactionalId 未指定

如果 TransactionalId 未指定,会赋予一个新的 PID,该生产者可以在其当前会话期间实现幂等性和事务性语义。

3. 开启事务(beginTransaction 方法)

新的 KafkaProducer 有一个 beginTransaction() 方法,调用该方法会开启一个新的事务。生产者在本地状态中记录事务已经开始,只有发送第一个记录时协调者才会知道事务开始状态。

4. 消费-转换-生产的循环

在这个阶段中,生产者开始事务中的消费-转换-生产循环,这个阶段比较长而且可能由多个请求组成。

4.1 AddPartitionsToTxnRequest

在一个事务中,如果需要写入一个新的主题分区,那么生产者会发送此请求到事务协调者。协调者在步骤 4.1a 中会记录该分区添加到事务中。这个信息是必要的,因为这样才能写入提交或回滚标记到事务中的每个分区(见 5.2 步骤)。如果这是事务写入的第一个分区,那么协调者还会开始事务定时器。

4.2 ProduceRequest

生产者通过一个或多个 ProduceRequests 请求(在生产者 send 方法内部发出)写入消息到主题中。这些请求包含 PID,epoch 和序列号,见图中的 4.2a。

4.3 AddOffsetCommitsToTxnRequest

生产者有一个新的 sendOffsetsToTransaction 方法,该方法可以将消息消费和生产结合起来。方法参数包含一个 Map<TopicPartitions, OffsetAndMetadata>和一个 groupId。

sendOffsetsToTransaction 内部发送一个带有 groupId 的 AddOffsetCommitsToTxnRequests 请求到事务协调者,事务协调者从内部的__consumer-offsets 主题中根据此消费者组获取到相应的主题分区。事务协调者在步骤 4.3a 中把这个主题分区记录到事务日志中。

4.4 TxnOffsetCommitRequest

生产者发送 TxnOffsetCommitRequest 请求到消费协调者来在主题__consumer-offsets 中持久化位移(见 4.4a)。消费协调者通过请求中的 PID 和生产者 epoch 来验证生产者是否允许发起该请求。

已消费的位移在提交事务之后才对外可见,此过程在下面来讨论。

5. 提交或回滚事务

消息数据写入之后,使用者需要调用 KafkaProducer 中的 commitTransaction 或 abortTransaction 方法,这两个方法分别为事务的提交和回滚处理方法。

5.1 EndTxnRequest

当生产者结束事务的时候,需要调用 KafkaProducer.endTransaction 或者 KafkaProducer.abortTransaction 方法。前者使得步骤 4 中的数据对下游的消费者可见,后者则从日志中抹除已生产的数据:这些数据不会对用户可见,也就是说下游消费者会读取并丢弃这些回滚消息。

无论调用哪个方法,生产者都是会发起 EndTxnRequest 请求到事务协调者,然后通过参数来指明事务提交或回滚。接收到此请求后,协调者会:

  1. 写入 PREPARE_COMMIT 或 PREPARE_ABORT 消息到事务日志(见 5.1a)
  2. 通过 WriteTxnMarkerRequest 请求写入命令消息(COMMIT 或 ABORT)到用户日志中(见下面 5.2)
  3. 写入 COMMITTED 或 ABORTED 消息到事务日志中

5.2 WriteTxnMarkerRequest 请求

这个请求是事务协调者发给事务中每个分区的 leader 的。接收到此请求后,每个 broker 会写入 COMMIT(PID) 或 ABORT(PID) 控制消息到日志中(步骤 5.2a)。

这个消息向消费者指明该 PID 的消息传递给用户还是丢弃。因此,消费者接收到带有 PID 的消息后会缓存起来,直到读取到 COMMIT 或者 ABORT 消息,然后决定消息是通知用户还是丢弃。

另外,如果事务中涉及到__consumer-offsets 主题,那么 commit 或者 abort 的标记同样写入到日志中,消费协调者会被告知这些位移是否标记为已消费(事务提交则为已消费,事务回滚则忽略这些位移)。见步骤 4.2a。

5.3 写入最后的提交或回滚消息

在 commit 或 abort 标记写入到数据日志后,事务协调者写入最终的 COMMITTED 或 ABORTED 消息到事务日志,标记该事务已经完成(见图中的步骤 5.3)。在这个时候,事务日志中关于这个事务的大部分消息都可以被删除;只需要保留该事务的 PID 和时间戳,这样可以最终删除关于该生产者的 TransactionalId->PID 映射,详情可参考 PID 过期的相关资料。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

过期情话

暂无简介

0 文章
0 评论
23 人气
更多

推荐作者

qq_E2Iff7

文章 0 评论 0

Archangel

文章 0 评论 0

freedog

文章 0 评论 0

Hunk

文章 0 评论 0

18819270189

文章 0 评论 0

wenkai

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文