1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; import java.util.concurrent.ExecutionException;
public class OrderProducer { private static final Logger log = LoggerFactory.getLogger(OrderProducer.class);
public static void main(String[] args) throws InterruptedException, ExecutionException { Properties kaProperties = new Properties(); kaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092"); kaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); kaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
kaProperties.put(ProducerConfig.ACKS_CONFIG, "all"); kaProperties.put(ProducerConfig.RETRIES_CONFIG, "3");
kaProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
try (Producer<String, Order> producer = new KafkaProducer<>(kaProperties)) { Order order = new Order("orders", System.currentTimeMillis(), "Laptop", 1, 999.99);
ProducerRecord<String, Order> producerRecord = new ProducerRecord<>("orders", null, order); RecordMetadata result = producer.send(producerRecord).get(); log.info("offset = {}, topic = {}, timestamp = {}", result.offset(), result.topic(), result.timestamp()); } catch (Exception e) { log.error("Error sending message", e); } }
public static class JsonSerializer implements org.apache.kafka.common.serialization.Serializer<Order> { private final ObjectMapper objectMapper = new ObjectMapper();
@Override public byte[] serialize(String topic, Order data) { try { return objectMapper.writeValueAsBytes(data); } catch (Exception e) { log.error("Error serializing JSON message", e); throw new RuntimeException("Error serializing JSON message", e); } } } }
|