Gitlib Gitlib
首页
  • 分类
  • 标签
  • 归档
  • Golang开发实践万字总结
  • MySQL核心知识汇总
  • Redis实践总结
  • MQ实践万字总结
  • Docker数据持久化总结
  • Docker网络模式深度解读
  • 常用游戏反外挂技术总结
  • 读书笔记
  • 心情杂货
  • 行业杂谈
  • 友情链接
关于我
GitHub (opens new window)

Ravior

以梦为马,莫负韶华
首页
  • 分类
  • 标签
  • 归档
  • Golang开发实践万字总结
  • MySQL核心知识汇总
  • Redis实践总结
  • MQ实践万字总结
  • Docker数据持久化总结
  • Docker网络模式深度解读
  • 常用游戏反外挂技术总结
  • 读书笔记
  • 心情杂货
  • 行业杂谈
  • 友情链接
关于我
GitHub (opens new window)
  • 基础架构

  • MQ

    • RabbitMQ

      • RabbitMQ开发环境搭建
      • RabbitMQ集群实践
      • RabbitMQ如何保证消息不丢失
      • 基于RabbitMQ消息延时队列方案
        • TTL
        • DLX
        • 死信处理过程
        • 死信队列设置
        • 定时任务
        • 示例
    • Kafka

    • MQ万字总结
  • 微服务

  • 分布式

  • 高并发

  • 大数据

  • 容器化

  • 架构设计
  • MQ
  • RabbitMQ
Ravior
2018-03-12
目录

基于RabbitMQ消息延时队列方案

RabbitMQ延迟队列,主要是借助消息的TTL(Time to Live)和死信exchange(Dead Letter Exchanges)来实现。

# TTL

TTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。

队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒。不同队列的过期时间互相之间没有影响,即使是对于同一条消息。队列中的消息存在队列中的时间超过过期时间则成为死信。

# DLX

RabbitMQ中有一种交换器叫DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列。

队列中的消息在以下三种情况下会变成死信:

  • 消息被拒绝(basic.reject 或者 basic.nack),并且requeue=false;
  • 消息的过期时间到期了;
  • 队列长度限制超过了;

当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。通过x-dead-letter-exchange设置DLX,通过这个x-dead-letter-routing-key设置消息发送到DLX所用的routing-key,如果不设置默认使用消息本身的routing-key。

死信交换器可以在程序中设置,也可以使用rabbitmqctl工具进行设置.关于死信交换器的介绍请参考RabbitMQ官网https://www.rabbitmq.com/dlx.html.

# 死信处理过程

死信队列

  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
  • 可以监听这个队列中的消息做相应的处理。

# 死信队列设置

  1. 首先需要设置死信队列的exchange和queue,然后进行绑定:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: #
#表示只要有消息到达了Exchange,那么都会路由到这个queue上
1
2
3
4
  1. 然后需要有一个监听,去监听这个队列进行处理
  2. 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可:arguments.put(" x-dead-letter-exchange","dlx.exchange");,这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!

# 定时任务

死信队列

因为队列中的消息过期后会成为死信,而死信又会被发布到该消息所在的队列的 DLX 上去,所以通过为消息设置过期时间,然后再消费该消息所在队列的 DLX 所绑定的队列,从而来达到定时处理一个任务的目的。 简单的讲就是当有一个队列 queue1,其 DLX 为 deadEx1,deadEx1 绑定了一个队列 deadQueue1,当队列 queue1 中有一条消息因过期成为死信时,就会被发布到 deadEx1 中去,通过消费队列 deadQueue1 中的消息,也就相当于消费的是 queue1 中的因过期产生的死信消息

# 示例

dlx.publisher.php:

<?php

// 死信队列测试


//配置信息 
$conn_args = array( 
    'host' => '127.0.0.1',  
    'port' => '8101',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/' 
); 

$e_name = 'test_cache_exchange'; //交换机名 
$q_name = 'test_cache_queue'; //队列名 
$k_route = 'test_cache_route'; //路由key 

$conn = new AMQPConnection($conn_args);   
if (!$conn->connect()) {   
    die("Cannot connect to the broker!\n");   
}   

// 创建信道
$channel = new AMQPChannel($conn); 

// 创建交换机    
$ex = new AMQPExchange($channel);   
$ex->setName($e_name); 
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型  
$ex->setFlags(AMQP_DURABLE); //持久化 
echo "Exchange Status:".$ex->declare()."\n";  

// 创建队列    
$q = new AMQPQueue($channel); 
$q->setName($q_name);   
$q->setFlags(AMQP_DURABLE); //持久化  
// 关键的地方
$q->setArguments(array(
        'x-dead-letter-exchange' => 'delay_exchange',
        'x-dead-letter-routing-key' => 'delay_route',
        'x-message-ttl' => 60000,
));
echo "Message Total:".$q->declare()."\n"; 

// 绑定交换机与队列,并指定路由键 
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";


date_default_timezone_set("Asia/Shanghai");
// 发送消息 
// $channel->startTransaction(); // 开始事务  
for($i=0; $i<10; ++$i){ 
    sleep(1);//休眠1秒
    //消息内容 
    $message = "TEST MESSAGE!".date("h:i:sa");   
    // echo "Send Message:".$ex->publish($message, $k_route)."\n";  
    echo "Send Message:".$ex->publish($message, $k_route, AMQP_NOPARAM, array('delivery_mode' => 2))."\n";  
    // echo $channel->waitForConfirms(),"\n";
} 
// $channel->commitTransaction(); // 提交事务 

$conn->disconnect(); 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

dlx.consumer.php:

<?php


// 死信队列测试


//配置信息 
$conn_args = array( 
    'host' => '127.0.0.1',  
    'port' => '8101',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/' 
); 

$e_name = 'delay_exchange'; //交换机名 
$q_name = 'delay_queue'; //队列名 
$k_route = 'delay_route'; //路由key 


$conn = new AMQPConnection($conn_args);   
if (!$conn->connect()) {   
    die("Cannot connect to the broker!\n");   
}   

// 创建信道
$channel = new AMQPChannel($conn); 


// 创建交换机    
$ex = new AMQPExchange($channel);   
$ex->setName($e_name); 
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型  
$ex->setFlags(AMQP_DURABLE); //持久化 
echo "Exchange Status:".$ex->declare()."\n";  


// 创建队列    
$q = new AMQPQueue($channel); 
$q->setName($q_name);   
$q->setFlags(AMQP_DURABLE); //持久化  
echo "Message Total:".$q->declare()."\n"; 


/**
 * 消费回调函数
 * 处理消息
 */ 
$processMessage = function($envelope, $queue) { 
    $msg = $envelope->getBody(); 
    echo '获取到消息';
    echo $msg."\n"; //处理消息 
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 
}; 

// 绑定交换机与队列,并指定路由键 
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";


$q->consume($processMessage);   
 
$conn->disconnect(); 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

dlx

#RabbitMQ#MQ
上次更新: 2022/12/01, 11:09:34
RabbitMQ如何保证消息不丢失
Kafka基础入门

← RabbitMQ如何保证消息不丢失 Kafka基础入门→

最近更新
01
常用游戏反外挂技术总结
11-27
02
Golang开发实践万字总结
11-11
03
Redis万字总结
10-30
更多文章>
Theme by Vdoing | Copyright © 2011-2022 Ravior | 粤ICP备17060229号-3 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式