Docker快速部署KRaft集群

image-20221118-5

方案一:controller 与 broker 分离部署(3 个 controller 节点 + 3 个 broker 节点)。在宿主机上执行以下脚本:
# Create the working directory (-p: do not fail if it already exists) and enter it.
mkdir -p /home/kafka
cd /home/kafka
# Write the Compose file. The quoted 'EOF' delimiter disables shell expansion
# inside the heredoc, so the YAML below is written verbatim.
cat > docker-compose.yaml <<-'EOF'
# Services that together form a KRaft-mode Kafka cluster:
# three dedicated controller nodes plus three broker nodes.
services:
  # First controller node.
  controller-1:
    # Docker image to run; pinned to Apache Kafka 3.9.0.
    image: apache/kafka:3.9.0
    # Fixed container name, which the other nodes use as its hostname.
    container_name: controller-1
    # Environment variables that configure this Kafka instance.
    environment:
      # Unique ID of this node within the cluster.
      KAFKA_NODE_ID: 1
      # This node acts only as a controller (cluster metadata and coordination).
      KAFKA_PROCESS_ROLES: controller
      # Listen on port 9093 using the CONTROLLER listener.
      KAFKA_LISTENERS: CONTROLLER://:9093
      # Name of the listener used for broker-to-broker traffic.
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Name of the listener used for controller traffic.
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      # All voters of the controller quorum, as id@host:port.
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      # Do not delay the first consumer-group rebalance (0 ms).
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  # Second controller node; same settings as controller-1 except the node ID.
  controller-2:
    image: apache/kafka:3.9.0
    container_name: controller-2
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  # Third controller node; same settings as controller-1 except the node ID.
  controller-3:
    image: apache/kafka:3.9.0
    container_name: controller-3
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  # First broker node.
  broker-1:
    image: apache/kafka:3.9.0
    container_name: broker-1
    # Publish container port 9092 on host port 29092 for external clients.
    ports:
      - "29092:9092"
    environment:
      KAFKA_NODE_ID: 4
      # This node acts only as a broker (stores and serves messages).
      KAFKA_PROCESS_ROLES: broker
      # Two listeners: PLAINTEXT (19092) for traffic inside the Compose network,
      # PLAINTEXT_HOST (9092) for clients coming in through the published port.
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
      # Addresses handed out to clients and other brokers.
      # NOTE: 192.168.31.230 is presumably the Docker host's IP; replace it with
      # an address of your own host that clients can reach.
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-1:19092,PLAINTEXT_HOST://192.168.31.230:29092'
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      # Security protocol used by each listener; all three are unencrypted:
      #   CONTROLLER:PLAINTEXT      - controller listener
      #   PLAINTEXT:PLAINTEXT       - internal listener
      #   PLAINTEXT_HOST:PLAINTEXT  - host-facing listener
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    # Start the three controllers before this broker. The short form only
    # orders container start-up; it does not wait for the controllers to be ready.
    depends_on:
      - controller-1
      - controller-2
      - controller-3

  # Second broker node; published on host port 39092.
  broker-2:
    image: apache/kafka:3.9.0
    container_name: broker-2
    ports:
      - "39092:9092"
    environment:
      KAFKA_NODE_ID: 5
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-2:19092,PLAINTEXT_HOST://192.168.31.230:39092'
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3

  # Third broker node; published on host port 49092.
  broker-3:
    image: apache/kafka:3.9.0
    container_name: broker-3
    ports:
      - "49092:9092"
    environment:
      KAFKA_NODE_ID: 6
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-3:19092,PLAINTEXT_HOST://192.168.31.230:49092'
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3

EOF

# Create and start all six containers in the background.
docker-compose up -d

image-20221118-6

方案二:混合模式部署,3 个节点同时承担 broker 与 controller 角色。在存放 docker-compose.yaml 的目录下执行以下脚本:
# Write the Compose file for a three-node KRaft cluster in combined mode
# (every node is both broker and controller). The quoted 'EOF' delimiter
# disables shell expansion inside the heredoc.
cat > docker-compose.yaml <<-'EOF'
services:
  # Node 1; client port 9092 is published on host port 29092.
  kafka-1:
    image: apache/kafka:3.9.0
    container_name: kafka-1
    ports:
      - "29092:9092"
    environment:
      KAFKA_NODE_ID: 1
      # Combined mode: this node runs the broker and the controller role.
      KAFKA_PROCESS_ROLES: "broker,controller"
      # PLAINTEXT (19092): traffic inside the Compose network;
      # PLAINTEXT_HOST (9092): clients arriving through the published port;
      # CONTROLLER (9093): controller quorum traffic.
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092,CONTROLLER://:9093'
      # NOTE: 192.168.31.230 is presumably the Docker host's IP; replace it with
      # an address of your own host that clients can reach.
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-1:19092,PLAINTEXT_HOST://192.168.31.230:29092,CONTROLLER://kafka-1:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      # All listeners use the unencrypted PLAINTEXT protocol.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # All voters of the controller quorum, as id@host:port.
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  # Node 2; client port 9092 is published on host port 39092.
  kafka-2:
    image: apache/kafka:3.9.0
    container_name: kafka-2
    ports:
      - "39092:9092"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092,CONTROLLER://:9093'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-2:19092,PLAINTEXT_HOST://192.168.31.230:39092,CONTROLLER://kafka-2:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  # Node 3; client port 9092 is published on host port 49092.
  kafka-3:
    image: apache/kafka:3.9.0
    container_name: kafka-3
    ports:
      - "49092:9092"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092,CONTROLLER://:9093'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-3:19092,PLAINTEXT_HOST://192.168.31.230:49092,CONTROLLER://kafka-3:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
EOF

# Create and start the three containers in the background.
docker-compose up -d

Java 生产者示例:通过上面映射到宿主机的三个端口连接集群,每秒向 iot-events 主题发送一条 IoTEvent 消息:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;

/**
 * Demo producer that publishes one randomly generated {@code IoTEvent} per
 * second to the {@code iot-events} topic, keyed by the event's device id.
 */
@Slf4j
public class IoTEventProducer {
    /**
     * Configures a {@link KafkaProducer} against the three host-published
     * bootstrap addresses and then sends events in an endless loop.
     *
     * <p>Each send is synchronous: the loop blocks on the returned future and
     * logs the topic, partition, offset and timestamp of the acknowledged
     * record before sleeping for one second.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties kaProperties = new Properties();
        kaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:29092,192.168.31.230:39092,192.168.31.230:49092");
        kaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        kaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IoTEventSerializer.class.getName());

        // try-with-resources closes the producer when the loop ends for any reason.
        try (Producer<String, IoTEvent> producer = new KafkaProducer<>(kaProperties)) {
            Random random = new Random();
            while (true) {
                // Device id in [0, 10), event value in [0, 100].
                IoTEvent event = IoTEvent.builder()
                        .deviceId(random.nextInt(10))
                        .eventTimestamp(System.currentTimeMillis())
                        .deviceType("BOT")
                        .eventValue(random.nextInt(101))
                        .build();
                // The device id is used as the record key.
                ProducerRecord<String, IoTEvent> producerRecord =
                        new ProducerRecord<>("iot-events", event.getDeviceId().toString(), event);
                // get() blocks until the send has been acknowledged or has failed.
                RecordMetadata result = producer.send(producerRecord).get();
                log.info("topic={}, partition={}, offset = {}, timestamp = {}", result.topic(), result.partition(), result.offset(), result.timestamp());
                Thread.sleep(1000); // Sleep for 1 second
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not silently lost.
            Thread.currentThread().interrupt();
            log.warn("Producer interrupted, stopping", e);
        } catch (Exception e) {
            log.error("Error sending message", e);
        }
    }
}