Kafka生产者关键配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.lixiang.ex00200</groupId>
<artifactId>ex00200</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Kafka客户端库,用于与Kafka集群进行通信 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version>
</dependency>

<!-- Logback日志框架,用于记录应用程序的日志 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.10</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.0</version>
</dependency>
</dependencies>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class Order {
private String orderId;
private long timestamp;
private String product;
private int quantity;
private double price;

// 无参构造函数
public Order() {}

// 全参构造函数
public Order(String orderId, long timestamp, String product, int quantity, double price) {
this.orderId = orderId;
this.timestamp = timestamp;
this.product = product;
this.quantity = quantity;
this.price = price;
}

// Getter 和 Setter 方法
public String getOrderId() {
return orderId;
}

public void setOrderId(String orderId) {
this.orderId = orderId;
}

public long getTimestamp() {
return timestamp;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}

public String getProduct() {
return product;
}

public void setProduct(String product) {
this.product = product;
}

public int getQuantity() {
return quantity;
}

public void setQuantity(int quantity) {
this.quantity = quantity;
}

public double getPrice() {
return price;
}

public void setPrice(double price) {
this.price = price;
}

// toString 方法,方便打印对象信息
@Override
public String toString() {
return "Order{" +
"orderId='" + orderId + '\'' +
", timestamp=" + timestamp +
", product='" + product + '\'' +
", quantity=" + quantity +
", price=" + price +
'}';
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class Order {
private String orderId;
private long timestamp;
private String product;
private int quantity;
private double price;

// 无参构造函数
public Order() {}

// 全参构造函数
public Order(String orderId, long timestamp, String product, int quantity, double price) {
this.orderId = orderId;
this.timestamp = timestamp;
this.product = product;
this.quantity = quantity;
this.price = price;
}

// Getter 和 Setter 方法
public String getOrderId() {
return orderId;
}

public void setOrderId(String orderId) {
this.orderId = orderId;
}

public long getTimestamp() {
return timestamp;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}

public String getProduct() {
return product;
}

public void setProduct(String product) {
this.product = product;
}

public int getQuantity() {
return quantity;
}

public void setQuantity(int quantity) {
this.quantity = quantity;
}

public double getPrice() {
return price;
}

public void setPrice(double price) {
this.price = price;
}

// toString 方法,方便打印对象信息
@Override
public String toString() {
return "Order{" +
"orderId='" + orderId + '\'' +
", timestamp=" + timestamp +
", product='" + product + '\'' +
", quantity=" + quantity +
", price=" + price +
'}';
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class OrderProducer {
private static final Logger log = LoggerFactory.getLogger(OrderProducer.class);

public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties kaProperties = new Properties();
//它用于指定初始连接的服务器地址列表。
kaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092"); // 指定Kafka集群地址
kaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置键序列化类
kaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); // 设置值序列化类
/*
acks配置掌控着生产者消息发送的确认策略。
acks = 0时,生产者发完即走,速度最快但可能丢消息;
acks = 1,会等主副本确认,有一定可靠性;
acks = -1/all则需所有同步副本确认,可靠性高但性能损耗大。
*/
kaProperties.put(ProducerConfig.ACKS_CONFIG, "all"); // 配置消息确认模式为所有副本确认
kaProperties.put(ProducerConfig.RETRIES_CONFIG, "3"); // 设置消息发送失败重试次数
/*
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION是 Kafka 生产者配置参数。
它用于限制每个连接中可以同时发送但尚未收到响应的请求数量。
如果设置为 1,能保证消息按顺序写入分区。
在启用幂等性或事务时,这个值强制设为 1,有助于确保消息的顺序和事务的一致性。
如果大于 1,可能出现乱序写入,但能提升吞吐量。
*/
kaProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // 限制每个连接的未完成请求数量

try (Producer<String, Order> producer = new KafkaProducer<>(kaProperties)) {
Order order = new Order("orders", System.currentTimeMillis(), "Laptop", 1, 999.99);
/*
ProducerRecord 是 Kafka 生产者发送消息的重要组件。
它用于指定消息发送的目标主题,这决定了消息的基本流向。
可以设定分区,能精准控制消息存储分区,若不指定,Kafka 会按策略分配。
其中的键能够用于分区分配,确保相关消息在同一分区。
时间戳记录消息产生时间,方便后续时间相关处理。
*/
ProducerRecord<String, Order> producerRecord = new ProducerRecord<>("orders", null, order);
// 发送ProducerRecord对象,并同步获取发送结果
RecordMetadata result = producer.send(producerRecord).get();
log.info("offset = {}, topic = {}, timestamp = {}", result.offset(), result.topic(), result.timestamp());
} catch (Exception e) {
log.error("Error sending message", e);
}
}


public static class JsonSerializer implements org.apache.kafka.common.serialization.Serializer<Order> {
private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public byte[] serialize(String topic, Order data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
log.error("Error serializing JSON message", e);
throw new RuntimeException("Error serializing JSON message", e);
}
}
}
}