生产者同步与异步发送消息

一、同步发送

基本原理
当生产者使用同步方式发送消息时,它会等待Kafka集群的响应,直到消息被成功写入目标分区(topic中的分区)或者发送过程中出现不可恢复的错误。在代码实现中,通常会使用send()方法发送消息,并且通过get()方法获取发送结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class OrderSyncProducer {
private static final Logger log = LoggerFactory.getLogger(OrderSyncProducer.class);

public static void main(String[] args) {
...

try (Producer<String, Order> producer = new KafkaProducer<>(kaProperties)) {
Order order = new Order("O-8817-88172", System.currentTimeMillis(), "Laptop", 1, 999.99);
ProducerRecord<String, Order> producerRecord = new ProducerRecord<>("orders", order.getOrderId(), order);
RecordMetadata result = producer.send(producerRecord).get();
log.info("topic={}, partition={}, offset = {}, timestamp = {}", result.topic(), result.partition(), result.offset(), result.timestamp());

} catch (Exception e) {
log.error("Error sending message", e);
}
}
}

优点
● 可靠性高:能够确保消息被成功写入Kafka集群后才继续执行后续操作。这对于一些对数据准确性要求极高的场景非常重要,比如金融交易数据的发送,每一笔交易消息都必须被正确记录到Kafka中,否则可能会导致严重的后果。
● 顺序性保证:在单线程环境下,同步发送可以很好地保证消息发送的顺序与发送代码中定义的顺序一致。例如,如果有一系列按时间顺序生成的订单消息,同步发送可以确保这些订单消息按照生成的顺序依次写入Kafka分区。
缺点
● 性能较低:因为每发送一条消息都要等待响应,所以生产者的吞吐量会受到限制。尤其是在网络延迟较高或者Kafka集群负载较大的情况下,等待响应的时间会很长,导致整体的消息发送效率低下。
● 容易造成阻塞:如果Kafka集群出现问题或者网络故障,同步发送可能会导致生产者线程长时间阻塞,进而影响整个应用程序的性能和稳定性。

二、异步发送

基本原理
异步发送允许生产者在发送消息后不需要等待Kafka集群的响应就可以继续执行其他操作。生产者将消息发送到一个内部的缓冲队列中,然后由后台线程负责从缓冲队列中取出消息并发送到Kafka集群。当消息成功发送或者发送失败时,可以通过回调函数(callback)来处理相应的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class OrderAsyncProducer {
private static final Logger log = LoggerFactory.getLogger(OrderAsyncProducer.class);

public static void main(String[] args) {
...
try (Producer<String, Order> producer = new KafkaProducer<>(kaProperties)) {
Order order = new Order("O-8817-88172", System.currentTimeMillis(), "Laptop", 1, 999.99);
ProducerRecord<String, Order> producerRecord = new ProducerRecord<>("orders", order.getOrderId(), order);

producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
log.info("topic={}, partition={}, offset = {}, timestamp = {}", metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp());
} else {
log.error("Error sending message", exception);
}
}
});
} catch (Exception e) {
log.error("Error sending message", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

优点
● 高性能:由于不需要等待每条消息的发送响应,生产者可以持续地将消息放入缓冲队列,大大提高了消息发送的吞吐量。这在需要处理大量消息的场景中非常有用,比如日志收集系统,每秒可能会产生数以万计的日志消息,异步发送可以快速地将这些消息发送到Kafka集群。
● 非阻塞:不会因为等待Kafka集群的响应而阻塞生产者线程,使得生产者能够同时处理其他任务,如消息的生成、数据的处理等,提高了整个应用程序的响应能力。
缺点
● 可靠性相对较低:虽然通过回调函数可以处理发送结果,但是如果在消息还未发送完成(在缓冲队列中或者正在发送途中)时,生产者进程意外终止,可能会导致部分消息丢失。