1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
| package com.lixiang.rabbitmq;
import java.time.LocalDate; import java.util.Random;
import com.lixiang.rabbitmq.utils.RabbitConstant; import com.lixiang.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j;
@Slf4j public class WeatherProducer { static Random random = new Random(); static String[] cities = { "Beijing", "New York", "Tokyo", "Paris", "London" }; static String[] countries = { "China", "USA", "Japan", "France", "UK" }; static String[] weathers = { "Sunny", "Rainy", "Cloudy", "Snowy" };
private static String createJSON(int idx) { String weather = weathers[random.nextInt(weathers.length)]; int maxTemp, minTemp; switch (weather) { case "Sunny": maxTemp = random.nextInt(11) + 25; minTemp = maxTemp - 10; break; case "Rainy": maxTemp = random.nextInt(6) + 20; minTemp = random.nextInt(10) + 10; break; case "Cloudy": maxTemp = random.nextInt(9) + 20; minTemp = random.nextInt(8) + 15; break; default: maxTemp = random.nextInt(6) - 5; minTemp = random.nextInt(6) - 10; }
String json = String.format( "{\"date\":\"%s\", \"country\":\"%s\", \"city\":\"%s\", \"weather\":\"%s\", " + "\"minTemp\":%d, \"maxTemp\":%d}", LocalDate.now(), countries[idx], cities[idx], weather, minTemp, maxTemp); return json; }
@SneakyThrows public static void main(String[] args) { try (Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel();) {
channel.confirmSelect(); channel.addConfirmListener((deliveryTag, multiple) -> { log.info("消息确认成功,编号:{},是否批量:{}", deliveryTag, multiple); }, (deliveryTag, multiple) -> { log.error("消息确认失败,编号:{},是否批量:{}", deliveryTag, multiple); });
channel.addReturnListener((returnMessage) -> { log.warn("消息无法路由! 应答码:{} 原因:{} 路由键:{}", returnMessage.getReplyCode(), returnMessage.getReplyText(), returnMessage.getRoutingKey()); });
channel.exchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, "topic", true); channel.queueDeclare(RabbitConstant.QUEUE_BEIJING, true, false, false, null); channel.queueBind(RabbitConstant.QUEUE_BEIJING, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "city.Beijing.#"); channel.queueDeclare(RabbitConstant.QUEUE_PARIS, true, false, false, null); channel.queueBind(RabbitConstant.QUEUE_PARIS, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "city.Paris.*");
while (true) { int idx = random.nextInt(countries.length); String routingKey = "city." + cities[idx] + ".weather"; String json = createJSON(idx); log.info("路由键:{},天气信息:{}", routingKey, json);
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, routingKey,
true, null, json.getBytes()); Thread.sleep(1000); } } } }
|