基于RabbitMQ消息延时队列方案

RabbitMQ延迟队列,主要是借助消息的TTL(Time to Live)和死信exchange(Dead Letter Exchanges)来实现。

TTL

TTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。

队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒。不同队列的过期时间互相之间没有影响,即使是对于同一条消息。队列中的消息存在队列中的时间超过过期时间则成为死信。

DLX

RabbitMQ中有一种交换器叫DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列。

队列中的消息在以下三种情况下会变成死信:

  • 消息被拒绝(basic.reject 或者 basic.nack),并且requeue=false;
  • 消息的过期时间到期了;
  • 队列长度限制超过了;

当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。通过x-dead-letter-exchange设置DLX,通过这个x-dead-letter-routing-key设置消息发送到DLX所用的routing-key,如果不设置默认使用消息本身的routing-key。

死信交换器可以在程序中设置,也可以使用rabbitmqctl工具进行设置.关于死信交换器的介绍请参考RabbitMQ官网https://www.rabbitmq.com/dlx.html.

死信处理过程

死信队列

  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
  • 可以监听这个队列中的消息做相应的处理。

死信队列设置

  1. 首先需要设置死信队列的exchange和queue,然后进行绑定:
1
2
3
4
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: #
#表示只要有消息到达了Exchange,那么都会路由到这个queue上
  1. 然后需要有一个监听,去监听这个队列进行处理
  2. 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可:arguments.put(" x-dead-letter-exchange","dlx.exchange");,这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!

定时任务

死信队列

因为队列中的消息过期后会成为死信,而死信又会被发布到该消息所在的队列的 DLX 上去,所以通过为消息设置过期时间,然后再消费该消息所在队列的 DLX 所绑定的队列,从而来达到定时处理一个任务的目的。 简单的讲就是当有一个队列 queue1,其 DLX 为 deadEx1,deadEx1 绑定了一个队列 deadQueue1,当队列 queue1 中有一条消息因过期成为死信时,就会被发布到 deadEx1 中去,通过消费队列 deadQueue1 中的消息,也就相当于消费的是 queue1 中的因过期产生的死信消息

示例

dlx.publisher.php:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<?php

// 死信队列测试


//配置信息
$conn_args = array(
'host' => '127.0.0.1',
'port' => '8101',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);

$e_name = 'test_cache_exchange'; //交换机名
$q_name = 'test_cache_queue'; //队列名
$k_route = 'test_cache_route'; //路由key

$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}

// 创建信道
$channel = new AMQPChannel($conn);

// 创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declare()."\n";

// 创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
// 关键的地方
$q->setArguments(array(
'x-dead-letter-exchange' => 'delay_exchange',
'x-dead-letter-routing-key' => 'delay_route',
'x-message-ttl' => 60000,
));
echo "Message Total:".$q->declare()."\n";

// 绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";


date_default_timezone_set("Asia/Shanghai");
// 发送消息
// $channel->startTransaction(); // 开始事务
for($i=0; $i<10; ++$i){
sleep(1);//休眠1秒
//消息内容
$message = "TEST MESSAGE!".date("h:i:sa");
// echo "Send Message:".$ex->publish($message, $k_route)."\n";
echo "Send Message:".$ex->publish($message, $k_route, AMQP_NOPARAM, array('delivery_mode' => 2))."\n";
// echo $channel->waitForConfirms(),"\n";
}
// $channel->commitTransaction(); // 提交事务

$conn->disconnect();

dlx.consumer.php:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<?php


// 死信队列测试


//配置信息
$conn_args = array(
'host' => '127.0.0.1',
'port' => '8101',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);

$e_name = 'delay_exchange'; //交换机名
$q_name = 'delay_queue'; //队列名
$k_route = 'delay_route'; //路由key


$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}

// 创建信道
$channel = new AMQPChannel($conn);


// 创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declare()."\n";


// 创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo "Message Total:".$q->declare()."\n";


/**
* 消费回调函数
* 处理消息
*/
$processMessage = function($envelope, $queue) {
$msg = $envelope->getBody();
echo '获取到消息';
echo $msg."\n"; //处理消息
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
};

// 绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";


$q->consume($processMessage);

$conn->disconnect();

dlx

有用就打赏一下作者吧!