Kafka基于消费者组实现消息广播

实现广播的方法:创建多个消费者组

为了实现广播,需要为每个接收广播消息的实体创建一个独立的消费者组。例如,假设有一个主题topic1包含需要广播的消息,并且有3个不同的应用程序(或服务)需要接收这些消息。为每个应用程序创建一个消费者组,这样每个消费者组都可以独立地从topic1中消费消息,从而实现消息的广播。

image-20221111-3

配置和注意事项

分区数量:
● 要确保分区数量足够多,以避免消费者组内的消费者闲置。
● 理想情况下,每个消费者组内的消费者应该能够分配到至少一个分区,否则会有消费者闲置,无法充分利用资源。
性能考虑:
● 创建多个消费者组会增加 Kafka 集群的负载,因为每个消费者组都需要进行消息拉取和处理。
● 要根据实际情况调整 poll 时间、消费者组的数量和分区数量,以达到性能和广播效果的平衡。

1
2
3
4
5
6
7
8
9
10
11
12
import lombok.*;

// DTO for a single IoT telemetry event published to the "iot-events" topic.
// Lombok generates accessors/equals/hashCode/toString (@Data) plus a fluent
// builder; @NoArgsConstructor supplies the default constructor Jackson needs
// for deserialization, and @AllArgsConstructor is required by @Builder.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class IoTEvent {
// Logical identifier of the emitting device; also used as the Kafka record key.
private Integer deviceId;
// Event creation time in epoch milliseconds.
private Long eventTimestamp;
// Device category label (e.g. "BOT" in the demo producer).
private String deviceType;
// Measured value reported by the device.
private Integer eventValue;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class IoTEventSerializer implements org.apache.kafka.common.serialization.Serializer<IoTEvent> {

    // Jackson's ObjectMapper is thread-safe once configured; one per serializer instance is fine.
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Serializes an {@code IoTEvent} to its JSON byte representation.
     *
     * @param topic topic the record is headed to (unused by this serializer)
     * @param data  event to serialize; may be null for tombstone records
     * @return JSON bytes, or null when {@code data} is null
     */
    @Override
    @SneakyThrows // rethrows JsonProcessingException unchecked
    public byte[] serialize(String topic, IoTEvent data) {
        // Kafka convention: a null value must serialize to null (tombstone / delete marker).
        // Without this guard Jackson would emit the 4-byte literal "null" instead.
        if (data == null) {
            return null;
        }
        return objectMapper.writeValueAsBytes(data);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class IoTEventDeserializer implements org.apache.kafka.common.serialization.Deserializer<IoTEvent> {

    // Jackson's ObjectMapper is thread-safe once configured; one per deserializer instance is fine.
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Deserializes JSON bytes back into an {@code IoTEvent}.
     *
     * @param topic topic the record came from (unused by this deserializer)
     * @param data  JSON payload; null for tombstone records
     * @return the decoded event, or null when {@code data} is null
     */
    @Override
    @SneakyThrows // rethrows IOException/JsonProcessingException unchecked
    public IoTEvent deserialize(String topic, byte[] data) {
        // Kafka delivers null payloads for tombstone records; readValue(null, ...) would
        // throw IllegalArgumentException, so map null straight through.
        if (data == null) {
            return null;
        }
        return objectMapper.readValue(data, IoTEvent.class);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;

@Slf4j
public class IoTEventProducer {

    /**
     * Demo producer: publishes one random {@code IoTEvent} per second to the
     * "iot-events" topic, keyed by device id, until interrupted or a send fails.
     */
    public static void main(String[] args) {
        Properties kaProperties = new Properties();
        kaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092");
        kaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        kaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IoTEventSerializer.class.getName());

        try (Producer<String, IoTEvent> producer = new KafkaProducer<>(kaProperties)) {
            Random random = new Random();
            // Honor the interrupt flag so the loop can terminate cleanly.
            while (!Thread.currentThread().isInterrupted()) {
                IoTEvent event = IoTEvent.builder()
                        .deviceId(random.nextInt(10))         // device ids 0-9
                        .eventTimestamp(System.currentTimeMillis())
                        .deviceType("BOT")
                        .eventValue(random.nextInt(101))      // values 0-100
                        .build();
                // Key by device id so all events of one device land on the same partition.
                ProducerRecord<String, IoTEvent> producerRecord =
                        new ProducerRecord<>("iot-events", event.getDeviceId().toString(), event);
                // .get() makes the send synchronous — acceptable for a 1 msg/s demo.
                RecordMetadata result = producer.send(producerRecord).get();
                log.info("topic={}, partition={}, offset = {}, timestamp = {}",
                        result.topic(), result.partition(), result.offset(), result.timestamp());
                Thread.sleep(1000); // Sleep for 1 second
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it in the broad catch below.
            Thread.currentThread().interrupt();
            log.warn("Producer interrupted, shutting down", e);
        } catch (Exception e) {
            log.error("Error sending message", e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
@Slf4j
public class Group1Consumer {

    /**
     * Demo consumer belonging to group "group-1". Because it has its own group id,
     * it receives every record on "iot-events" independently of other groups —
     * this is what implements the broadcast described above.
     */
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IoTEventDeserializer.class.getName());

        // Poll waits at most 100 ms, returning immediately when records arrive.
        Duration pollTimeout = Duration.ofMillis(100);

        try (KafkaConsumer<String, IoTEvent> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Collections.singletonList("iot-events"));
            for (;;) {
                ConsumerRecords<String, IoTEvent> batch = consumer.poll(pollTimeout);
                batch.forEach(rec ->
                        log.info("topic={}, partition={}, offset={}, key={}, value={}",
                                rec.topic(), rec.partition(), rec.offset(), rec.key(), rec.value()));
            }
        } catch (Exception e) {
            log.error("Error consuming message", e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

@Slf4j
public class Group2Consumer {

    /**
     * Demo consumer belonging to group "group-2". It subscribes to the same
     * "iot-events" topic as group-1 but, having a distinct group id, receives
     * its own full copy of every record — the broadcast pattern in action.
     */
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-2");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IoTEventDeserializer.class.getName());

        // Poll waits at most 100 ms, returning immediately when records arrive.
        Duration pollTimeout = Duration.ofMillis(100);

        try (KafkaConsumer<String, IoTEvent> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Collections.singletonList("iot-events"));
            for (;;) {
                ConsumerRecords<String, IoTEvent> batch = consumer.poll(pollTimeout);
                batch.forEach(rec ->
                        log.info("topic={}, partition={}, offset={}, key={}, value={}",
                                rec.topic(), rec.partition(), rec.offset(), rec.key(), rec.value()));
            }
        } catch (Exception e) {
            log.error("Error consuming message", e);
        }
    }
}