生产者消息分区

1
2
3
4
5
6
7
8
9
10
11
12
[root@docker-vm kafka]# docker-compose exec -it kafka /bin/bash
WARN[0000] /home/kafka/docker-compose.yaml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion

$ /opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server=localhost:9092 --topic=porders --partitions 3 --replication-factor 1
Created topic porders.

$ /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic porders

Topic: porders TopicId: CsLNaSudRgm_Xe5uinwrVg PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: porders Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001 Elr: N/A LastKnownElr: N/A
Topic: porders Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001 Elr: N/A LastKnownElr: N/A
Topic: porders Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001 Elr: N/A LastKnownElr: N/A
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Demo producer that sends ten {@code Order} messages to the {@code porders} topic
 * and logs the partition and offset each record was written to.
 *
 * <p>The producer is configured with {@code RoundRobinPartitioner}, so the record key
 * is not used for partition selection in this example.
 */
public class OrderProducer {
    private static final Logger log = LoggerFactory.getLogger(OrderProducer.class);

    /**
     * Builds the producer configuration, sends ten orders synchronously and logs the
     * metadata returned by the broker for each one.
     *
     * <p>Failures are logged rather than propagated; the {@code throws} clause is kept
     * only so the method signature stays unchanged.
     *
     * @param args unused
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties kaProperties = new Properties();
        // List of servers used for the initial connection to the Kafka cluster.
        kaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092");
        // Key serializer class.
        kaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Value serializer class.
        kaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
        /*
         * org.apache.kafka.clients.producer.RoundRobinPartitioner: a partitioning strategy
         * where each record in a series of consecutive records is sent to a different
         * partition, whether or not a key is provided, until the partitions run out and
         * the process starts over.
         */
        kaProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
        kaProperties.put(ProducerConfig.ACKS_CONFIG, "all");
        kaProperties.put(ProducerConfig.RETRIES_CONFIG, "3");
        // Limit the number of unacknowledged requests per connection.
        kaProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

        try (Producer<String, Order> producer = new KafkaProducer<>(kaProperties)) {
            for (int i = 0; i < 10; i++) {
                Order order = new Order("o" + i, System.currentTimeMillis(), "Laptop", 1, 999.99);
                /*
                 * For reference, when no partitioner.class is configured:
                 *  - no partition but a key present: the partition is chosen from the key's hash;
                 *  - neither partition nor key: a sticky partition is chosen, which changes once
                 *    at least batch.size bytes have been produced to it.
                 * Here RoundRobinPartitioner is configured, so the null key below has no effect
                 * on partition selection.
                 */
                ProducerRecord<String, Order> producerRecord = new ProducerRecord<>("porders", null, order);
                // Block on the future so each send is acknowledged before the next one starts.
                RecordMetadata result = producer.send(producerRecord).get();
                log.info("topic={}, partition={}, offset = {}, timestamp = {}", result.topic(), result.partition(), result.offset(), result.timestamp());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            log.error("Error sending message", e);
        } catch (Exception e) {
            log.error("Error sending message", e);
        }
    }


    /**
     * Kafka value serializer that encodes an {@code Order} as JSON bytes using Jackson.
     */
    public static class JsonSerializer implements org.apache.kafka.common.serialization.Serializer<Order> {
        // Shared across serializer instances instead of being rebuilt for each one.
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        /**
         * Serializes the given order to JSON.
         *
         * @param topic topic the record is being sent to (unused)
         * @param data  order to serialize; may be {@code null}
         * @return the JSON bytes, or {@code null} when {@code data} is {@code null} so that
         *         a null value is not turned into the JSON literal {@code null}
         * @throws RuntimeException if Jackson fails to serialize the order
         */
        @Override
        public byte[] serialize(String topic, Order data) {
            if (data == null) {
                return null;
            }
            try {
                return OBJECT_MAPPER.writeValueAsBytes(data);
            } catch (Exception e) {
                log.error("Error serializing JSON message", e);
                throw new RuntimeException("Error serializing JSON message", e);
            }
        }
    }
}

未设置 Key,未设置 partitioner.class(使用默认的粘性分区策略,本次运行的消息全部写入同一个分区)

1
2
3
4
5
6
7
8
9
10
11
2025-01-11 21:23:17 [INFO] (TransactionManager.java:503) - [Producer clientId=producer-1] ProducerId set to 10 with epoch 0
2025-01-11 21:23:17 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 14, timestamp = 1736601797657
2025-01-11 21:23:17 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 15, timestamp = 1736601797710
2025-01-11 21:23:17 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 16, timestamp = 1736601797714
2025-01-11 21:23:17 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 17, timestamp = 1736601797717
2025-01-11 21:23:17 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 18, timestamp = 1736601797720
2025-01-11 21:23:17 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 19, timestamp = 1736601797723
2025-01-11 21:23:17 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 20, timestamp = 1736601797726
2025-01-11 21:23:17 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 21, timestamp = 1736601797729
2025-01-11 21:23:17 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 22, timestamp = 1736601797733
2025-01-11 21:23:17 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 23, timestamp = 1736601797736

设置了 Key,未设置 partitioner.class(根据 Key 的哈希值选择分区)

1
2
3
4
5
6
7
8
9
10
11
2025-01-11 21:20:50 [INFO] (TransactionManager.java:503) - [Producer clientId=producer-1] ProducerId set to 7 with epoch 0
2025-01-11 21:20:50 [INFO] (OrderProducer.java:36) - topic=porders, partition=2, offset = 18, timestamp = 1736601650006
2025-01-11 21:20:50 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 5, timestamp = 1736601650061
2025-01-11 21:20:50 [INFO] (OrderProducer.java:36) - topic=porders, partition=0, offset = 7, timestamp = 1736601650065
2025-01-11 21:20:50 [INFO] (OrderProducer.java:36) - topic=porders, partition=2, offset = 19, timestamp = 1736601650068
2025-01-11 21:20:50 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 6, timestamp = 1736601650072
2025-01-11 21:20:50 [INFO] (OrderProducer.java:36) - topic=porders, partition=0, offset = 8, timestamp = 1736601650075
2025-01-11 21:20:50 [INFO] (OrderProducer.java:36) - topic=porders, partition=2, offset = 20, timestamp = 1736601650077
2025-01-11 21:20:50 [INFO] (OrderProducer.java:36) - topic=porders, partition=1, offset = 7, timestamp = 1736601650081
2025-01-11 21:20:50 [INFO] (OrderProducer.java:36) - topic=porders, partition=0, offset = 9, timestamp = 1736601650084
2025-01-11 21:20:50 [INFO] (OrderProducer.java:36) - topic=porders, partition=2, offset = 21, timestamp = 1736601650086

已设置 partitioner.class(RoundRobinPartitioner,无论是否设置 Key,消息依次轮询写入各个分区)

1
2
3
4
5
6
7
8
9
10
11
2025-01-11 21:13:50 [INFO] (TransactionManager.java:503) - [Producer clientId=producer-1] ProducerId set to 6 with epoch 0
2025-01-11 21:13:50 [INFO] (OrderProducer.java:32) - topic=porders, partition=2, offset = 14, timestamp = 1736601230629
2025-01-11 21:13:50 [INFO] (OrderProducer.java:32) - topic=porders, partition=1, offset = 2, timestamp = 1736601230677
2025-01-11 21:13:50 [INFO] (OrderProducer.java:32) - topic=porders, partition=0, offset = 4, timestamp = 1736601230681
2025-01-11 21:13:50 [INFO] (OrderProducer.java:32) - topic=porders, partition=2, offset = 15, timestamp = 1736601230684
2025-01-11 21:13:50 [INFO] (OrderProducer.java:32) - topic=porders, partition=1, offset = 3, timestamp = 1736601230687
2025-01-11 21:13:50 [INFO] (OrderProducer.java:32) - topic=porders, partition=0, offset = 5, timestamp = 1736601230689
2025-01-11 21:13:50 [INFO] (OrderProducer.java:32) - topic=porders, partition=2, offset = 16, timestamp = 1736601230692
2025-01-11 21:13:50 [INFO] (OrderProducer.java:32) - topic=porders, partition=1, offset = 4, timestamp = 1736601230696
2025-01-11 21:13:50 [INFO] (OrderProducer.java:32) - topic=porders, partition=0, offset = 6, timestamp = 1736601230699
2025-01-11 21:13:50 [INFO] (OrderProducer.java:32) - topic=porders, partition=2, offset = 17, timestamp = 1736601230702