Java程序接入Kakfa

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.lixiang</groupId>
<artifactId>ex00100</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Kafka客户端库,用于与Kafka集群进行通信 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version>
</dependency>

<!-- Logback日志框架,用于记录应用程序的日志 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.10</version>
</dependency>
</dependencies>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class HelloWorldProducer {

public static void main(String[] args) {

// 创建Properties对象以存储Kafka生产者的配置属性
Properties kaProperties = new Properties();

// 设置Kafka服务器的地址
kaProperties.put("bootstrap.servers", "192.168.31.230:9092");

// 设置键的序列化器为字符串类型
kaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 设置值的序列化器为字符串类型
kaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 创建Kafka生产者并使用之前设置的配置属性
try (Producer<String, String> producer = new KafkaProducer<>(kaProperties)) {

// 创建一个ProducerRecord对象,指定主题和消息内容
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("kinaction_helloworld", "hello world again!");

// 发送消息
producer.send(producerRecord);

}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class HelloWorldConsumer {

final static Logger log = LoggerFactory.getLogger(HelloWorldConsumer.class);

private volatile boolean keepConsuming = true;

public static void main(String[] args) {
// 创建Properties对象来存储Kafka消费者的配置属性
Properties kaProperties = new Properties(); //<1>

// 设置Kafka服务器的地址
kaProperties.put("bootstrap.servers", "192.168.31.230:9092");


// 设置消费者组的ID,同一组内的消费者会竞争消费分区
kaProperties.put("group.id", "kinaction_helloconsumer");

// 启用自动提交偏移量,表示消费者会自动向Kafka报告它读取到的最后一条消息的位置
kaProperties.put("enable.auto.commit", "true");

// 设置自动提交偏移量的时间间隔,这里设置为每1000毫秒提交一次
kaProperties.put("auto.commit.interval.ms", "1000");

// 设置键的反序列化器为字符串类型
kaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 设置值的反序列化器为字符串类型
kaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


// 创建HelloWorldConsumer实例
HelloWorldConsumer helloWorldConsumer = new HelloWorldConsumer();

// 使用配置属性开始消费Kafka主题的消息
helloWorldConsumer.consume(kaProperties);

// 添加关闭钩子,当JVM关闭时,调用HelloWorldConsumer的shutdown方法进行清理
Runtime.getRuntime().addShutdownHook(new Thread(helloWorldConsumer::shutdown));
}

private void consume(Properties kaProperties) {
// 创建Kafka消费者并使用之前设置的配置属性
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kaProperties)) {

// 订阅指定的主题
consumer.subscribe(List.of("kinaction_helloworld")); //<2>

// 当继续消费标志为真时循环执行
while (keepConsuming) {

// 拉取消息,最长等待250毫秒
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(250)); //<3>

// 遍历接收到的消息记录
for (ConsumerRecord<String, String> record : records) { //<4>

// 打印消息的偏移量和值
log.info("kinaction_info offset = {}, kinaction_value = {}", record.offset(), record.value());
}
}
}

}

private void shutdown() {
keepConsuming = false;
}
}