Spring Boot开发Kafka应用

image-20221120-2

开发环境准备

● 添加依赖
在pom.xml文件中添加Spring Kafka依赖:

1
2
3
4
5
6
...
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
...

● 配置 Kafka
在application.yml文件中进行Kafka相关配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
spring:
application:
name: ex00900
server:
port: 8080
kafka:
bootstrap-servers: 192.168.31.230:29092,192.168.31.230:39092,192.168.31.230:49092,
consumer:
group-id: kafka-consumer-group
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.itlaoqi.ex00900.IoTEventDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.itlaoqi.ex00900.IoTEventSerializer
listener:
# `spring.kafka.listener.ack-mode`用于指定消费者的消息确认模式。
# Ack模式相关信息:
# 1. RECORD模式:提交时机为每条消息处理后,不可手动控制,可能导致重复消费,适用于需要逐条确认的场景。
# 2. BATCH模式:提交时机为每批消息处理后,不可手动控制,可能导致重复消费,适用于批量处理消息的场景。
# 3. TIME模式:提交时机基于时间间隔,不可手动控制,可能导致重复消费,适用于对消息处理结果要求不高的场景。
# 4. COUNT模式:提交时机基于消息数量,不可手动控制,可能导致重复消费,适用于需要按固定数量提交的场景。
# 5. COUNT_TIME模式:提交时机基于时间和数量,不可手动控制,可能导致重复消费,适用于需要兼顾时间和数量的场景。
# 6. MANUAL模式:提交时机为手动调用、同步提交,可手动控制,不会导致重复消费,适用于需要精确控制偏移量提交时机的场景。
# 7. MANUAL_IMMEDIATE模式:提交时机为手动调用、异步提交,可手动控制,不会导致重复消费,适用于需要精确控制偏移量提交时机、避免阻塞的场景。
ack-mode: manual

其中,ack-mode用于指定消费者的消息确认模式,这里选择了manual模式,即手动调用、同步提交,可手动控制偏移量提交时机,避免重复消费。

定义实体类

● 创建一个物联网终端事件实体类IoTEvent:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.itlaoqi.ex00900;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* 物联网终端事件实体类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class IoTEvent {
private Integer deviceId;
private Long eventTimestamp;
private String deviceType;
private Integer eventValue;
}

实现序列化和反序列化

● 序列化器IoTEventSerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.itlaoqi.ex00900;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class IoTEventSerializer implements org.apache.kafka.common.serialization.Serializer<IoTEvent> {
private final ObjectMapper objectMapper = new ObjectMapper();

@Override
@SneakyThrows
public byte[] serialize(String topic, IoTEvent data) {
return objectMapper.writeValueAsBytes(data);
}
}

● 反序列化器 IoTEventDeserializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.itlaoqi.ex00900;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class IoTEventDeserializer implements org.apache.kafka.common.serialization.Deserializer<IoTEvent> {
private final ObjectMapper objectMapper = new ObjectMapper();

@Override
@SneakyThrows
public IoTEvent deserialize(String topic, byte[] data) {
return objectMapper.readValue(data, IoTEvent.class);
}
}

消息生产者

创建一个RESTful控制器IotEventController来发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.itlaoqi.ex00900;

import jakarta.annotation.Resource;
import lombok.SneakyThrows;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;
import java.util.concurrent.CompletableFuture;

@RestController
public class IotEventController {
@Resource
private KafkaTemplate<String, IoTEvent> kafkaTemplate;
@GetMapping("/send")
@SneakyThrows
public IoTEvent sendEvent() {
CompletableFuture<SendResult<String, IoTEvent>> future = kafkaTemplate.send("iot-events", IoTEvent.builder().deviceId(1).deviceType("x10t").eventTimestamp(System.currentTimeMillis()).eventValue(new Random().nextInt(100)).build());
SendResult<String, IoTEvent> result = future.get();
return result.getProducerRecord().value();
}
}

当访问/send接口时,会生成一个IoTEvent对象并发送到iot-events主题。

消息消费者

创建一个消息消费者类IotEventConsumer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.itlaoqi.ex00900;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class IotEventConsumer {
/*
自动提交模式
@KafkaListener(topics = "iot-events")
public void listen(IoTEvent event) {
System.out.println("Received message: " + event);
}*/

/*手动提交模式*/
@KafkaListener(topics = "iot-events")
public void listen(IoTEvent event, Acknowledgment acknowledgment) {
System.out.println("Received message: " + event);
acknowledgment.acknowledge();
}
}

使用@KafkaListener注解监听iot-events主题,当接收到消息后,打印消息内容并手动确认消息。