/**
 * Declares the RabbitMQ topology used for delayed delivery: one delayed
 * exchange, one durable queue, and the binding between them.
 *
 * <p>Delayed delivery relies on the broker's {@code rabbitmq_delayed_message_exchange}
 * plugin. The plugin only honours the {@code x-delay} message header on an
 * exchange whose type is {@code x-delayed-message}; the delay is a property of
 * the exchange, not of the queue.
 *
 * <p>NOTE(review): if {@code delay_exchange} / {@code delay_queue} already exist
 * on the broker with the previous (non-delayed) definition, they must be
 * deleted first, otherwise redeclaration fails with PRECONDITION_FAILED.
 */
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE_NAME = "delay_exchange";
    private static final String QUEUE_NAME = "delay_queue";

    /**
     * Declares the exchange that holds messages until their delay expires.
     *
     * @return a direct exchange flagged as delayed, so the admin declares it
     *         with type {@code x-delayed-message} and {@code x-delayed-type=direct}
     */
    @Bean
    public DirectExchange exchange() {
        DirectExchange delayedExchange = new DirectExchange(EXCHANGE_NAME);
        // Without this flag the exchange is a plain "direct" exchange and the
        // x-delay header set by the producer is silently ignored.
        delayedExchange.setDelayed(true);
        return delayedExchange;
    }

    /**
     * Declares the durable, non-exclusive, non-auto-delete target queue.
     *
     * <p>No delay-related arguments are set here: {@code x-delayed-type} is an
     * exchange argument and has no meaning on a queue.
     *
     * @return the queue consumers listen on
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true, false, false);
    }

    /**
     * Binds the queue to the delayed exchange, using the queue name as routing key.
     *
     * @param queue    the target queue
     * @param exchange the delayed exchange
     * @return the binding between the two
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
    }

    /**
     * Builds a {@link RabbitTemplate} that serialises payloads as JSON.
     *
     * @param connectionFactory the broker connection factory
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
}

/**
 * Publishes messages to the delayed exchange with a per-message delay.
 */
@Service
public class DelayMessageProducer {

    private static final String EXCHANGE_NAME = "delay_exchange";
    private static final String QUEUE_NAME = "delay_queue";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code message} so that it is routed to the queue after the given delay.
     *
     * @param message     the payload to send
     * @param delayMillis the delay in milliseconds; must fit in an {@code int}
     * @throws IllegalArgumentException if {@code delayMillis} does not fit in an
     *                                  {@code int} (a plain cast would silently
     *                                  wrap around to an unrelated delay)
     */
    public void sendDelayedMessage(String message, long delayMillis) {
        if (delayMillis > Integer.MAX_VALUE || delayMillis < Integer.MIN_VALUE) {
            throw new IllegalArgumentException(
                    "delayMillis out of int range: " + delayMillis);
        }
        final int delay = (int) delayMillis;

        // The routing key equals the queue name, matching the binding declared
        // in RabbitMQConfig.
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, QUEUE_NAME, message, msg -> {
            // Sets the x-delay header read by the delayed-message exchange.
            msg.getMessageProperties().setDelay(delay);
            return msg;
        });
    }
}

/**
 * Consumes messages from the delay queue once the broker has released them.
 */
@Service
public class DelayMessageConsumer {

    private static final String QUEUE_NAME = "delay_queue";

    /**
     * Handles one delivered message by printing it to standard output.
     *
     * @param message the received payload
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}


上一篇:
下一篇:
切换中文