1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; import java.util.concurrent.ExecutionException;
public class OrderProducer { private static final Logger log = LoggerFactory.getLogger(OrderProducer.class);
public static void main(String[] args) throws InterruptedException, ExecutionException { Properties kaProperties = new Properties(); kaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092"); kaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); kaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
kaProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName()); kaProperties.put(ProducerConfig.ACKS_CONFIG, "all"); kaProperties.put(ProducerConfig.RETRIES_CONFIG, "3"); kaProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
try (Producer<String, Order> producer = new KafkaProducer<>(kaProperties)) { for(int i = 0; i < 10; i++) { Order order = new Order("o" + i, System.currentTimeMillis(), "Laptop", 1, 999.99);
ProducerRecord<String, Order> producerRecord = new ProducerRecord<>("porders", null, order); RecordMetadata result = producer.send(producerRecord).get(); log.info("topic={}, partition={}, offset = {}, timestamp = {}", result.topic(), result.partition(), result.offset(), result.timestamp()); } } catch (Exception e) { log.error("Error sending message", e); } }
public static class JsonSerializer implements org.apache.kafka.common.serialization.Serializer<Order> { private final ObjectMapper objectMapper = new ObjectMapper();
@Override public byte[] serialize(String topic, Order data) { try { return objectMapper.writeValueAsBytes(data); } catch (Exception e) { log.error("Error serializing JSON message", e); throw new RuntimeException("Error serializing JSON message", e); } } } }
|