Kafka消息事务

事务协调者

在Apache Kafka中,事务协调者(Transaction Coordinator)是Kafka Broker内置的一个组件。它负责管理生产者和消费者之间的事务性操作,确保消息生产和消费的原子性和一致性。
具体来说,事务协调者的主要职责包括:

  1. 事务状态管理:跟踪每个事务的状态(例如,开启、准备提交、已提交、已中止等),并确保这些状态在Broker之间保持一致。
  2. 幂等性和恰好一次语义:通过与生产者协作,确保每条消息只能被写入一次,即使在网络故障或重试的情况下也不会重复写入。
  3. 跨分区事务支持:允许生产者在一个事务中向多个主题和分区发送消息,并保证这些操作要么全部成功,要么全部失败。
  4. 消费偏移量管理:在事务性消费场景下,协调者还负责管理消费者的偏移量提交,确保只有当所有相关操作都成功完成时才会更新偏移量。

处理步骤

image-20221119-2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.Properties;

public class TransactionalProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:29092,192.168.31.230:39092,192.168.31.230:49092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 启用事务配置
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 必须开启幂等性
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-transaction-id-1"); // 唯一事务ID

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 初始化事务
producer.initTransactions();

try {
// 开启事务
producer.beginTransaction();

// 发送订单消息到 orders Topic
ProducerRecord<String, String> orderRecord = new ProducerRecord<>(
"orders", "order-123", "{\"user\":\"Alice\", \"amount\":100}"
);
producer.send(orderRecord);

// 发送库存扣减消息到 inventory Topic
ProducerRecord<String, String> inventoryRecord = new ProducerRecord<>(
"inventory", "item-001", "{\"item\":\"phone\", \"count\":-1}"
);
producer.send(inventoryRecord);

// 提交事务(两条消息会原子性写入)
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 事务终止(如发生异常)
producer.abortTransaction();
e.printStackTrace();
} finally {
producer.close();
}
}
}

整个事务的执行过程如下所示。

  1. KAFKA 生产者通过 initTransactions API 将自定义的 transactional.id 注册到 transactional coordinator。此时 coordinator 会关闭所有有相同 transactional.id 且处于 pending 状态的事务,同时也会递增 epoch 来屏蔽僵尸生产者(zombie producers)。该操作对每个 producer session 只执行一次 producer.initTransaction()。
  2. KAFKA 生产者通过 beginTransaction API 开启事务,并通过 send API 发送消息到目标 topic。此时消息对应的 partition 会首先被注册到 transactional coordinator,然后 producer 按照正常流程发送消息到目标 topic,且在发送消息时内部会通过校验屏蔽掉僵尸生产者 producer.beginTransaction();producer.send()*N;。
  3. KAFKA 生产者通过 commitTransaction API 提交事务或通过 abortTransaction API 回滚事务。此时会向 transactional coordinator 提交请求,开始两阶段提交协议 producer.commitTransaction();producer.abortTransaction();
    ○ 第一阶段,transactional coordinator 更新内存中的事务状态为“prepare_commit”,并将该状态持久化到 transaction log 中
    ○ 第二阶段,coordinator 首先写 transaction marker 标记到目标 topic 的目标 partition,在向目标topic的目标partition写完控制消息后,会更新事务状态为“commited”或“abort”,并将该状态持久化到 transaction log中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TransactionalConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:29092,192.168.31.230:39092,192.168.31.230:49092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");

// 设置只读取已提交的事务消息
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));

while (true) {
consumer.poll(Duration.ofMillis(100)).forEach(record -> {
System.out.printf("收到订单消息: key=%s, value=%s%n", record.key(), record.value());
});
}
}
}