这里介绍关于RabbitMQ下有关延迟交换机的用途,若何使用、如何安装等。
延迟交换机是RabbitMQ的一个插件,通过这个插件,用户可以声明一个带x-delayed-message类型的交换机,这歌交换机可以发布一类带有x-delay 自定义头的消息,这类消息会在定义的毫秒数后在把消息从交换机发布到队列,也就是区别于传统消息,延迟交换机中的消息,不会立即发送给交换机规则对应的队列,而是在x-delay 所对应的毫秒数到后才发送给消息队列,因此被称为延迟交换机。
在一些需要延迟发送的消息的场景,比如定时发送邮件或者短信给用户,如果用户订阅一个抽奖活动开始短信通知,在活动开始前2天、1天、2小时通知该用户,由于活动开始的时间是不一定的,传统定时器轮训的方式未必准备而且当有大量用户和数据的情况下难以保证性能,因此可以采用延迟交换机去实现,首先在创建活动的时候,先计算需要发信息的是在具体的创建时间后的毫秒数到延迟交换机,等待然后消费者监听对应的队列,在收到消息后查询需要发送的用户去发送消短信或者推送通知就可以了。
基本要求: rabbitmq 3.8以上 Erlang 23.2以上,这里以RabbitMq 3.8.17作为目标
系统: Linux
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange 下的release选择版本
具体步骤:
rabbitmq-plugins directories #列出插件的目录
结果如下:
Listing plugin directories used by node rabbit@c0e11fd16694
Plugin archives directory: /opt/rabbitmq/plugins
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@c0e11fd16694-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins
所以可以将对应版本下载到/opt/rabbitmq/plugins 或者/var/lib/rabbitmq/mnesia/rabbit@c0e11fd16694-plugins-expand下,为ez文件
然后启用就可以了
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
创建和发送
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
byte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8");
Map<String, Object> headers2 = new HashMap<String, Object>();
headers2.put("x-delay", 1000);
AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);
channel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2);
本文为Lokie.Wang原创文章,转载无需和我联系,但请注明来自lokie博客http://lokie.wang