Kafka基于偏移量和时间重放消息

基于偏移量重放消息

  1. 数据重新处理
    错误修复:当处理消息时出错,导致数据不准确或丢失时,可以重放消息来修复存储错误或业务逻辑错误。
  2. 功能测试和验证
    新功能测试:使用历史消息测试新功能的准确性和性能。
    性能测试:将偏移量设至高负载时段,评估系统优化或升级后的性能。
  3. 业务逻辑调整和审计
    业务更新:业务逻辑变更,重放消息以符合新规则。
    审计检查:设置偏移量,对特定时段消息重新处理,确保合规性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

@Slf4j
public class OffsetSeekConsumer {

    /**
     * Offset each assigned partition is rewound to before consumption starts,
     * i.e. the consumer replays every partition from its 10th record.
     */
    private static final long REPLAY_OFFSET = 10L;

    /**
     * Demonstrates offset-based message replay: subscribe, wait for the group
     * coordinator to assign partitions, seek every partition to
     * {@link #REPLAY_OFFSET}, then consume and log records indefinitely.
     */
    public static void main(String[] args) {
        Properties kaProperties = new Properties();
        kaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092");
        kaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IoTEventDeserializer.class.getName());
        kaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "iot-events-group");
        // Commit consumed offsets automatically.
        kaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Interval (ms) between automatic offset commits.
        kaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        try (KafkaConsumer<String, IoTEvent> consumer = new KafkaConsumer<>(kaProperties)) {

            consumer.subscribe(Collections.singletonList("iot-events"));

            // Partition assignment happens lazily inside poll(), so keep polling
            // until the coordinator has handed this consumer its partitions.
            // NOTE(review): records returned by these warm-up poll() calls are
            // discarded, and with auto-commit enabled their offsets may still be
            // committed before the seek below takes effect. For production code,
            // prefer seeking from ConsumerRebalanceListener#onPartitionsAssigned.
            Set<TopicPartition> assignment = consumer.assignment();
            while (assignment.isEmpty()) {
                consumer.poll(Duration.ofMillis(100));
                assignment = consumer.assignment();
            }

            // Rewind every assigned partition so consumption restarts at
            // REPLAY_OFFSET instead of the last committed position.
            for (TopicPartition topicPartition : assignment) {
                consumer.seek(topicPartition, REPLAY_OFFSET);
            }

            // Consume and log forever; process exit is the only shutdown path
            // in this demo.
            while (true) {
                ConsumerRecords<String, IoTEvent> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, IoTEvent> record : records) {
                    log.info("topic={}, partition={}, offset={}, key={}, value={}",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e) {
            log.error("Error consuming message", e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2025-01-16 11:40:57 [INFO] (KafkaMetricsCollector.java:270) - initializing Kafka metrics collector
2025-01-16 11:40:57 [INFO] (AppInfoParser.java:125) - Kafka version: 3.9.0
2025-01-16 11:40:57 [INFO] (AppInfoParser.java:126) - Kafka commitId: 84caaa6e9da06435
2025-01-16 11:40:57 [INFO] (AppInfoParser.java:127) - Kafka startTimeMs: 1736998857485
2025-01-16 11:40:57 [INFO] (ClassicKafkaConsumer.java:481) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] Subscribed to topic(s): iot-events
2025-01-16 11:40:57 [INFO] (Metadata.java:365) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] Cluster ID: B4mRavu_Tm-rLq1koR7IgQ
2025-01-16 11:40:57 [INFO] (AbstractCoordinator.java:937) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] Discovered group coordinator kafka:9092 (id: 2147482646 rack: null)
2025-01-16 11:40:57 [INFO] (AbstractCoordinator.java:605) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] (Re-)joining group
2025-01-16 11:40:57 [INFO] (AbstractCoordinator.java:1103) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] Request joining group due to: need to re-join with the given member-id: consumer-iot-events-group-1-2cf66a8d-cadf-462d-b62a-05c54ba70f26
2025-01-16 11:40:57 [INFO] (AbstractCoordinator.java:605) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] (Re-)joining group
2025-01-16 11:41:40 [INFO] (AbstractCoordinator.java:666) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] Successfully joined group with generation Generation{generationId=10, memberId='consumer-iot-events-group-1-2cf66a8d-cadf-462d-b62a-05c54ba70f26', protocol='range'}
2025-01-16 11:41:40 [INFO] (ConsumerCoordinator.java:664) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] Finished assignment for group at generation 10: {consumer-iot-events-group-1-2cf66a8d-cadf-462d-b62a-05c54ba70f26=Assignment(partitions=[iot-events-0])}
2025-01-16 11:41:40 [INFO] (AbstractCoordinator.java:843) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] Successfully synced group in generation Generation{generationId=10, memberId='consumer-iot-events-group-1-2cf66a8d-cadf-462d-b62a-05c54ba70f26', protocol='range'}
2025-01-16 11:41:40 [INFO] (ConsumerCoordinator.java:324) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] Notifying assignor about the new Assignment(partitions=[iot-events-0])
2025-01-16 11:41:40 [INFO] (ConsumerRebalanceListenerInvoker.java:58) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] Adding newly assigned partitions: iot-events-0
2025-01-16 11:41:40 [INFO] (ConsumerUtils.java:209) - Setting offset for partition iot-events-0 to the committed offset FetchPosition{offset=791, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1001 rack: null)], epoch=0}}
2025-01-16 11:41:40 [INFO] (ClassicKafkaConsumer.java:764) - [Consumer clientId=consumer-iot-events-group-1, groupId=iot-events-group] Seeking to offset 10 for partition iot-events-0
2025-01-16 11:41:41 [INFO] (OffsetConsumer.java:54) - topic=iot-events, partition=0, offset=10, key=3, value=IoTEvent(deviceId=3, eventTimestamp=1736997681860, deviceType=BOT, eventValue=88)
2025-01-16 11:41:41 [INFO] (OffsetConsumer.java:54) - topic=iot-events, partition=0, offset=11, key=3, value=IoTEvent(deviceId=3, eventTimestamp=1736997682880, deviceType=BOT, eventValue=62)
2025-01-16 11:41:41 [INFO] (OffsetConsumer.java:54) - topic=iot-events, partition=0, offset=12, key=1, value=IoTEvent(deviceId=1, eventTimestamp=1736997683885, deviceType=BOT, eventValue=29)
2025-01-16 11:41:41 [INFO] (OffsetConsumer.java:54) - topic=iot-events, partition=0, offset=13, key=5, value=IoTEvent(deviceId=5, eventTimestamp=1736997684894, deviceType=BOT, eventValue=4)

基于时间重放消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

@Slf4j
public class TimeSeekConsumer {

    /**
     * Fixed epoch-millisecond timestamp to replay from (uppercase {@code L}
     * suffix — a lowercase {@code l} is easily misread as the digit 1).
     * Replace with e.g. {@code System.currentTimeMillis() - 3_600_000L} to
     * replay the last hour instead of this hard-coded instant.
     */
    private static final long REPLAY_TIMESTAMP_MS = 1736997681860L;

    /**
     * Demonstrates time-based message replay: subscribe, wait for partition
     * assignment, translate {@link #REPLAY_TIMESTAMP_MS} into per-partition
     * offsets via {@code offsetsForTimes}, seek there, then consume and log
     * records indefinitely.
     */
    public static void main(String[] args) {
        Properties kaProperties = new Properties();
        kaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092");
        kaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IoTEventDeserializer.class.getName());
        kaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "iot-events-group");
        // Commit consumed offsets automatically.
        kaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Interval (ms) between automatic offset commits.
        kaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        try (KafkaConsumer<String, IoTEvent> consumer = new KafkaConsumer<>(kaProperties)) {

            consumer.subscribe(Collections.singletonList("iot-events"));

            // Partition assignment happens lazily inside poll(), so keep polling
            // until the coordinator has handed this consumer its partitions.
            // NOTE(review): records returned by these warm-up poll() calls are
            // discarded, and with auto-commit enabled their offsets may still be
            // committed before the seek below takes effect. For production code,
            // prefer seeking from ConsumerRebalanceListener#onPartitionsAssigned.
            Set<TopicPartition> assignment = consumer.assignment();
            while (assignment.isEmpty()) {
                consumer.poll(Duration.ofMillis(100));
                assignment = consumer.assignment();
            }

            // Ask the broker for the earliest offset whose record timestamp is
            // >= REPLAY_TIMESTAMP_MS, per assigned partition.
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            for (TopicPartition topicPartition : assignment) {
                timestampsToSearch.put(topicPartition, REPLAY_TIMESTAMP_MS);
            }
            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);

            // Seek each partition to the resolved offset; offsetsForTimes maps a
            // partition to null when no record at/after the timestamp exists, in
            // which case we fall back to the beginning of the partition.
            for (TopicPartition topicPartition : assignment) {
                OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
                if (offsetAndTimestamp != null) {
                    consumer.seek(topicPartition, offsetAndTimestamp.offset());
                } else {
                    consumer.seekToBeginning(Collections.singletonList(topicPartition));
                }
            }

            // Consume and log forever; process exit is the only shutdown path
            // in this demo.
            while (true) {
                ConsumerRecords<String, IoTEvent> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, IoTEvent> record : records) {
                    log.info("topic={}, partition={}, offset={}, key={}, value={}",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e) {
            log.error("Error consuming message", e);
        }
    }
}