实现广播的方法:创建多个消费者组
为了实现广播,需要为每个接收广播消息的实体创建一个独立的消费者组。例如,假设有一个主题topic1包含需要广播的消息,并且有3个不同的应用程序(或服务)需要接收这些消息。为每个应用程序创建一个消费者组,这样每个消费者组都可以独立地从topic1中消费消息,从而实现消息的广播。

配置和注意事项
分区数量:
● 要确保分区数量足够多,以避免消费者组内的消费者闲置。
● 理想情况下,分区数应不少于单个消费者组内的消费者数量,使组内每个消费者都能分配到至少一个分区;否则多出的消费者会闲置,无法充分利用资源。
性能考虑:
● 创建多个消费者组会增加 Kafka 集群的负载,因为每个消费者组都需要进行消息拉取和处理。
● 要根据实际情况调整 poll 的超时时间(例如下文示例中的 Duration.ofMillis(100))、消费者组的数量和分区数量,以达到性能和广播效果的平衡。
示例代码:事件实体类 IoTEvent
| import lombok.*;
/**
 * Payload describing a single event reported by an IoT device.
 *
 * <p>Lombok generates the accessors, {@code equals}/{@code hashCode}/{@code toString},
 * a builder, and both the no-argument and the all-arguments constructor.
 * The field order is significant because it defines the parameter order of the
 * all-arguments constructor.
 */
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Data
public class IoTEvent {

    /** Identifier of the device that emitted the event. */
    private Integer deviceId;

    /** Time of the event; presumably epoch milliseconds (confirm against the producer). */
    private Long eventTimestamp;

    /** Free-form label for the kind of device. */
    private String deviceType;

    /** Numeric value carried by the event. */
    private Integer eventValue;
}
|
示例代码:值序列化器 IoTEventSerializer
| import com.fasterxml.jackson.databind.ObjectMapper; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j;
/**
 * Kafka value serializer that encodes an {@link IoTEvent} as JSON bytes using Jackson.
 */
@Slf4j
public class IoTEventSerializer implements org.apache.kafka.common.serialization.Serializer<IoTEvent> {

    /** Shared mapper; it is never reconfigured after construction. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Serializes the given event to JSON.
     *
     * @param topic topic the record is being sent to (unused)
     * @param data  event to serialize; may be {@code null}
     * @return the JSON bytes, or {@code null} when {@code data} is {@code null}
     */
    @Override
    @SneakyThrows
    public byte[] serialize(String topic, IoTEvent data) {
        // A null value must stay a null payload (e.g. a tombstone) rather than
        // being encoded as the four-byte JSON literal "null".
        if (data == null) {
            return null;
        }
        return OBJECT_MAPPER.writeValueAsBytes(data);
    }
}
|
示例代码:值反序列化器 IoTEventDeserializer
| import com.fasterxml.jackson.databind.ObjectMapper; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j;
/**
 * Kafka value deserializer that decodes JSON bytes into an {@link IoTEvent} using Jackson.
 */
@Slf4j
public class IoTEventDeserializer implements org.apache.kafka.common.serialization.Deserializer<IoTEvent> {

    /** Shared mapper; it is never reconfigured after construction. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Deserializes a JSON payload into an event.
     *
     * @param topic topic the record was read from (unused)
     * @param data  serialized bytes; may be {@code null}
     * @return the decoded event, or {@code null} when {@code data} is {@code null}
     */
    @Override
    @SneakyThrows
    public IoTEvent deserialize(String topic, byte[] data) {
        // The Deserializer contract allows null input and recommends returning
        // null instead of throwing, so do not hand a null array to Jackson.
        if (data == null) {
            return null;
        }
        return OBJECT_MAPPER.readValue(data, IoTEvent.class);
    }
}
|
示例代码:生产者 IoTEventProducer
| import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties; import java.util.Random;
@Slf4j public class IoTEventProducer { public static void main(String[] args) { Properties kaProperties = new Properties(); kaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092"); kaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); kaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IoTEventSerializer.class.getName());
try (Producer<String, IoTEvent> producer = new KafkaProducer<>(kaProperties)) { Random random = new Random(); while (true) { IoTEvent event = IoTEvent.builder() .deviceId(random.nextInt(10)) .eventTimestamp(System.currentTimeMillis()) .deviceType("BOT") .eventValue(random.nextInt(101)) .build(); ProducerRecord<String, IoTEvent> producerRecord = new ProducerRecord<>("iot-events", event.getDeviceId().toString(), event); RecordMetadata result = producer.send(producerRecord).get(); log.info("topic={}, partition={}, offset = {}, timestamp = {}", result.topic(), result.partition(), result.offset(), result.timestamp()); Thread.sleep(1000); } } catch (Exception e) { log.error("Error sending message", e); } } }
|
示例代码:消费者组 group-1 的消费者 Group1Consumer
| import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.util.Collections; import java.util.Properties; @Slf4j public class Group1Consumer { public static void main(String[] args) { Properties kaProperties = new Properties(); kaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092"); kaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); kaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IoTEventDeserializer.class.getName()); kaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
try (KafkaConsumer<String, IoTEvent> consumer = new KafkaConsumer<>(kaProperties)) { consumer.subscribe(Collections.singletonList("iot-events")); while (true) { ConsumerRecords<String, IoTEvent> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, IoTEvent> record : records) { log.info("topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } catch (Exception e) { log.error("Error consuming message", e); } } }
|
示例代码:消费者组 group-2 的消费者 Group2Consumer
| import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.util.Collections; import java.util.Properties;
@Slf4j public class Group2Consumer { public static void main(String[] args) { Properties kaProperties = new Properties(); kaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092"); kaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); kaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IoTEventDeserializer.class.getName()); kaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-2");
try (KafkaConsumer<String, IoTEvent> consumer = new KafkaConsumer<>(kaProperties)) { consumer.subscribe(Collections.singletonList("iot-events")); while (true) { ConsumerRecords<String, IoTEvent> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, IoTEvent> record : records) { log.info("topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } catch (Exception e) { log.error("Error consuming message", e); } } }
|