
开发环境准备
● 添加依赖
在pom.xml文件中添加Spring Kafka依赖:
1 2 3 4 5 6
| ... <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> ...
|
● 配置 Kafka
在application.yml文件中进行Kafka相关配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| spring: application: name: ex00900 server: port: 8080 kafka: bootstrap-servers: 192.168.31.230:29092,192.168.31.230:39092,192.168.31.230:49092, consumer: group-id: kafka-consumer-group auto-offset-reset: earliest enable-auto-commit: false key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: com.itlaoqi.ex00900.IoTEventDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: com.itlaoqi.ex00900.IoTEventSerializer listener: # `spring.kafka.listener.ack-mode`用于指定消费者的消息确认模式。 # Ack模式相关信息: # 1. RECORD模式:提交时机为每条消息处理后,不可手动控制,可能导致重复消费,适用于需要逐条确认的场景。 # 2. BATCH模式:提交时机为每批消息处理后,不可手动控制,可能导致重复消费,适用于批量处理消息的场景。 # 3. TIME模式:提交时机基于时间间隔,不可手动控制,可能导致重复消费,适用于对消息处理结果要求不高的场景。 # 4. COUNT模式:提交时机基于消息数量,不可手动控制,可能导致重复消费,适用于需要按固定数量提交的场景。 # 5. COUNT_TIME模式:提交时机基于时间和数量,不可手动控制,可能导致重复消费,适用于需要兼顾时间和数量的场景。 # 6. MANUAL模式:提交时机为手动调用、同步提交,可手动控制,不会导致重复消费,适用于需要精确控制偏移量提交时机的场景。 # 7. MANUAL_IMMEDIATE模式:提交时机为手动调用、异步提交,可手动控制,不会导致重复消费,适用于需要精确控制偏移量提交时机、避免阻塞的场景。 ack-mode: manual
|
其中,ack-mode用于指定消费者的消息确认模式,这里选择了manual模式,即手动调用、同步提交,可手动控制偏移量提交时机,避免重复消费。
定义实体类
● 创建一个物联网终端事件实体类IoTEvent:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.itlaoqi.ex00900;
import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor;
@Data @Builder @NoArgsConstructor @AllArgsConstructor public class IoTEvent { private Integer deviceId; private Long eventTimestamp; private String deviceType; private Integer eventValue; }
|
实现序列化和反序列化
● 序列化器IoTEventSerializer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| package com.itlaoqi.ex00900;
import com.fasterxml.jackson.databind.ObjectMapper; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j;
@Slf4j public class IoTEventSerializer implements org.apache.kafka.common.serialization.Serializer<IoTEvent> { private final ObjectMapper objectMapper = new ObjectMapper();
@Override @SneakyThrows public byte[] serialize(String topic, IoTEvent data) { return objectMapper.writeValueAsBytes(data); } }
|
● 反序列化器 IoTEventDeserializer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| package com.itlaoqi.ex00900;
import com.fasterxml.jackson.databind.ObjectMapper; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j;
@Slf4j public class IoTEventDeserializer implements org.apache.kafka.common.serialization.Deserializer<IoTEvent> { private final ObjectMapper objectMapper = new ObjectMapper();
@Override @SneakyThrows public IoTEvent deserialize(String topic, byte[] data) { return objectMapper.readValue(data, IoTEvent.class); } }
|
消息生产者
创建一个RESTful控制器IotEventController来发送消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.itlaoqi.ex00900;
import jakarta.annotation.Resource; import lombok.SneakyThrows; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;
import java.util.Random; import java.util.concurrent.CompletableFuture;
@RestController public class IotEventController { @Resource private KafkaTemplate<String, IoTEvent> kafkaTemplate; @GetMapping("/send") @SneakyThrows public IoTEvent sendEvent() { CompletableFuture<SendResult<String, IoTEvent>> future = kafkaTemplate.send("iot-events", IoTEvent.builder().deviceId(1).deviceType("x10t").eventTimestamp(System.currentTimeMillis()).eventValue(new Random().nextInt(100)).build()); SendResult<String, IoTEvent> result = future.get(); return result.getProducerRecord().value(); } }
|
当访问/send接口时,会生成一个IoTEvent对象并发送到iot-events主题。
消息消费者
创建一个消息消费者类IotEventConsumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.itlaoqi.ex00900;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component;
@Component public class IotEventConsumer {
@KafkaListener(topics = "iot-events") public void listen(IoTEvent event, Acknowledgment acknowledgment) { System.out.println("Received message: " + event); acknowledgment.acknowledge(); } }
|
使用@KafkaListener注解监听iot-events主题,当接收到消息后,打印消息内容并手动确认消息。