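Simple Queue Service: HTTPSQS

HTTPSQS was written by Zhang Yan (张宴), a well-known Chinese software engineer, and was once widely used at Kingsoft. It is typically used to decouple work asynchronously, for example when sending SMS messages or email.

HTTPSQS (HTTP Simple Queue Service) is a lightweight, open-source simple message queue service based on HTTP GET/POST. It uses the Tokyo Cabinet B+Tree key/value database for persistent storage.

HTTPSQS has the following features:

  • Very simple: it is based on HTTP GET/POST, so any programming language with HTTP support can call it
  • Very fast: enqueue and dequeue rates exceed 10,000 operations per second
  • Supports multiple queues
  • A single queue can hold up to 1 billion messages
  • Low memory use with large data volumes: storing tens of GB of data needs less than 100 MB of physical memory as a buffer
  • The maximum length of a single queue can be changed without stopping the service
  • Queue status can be viewed in real time (put position, get position, number of unread messages, maximum queue length)
  • Very lightweight: the source code is under 800 lines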


Installation

Installing libevent

libevent website: http://libevent.org/

# Download the source tarball
wget https://github.com/libevent/libevent/releases/download/release-2.1.10-stable/libevent-2.1.10-stable.tar.gz
# Extract it
tar zxvf libevent-2.1.10-stable.tar.gz
cd libevent-2.1.10-stable
# Configure the build
./configure --prefix=/usr/local/libevent/
# Build
make
# Install
sudo make install

Installing Tokyo Cabinet

Tokyo Cabinet website: https://fallabs.com/tokyocabinet/

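# Download the source tarball
wget https://fallabs.com/tokyocabinet/tokyocabinet-1.4.48.tar.gz
# Extract it (the directory name includes the version number)
tar zxvf tokyocabinet-1.4.48.tar.gz
cd tokyocabinet-1.4.48
# Configure the build
./configure --prefix=/usr/local/tokyocabinet/
# Build
make
# Install
sudo make install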

Installing httpsqs

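# Extract the source tarball (download httpsqs-1.7.tar.gz beforehand)
tar zxvf httpsqs-1.7.tar.gz
cd httpsqs-1.7
# Point the libevent and tokyocabinet paths at the install prefixes used above
vim Makefile
# Build
make
# Install
sudo make install # installs the binary into /usr/bin

# I prefer to keep installed software under /usr/local
sudo mkdir -p /usr/local/httpsqs/bin/
sudo mkdir -p /usr/local/httpsqs/tmp/
sudo mv /usr/bin/httpsqs /usr/local/httpsqs/bin/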
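
  • Start httpsqs: /usr/local/httpsqs/bin/httpsqs -d -p <port> -x <data directory>
  • Stop httpsqs: kill the httpsqs process with a plain kill (SIGTERM) rather than kill -9, so that it can flush buffered data to disk before exiting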

HTTPSQS management script

I prefer to manage services with commands in the style of service start/stop. Here is an httpsqs management script:

#!/bin/bash
# Quick start/stop script for httpsqs
#
EXEC=/usr/local/httpsqs/bin/httpsqs
PORT=1218
PIDFILE=/usr/local/httpsqs/tmp/httpsqs.pid
DATADIR=/usr/local/httpsqs/data

case "$1" in
    start)
        if [ -f "$PIDFILE" ]
        then
            echo "$PIDFILE exists, process is already running or crashed"
        else
            echo "Starting httpsqs ..."
            # Raise the soft and hard open-file limits before starting
            ulimit -SHn 65535
            "$EXEC" -d -p "$PORT" -x "$DATADIR" -i "$PIDFILE"
        fi
        ;;
    stop)
        if [ ! -f "$PIDFILE" ]
        then
            echo "$PIDFILE does not exist, process is not running"
        else
            PID=$(cat "$PIDFILE")
            kill "$PID"
            # Wait until the process entry disappears from /proc
            while [ -x "/proc/${PID}" ]
            do
                echo "Waiting for httpsqs to shutdown ..."
                sleep 1
            done
            echo "httpsqs stopped"
        fi
        ;;
    *)
        echo "Please use start or stop as first argument"
        ;;
esac
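
The start branch above treats any existing PID file as "already running or crashed" without telling the two cases apart. A minimal sketch of one way to distinguish them, assuming the PID file contains a single numeric PID; pid_is_running is a hypothetical helper name, not something httpsqs provides:

```shell
# Hypothetical helper: succeeds only if the PID stored in the given file
# belongs to a live process. kill -0 delivers no signal; it only checks that
# the process exists and that the caller is allowed to signal it.
pid_is_running() {
    local pidfile="$1" pid
    [ -f "$pidfile" ] || return 1
    pid=$(cat "$pidfile")
    # Reject empty or non-numeric file contents
    case "$pid" in
        ''|*[!0-9]*) return 1 ;;
    esac
    kill -0 "$pid" 2>/dev/null
}
```

In the start branch this could replace the plain file test: if pid_is_running "$PIDFILE" succeeds, report that httpsqs is already running; otherwise the PID file is stale and can be removed before starting. Note that kill -0 also fails when the process belongs to another user, so the script should run as the same user as httpsqs or as root.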

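Using HTTPSQS

Enqueue

Enqueue: put a text message into a queue

Using curl as an example:

curl "http://host:port/?name=your_queue_name&opt=put&data=url_encoded_text_message&auth=mypass123"

Response values:

  • HTTPSQS_PUT_OK: the message was enqueued
  • HTTPSQS_PUT_ERROR: the enqueue failed
  • HTTPSQS_PUT_END: the queue is full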
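
Since the message has to be URL-encoded and the three responses call for different handling, a small wrapper can help. The following is a sketch rather than a definitive implementation: the variables HTTPSQS_URL and HTTPSQS_AUTH, their default values, and the function names put_status and put_message are assumptions made for illustration. It relies on curl's -G and --data-urlencode options to do the encoding.

```shell
# Assumed settings for this sketch (not defined by HTTPSQS itself)
HTTPSQS_URL="${HTTPSQS_URL:-http://127.0.0.1:1218/}"
HTTPSQS_AUTH="${HTTPSQS_AUTH:-mypass123}"

# Map a put response body to an exit status:
# 0 = enqueued, 2 = queue full, 1 = error or anything unexpected.
put_status() {
    case "$1" in
        HTTPSQS_PUT_OK)  return 0 ;;
        HTTPSQS_PUT_END) return 2 ;;
        *)               return 1 ;;
    esac
}

# Enqueue message "$2" on queue "$1". With -G, curl appends all data
# parameters to the URL as a query string; --data-urlencode encodes the
# message so that spaces, "&" and non-ASCII text survive the trip.
put_message() {
    local body
    body=$(curl -s -G "$HTTPSQS_URL" \
        -d "name=$1" -d "opt=put" -d "auth=$HTTPSQS_AUTH" \
        --data-urlencode "data=$2") || return 1
    put_status "$body"
}
```

A caller can then branch on the exit status, for example retrying later when put_message returns 2 (queue full).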

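Dequeue

Dequeue: take a text message out of a queue

Using curl as an example:

curl "http://host:port/?charset=utf-8&name=your_queue_name&opt=get&auth=mypass123"

The message content is returned to the client. If the queue has no unread messages, the response is: HTTPSQS_GET_END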
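
A consumer usually repeats this request until HTTPSQS_GET_END comes back. Here is a minimal sketch of such a loop. The variables HTTPSQS_URL and HTTPSQS_AUTH and the functions get_message and drain_queue are hypothetical names chosen for this example, and treating every other HTTPSQS_* response as an error is an assumption of the sketch.

```shell
# Assumed settings for this sketch (not defined by HTTPSQS itself)
HTTPSQS_URL="${HTTPSQS_URL:-http://127.0.0.1:1218/}"
HTTPSQS_AUTH="${HTTPSQS_AUTH:-mypass123}"

# Fetch one message from queue "$1" and print the response body
get_message() {
    curl -s -G "$HTTPSQS_URL" \
        -d "charset=utf-8" -d "name=$1" -d "opt=get" -d "auth=$HTTPSQS_AUTH"
}

# Pass every unread message of queue "$1" to the handler command "$2".
# Returns 0 once the queue is empty, 1 on any failure.
drain_queue() {
    local queue="$1" handler="$2" msg
    while msg=$(get_message "$queue"); do
        case "$msg" in
            ''|HTTPSQS_GET_END) return 0 ;;  # nothing left to read
            HTTPSQS_*)          return 1 ;;  # assumed to be a status token, not a message
        esac
        "$handler" "$msg" || return 1
    done
    return 1  # curl itself failed
}
```

Two caveats apply: command substitution strips trailing newlines from each message, and a message that has been dequeued is gone from the queue even if the handler then fails, so a handler that must not lose data should store the message somewhere durable first.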

Viewing queue status

Using curl as an example:

curl "http://host:port/?name=your_queue_name&opt=status_json&auth=mypass123"

Example response:

{"name":"xoyo","maxqueue":1000000,"putpos":45,"putlap":1,"getpos":6,"getlap":1,"unread":39}
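
For monitoring it is often enough to pull a single number, such as unread, out of this response. The sketch below does that with sed, assuming the response is a single line with integer values formatted like the sample above; status_field is a hypothetical helper name. Where jq is available, it is the more robust choice for parsing JSON.

```shell
# Print the integer value of field "$2" from a one-line status_json
# response "$1". The field name is inserted into a sed pattern, so it
# should be a plain word such as unread or maxqueue.
status_field() {
    printf '%s\n' "$1" | sed -n "s/.*\"$2\":\([0-9][0-9]*\).*/\1/p"
}

# Sample response taken from the text above
json='{"name":"xoyo","maxqueue":1000000,"putpos":45,"putlap":1,"getpos":6,"getlap":1,"unread":39}'
status_field "$json" unread     # prints 39
status_field "$json" maxqueue   # prints 1000000
```

In this sample, unread (39) equals putpos minus getpos (45 - 6), since both positions are in the same lap.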

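HTTPSQS client

Because HTTPSQS communicates over HTTP, you can write your own library against its interface. This post shares an HTTPSQS client class for the PHP CodeIgniter (CI) framework: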

<?php if (!defined('BASEPATH')) exit('No direct script access allowed');
/*
----------------------------------------------------------------------------------------------------------------
HTTP Simple Queue Service - httpsqs client class for PHP v1.7.1

Author: Zhang Yan (http://blog.s135.com), E-mail: net@s135.com
This is free software, and you are welcome to modify and redistribute it under the New BSD License
----------------------------------------------------------------------------------------------------------------
Usage (inside a CodeIgniter controller or model; any key left out falls back to the CI config item of the same name, e.g. httpsqs_host):
<?php
$this->load->library('httpsqs', array('host' => $httpsqs_host, 'port' => $httpsqs_port, 'auth' => $httpsqs_auth, 'charset' => $httpsqs_charset));
$httpsqs = $this->httpsqs;
$result = $httpsqs->put($queue_name, $queue_data); //1. PUT text message into a queue. If PUT successful, return boolean: true. If an error occurs, return boolean: false. If queue full, return text: HTTPSQS_PUT_END
$result = $httpsqs->get($queue_name); //2. GET text message from a queue. Return the queue contents. If an error occurs, return boolean: false. If there is no unread queue message, return text: HTTPSQS_GET_END
$result = $httpsqs->gets($queue_name); //3. GET text message and pos from a queue. Return example: array("pos" => 7, "data" => "text message"). If an error occurs, return boolean: false. If there is no unread queue message, return: array("pos" => 0, "data" => "HTTPSQS_GET_END")
$result = $httpsqs->status($queue_name); //4. View queue status
$result = $httpsqs->status_json($queue_name); //5. View queue status in json. Return example: {"name":"queue_name","maxqueue":5000000,"putpos":130,"putlap":1,"getpos":120,"getlap":1,"unread":10}
$result = $httpsqs->view($queue_name, $queue_pos); //6. View the contents of the specified queue pos (id). Return the contents of the specified queue pos.
$result = $httpsqs->reset($queue_name); //7. Reset the queue. If reset successful, return boolean: true. If an error occurs, return boolean: false
$result = $httpsqs->maxqueue($queue_name, $num); //8. Change the maximum length of a queue. If the change succeeds, return boolean: true. If it is cancelled, return boolean: false
$result = $httpsqs->synctime($num); //9. Change the interval for syncing updated contents to disk. If the change succeeds, return boolean: true. If it is cancelled, return boolean: false
?>
----------------------------------------------------------------------------------------------------------------
*/

class Httpsqs
{

    public $httpsqs_host;
    public $httpsqs_port;
    public $httpsqs_auth;
    public $httpsqs_charset;
    protected $_ci;

    public function __construct($params = array())
    {
        $this->_ci =& get_instance();

        // Explicit parameters take precedence; otherwise fall back to the CI config items
        $this->httpsqs_host = isset($params['host']) ? $params['host'] : $this->_ci->config->item('httpsqs_host');
        $this->httpsqs_port = isset($params['port']) ? $params['port'] : $this->_ci->config->item('httpsqs_port');
        $this->httpsqs_auth = isset($params['auth']) ? $params['auth'] : $this->_ci->config->item('httpsqs_auth');
        $this->httpsqs_charset = isset($params['charset']) ? $params['charset'] : $this->_ci->config->item('httpsqs_charset');
    }

    // Send a GET request; return array("pos" => int, "data" => string), or false on failure
    public function http_get($query)
    {
        $socket = fsockopen($this->httpsqs_host, $this->httpsqs_port, $errno, $errstr, 5);
        if (!$socket) {
            return false;
        }
        $pos_value = 0;
        $host = $this->httpsqs_host;
        $out = "GET {$query} HTTP/1.1\r\n";
        $out .= "Host: {$host}\r\n";
        $out .= "Connection: close\r\n";
        $out .= "\r\n";
        fwrite($socket, $out);

        // Skip the status line, then read the headers up to the blank line
        fgets($socket);
        $len = -1;
        while (($line = trim(fgets($socket))) != "") {
            if (strstr($line, "Content-Length:")) {
                list($cl, $len) = explode(" ", $line);
                $len = (int)$len;
            }
            if (strstr($line, "Pos:")) {
                list($pos_key, $pos_value) = explode(" ", $line);
            }
        }
        if ($len < 0) {
            fclose($socket);
            return false;
        }

        // fread() may return fewer bytes than requested, so keep reading
        // (with a bounded number of retries) until Content-Length bytes arrive
        $body = fread($socket, $len);
        $fread_times = 0;
        while (strlen($body) < $len) {
            $body .= fread($socket, $len - strlen($body));
            if ($fread_times > 100) {
                break;
            }
            $fread_times++;
        }
        fclose($socket);
        $result_array = array();
        $result_array["pos"] = (int)$pos_value;
        $result_array["data"] = $body;
        return $result_array;
    }

    // Send a POST request with $body; return array("pos" => int, "data" => string), or false on failure
    public function http_post($query, $body)
    {
        $socket = fsockopen($this->httpsqs_host, $this->httpsqs_port, $errno, $errstr, 1);
        if (!$socket) {
            return false;
        }
        $pos_value = 0;
        $host = $this->httpsqs_host;
        $out = "POST {$query} HTTP/1.1\r\n";
        $out .= "Host: {$host}\r\n";
        $out .= "Content-Length: " . strlen($body) . "\r\n";
        $out .= "Connection: close\r\n";
        $out .= "\r\n";
        $out .= $body;
        fwrite($socket, $out);

        // Skip the status line, then read the headers up to the blank line
        fgets($socket);
        $len = -1;
        while (($line = trim(fgets($socket))) != "") {
            if (strstr($line, "Content-Length:")) {
                list($cl, $len) = explode(" ", $line);
                $len = (int)$len;
            }
            if (strstr($line, "Pos:")) {
                list($pos_key, $pos_value) = explode(" ", $line);
            }
        }
        if ($len < 0) {
            fclose($socket);
            return false;
        }
        $body = @fread($socket, $len);
        fclose($socket);
        $result_array = array();
        $result_array["pos"] = (int)$pos_value;
        $result_array["data"] = $body;
        return $result_array;
    }

    public function put($queue_name, $queue_data)
    {
        $result = $this->http_post("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=put", $queue_data);
        if ($result === false) {
            return false;
        }
        if ($result["data"] == "HTTPSQS_PUT_OK") {
            return true;
        } else if ($result["data"] == "HTTPSQS_PUT_END") {
            return $result["data"];
        }
        return false;
    }

    public function get($queue_name)
    {
        $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=get");
        if ($result == false || $result["data"] == "HTTPSQS_ERROR" || $result["data"] == false) {
            return false;
        }
        return $result["data"];
    }

    public function gets($queue_name)
    {
        $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=get");
        if ($result == false || $result["data"] == "HTTPSQS_ERROR" || $result["data"] == false) {
            return false;
        }
        return $result;
    }

    public function status($queue_name)
    {
        $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=status");
        if ($result == false || $result["data"] == "HTTPSQS_ERROR" || $result["data"] == false) {
            return false;
        }
        return $result["data"];
    }

    public function view($queue_name, $queue_pos)
    {
        // Fixed: the query previously used the undefined variable $pos
        $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=view&pos=".$queue_pos);
        if ($result == false || $result["data"] == "HTTPSQS_ERROR" || $result["data"] == false) {
            return false;
        }
        return $result["data"];
    }

    public function reset($queue_name)
    {
        $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=reset");
        if ($result !== false && $result["data"] == "HTTPSQS_RESET_OK") {
            return true;
        }
        return false;
    }

    public function maxqueue($queue_name, $num)
    {
        $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=maxqueue&num=".$num);
        if ($result !== false && $result["data"] == "HTTPSQS_MAXQUEUE_OK") {
            return true;
        }
        return false;
    }

    public function status_json($queue_name)
    {
        $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=".$queue_name."&opt=status_json");
        if ($result == false || $result["data"] == "HTTPSQS_ERROR" || $result["data"] == false) {
            return false;
        }
        return $result["data"];
    }

    public function synctime($num)
    {
        $result = $this->http_get("/?auth=".$this->httpsqs_auth."&charset=".$this->httpsqs_charset."&name=httpsqs_synctime&opt=synctime&num=".$num);
        if ($result !== false && $result["data"] == "HTTPSQS_SYNCTIME_OK") {
            return true;
        }
        return false;
    }
}
?>

For other HTTPSQS usage, see: http://zyan.cc/httpsqs/
