Kafka消息事务
事务协调者
在Apache Kafka中,事务协调者(Transaction Coordinator)是Kafka Broker内置的一个组件。它负责管理生产者和消费者之间的事务性操作,确保消息生产和消费的原子性和一致性。
具体来说,事务协调者的主要职责包括:
- 事务状态管理:跟踪每个事务的状态(例如,开启、准备提交、已提交、已中止等),并确保这些状态在Broker之间保持一致。
- 幂等性和恰好一次语义:通过与生产者协作,确保每条消息只能被写入一次,即使在网络故障或重试的情况下也不会重复写入。
- 跨分区事务支持:允许生产者在一个事务中向多个主题和分区发送消息,并保证这些操作要么全部成功,要么全部失败。
- 消费偏移量管理:在事务性消费场景下,协调者还负责管理消费者的偏移量提交,确保只有当所有相关操作都成功完成时才会更新偏移量。
处理步骤

1 | import org.apache.kafka.clients.producer.KafkaProducer; |
整个事务的执行过程如下所示。
- KAFKA 生产者通过 initTransactions API 将自定义的 transactional.id 注册到 transactional coordinator。此时 coordinator 会关闭所有有相同 transactional.id 且处于 pending 状态的事务,同时也会递增 epoch 来屏蔽僵尸生产者(zombie producers)。该操作对每个 producer session 只执行一次 producer.initTransaction()。
- KAFKA 生产者通过 beginTransaction API 开启事务,并通过 send API 发送消息到目标 topic。此时消息对应的 partition 会首先被注册到 transactional coordinator,然后 producer 按照正常流程发送消息到目标 topic,且在发送消息时内部会通过校验屏蔽掉僵尸生产者 producer.beginTransaction();producer.send()*N;。
- KAFKA 生产者通过 commitTransaction API 提交事务或通过 abortTransaction API 回滚事务。此时会向 transactional coordinator 提交请求,开始两阶段提交协议 producer.commitTransaction();producer.abortTransaction();
○ 第一阶段,transactional coordinator 更新内存中的事务状态为“prepare_commit”,并将该状态持久化到 transaction log 中
○ 第二阶段,coordinator 首先写 transaction marker 标记到目标 topic 的目标 partition,在向目标topic的目标partition写完控制消息后,会更新事务状态为“commited”或“abort”,并将该状态持久化到 transaction log中。
1 | import org.apache.kafka.clients.consumer.ConsumerConfig; |