@Configuration
public class RabbitMQConfig {
private static final String EXCHANGE_NAME = "delay_exchange";
private static final String QUEUE_NAME = "delay_queue";
@Bean
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
public Queue queue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
return new Queue(QUEUE_NAME, true, false, false, arguments);
}
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
}
@Service
public class DelayMessageProducer {
private static final String EXCHANGE_NAME = "delay_exchange";
private static final String QUEUE_NAME = "delay_queue";
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayedMessage(String message, long delayMillis) {
rabbitTemplate.convertAndSend(EXCHANGE_NAME, QUEUE_NAME, message, messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelay((int) delayMillis);
return messagePostProcessor;
});
}
}
@Service
public class DelayMessageConsumer {
private static final String QUEUE_NAME = "delay_queue";
@RabbitListener(queues = QUEUE_NAME)
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}