使用说明
- 支持消息的增删改重试机制及键区分
- 每条消息可同步复制多个队列
- 访问 框架路径/?c=of_base_com_mq 进入控制台监控或热重启操作
- 异常的消息会转储到磁盘并在自动恢复, 异常恢复前控制台可查看
- 通过配置 _of.com.mq 来设置连接池
- 通过配置 _of.com.mq.连接池.queues 来设置队列
- 通过配置 _of.com.mq.连接池.adapter 来使用不同消息队列
- 通过配置 _of.com.mq.连接池.moveMq 无缝切换消息队列
- 消费回调统一的回调参数与返回值
连接池配置结构(_of.com.mq) : { 消息队列池名 : { "adapter" : 适配器, "params" : 调度参数 { }, "moveMq" :o迁移队列时设置, 原队列池P改名oP, 配置新队列池P.moveMq = oP, 直到oP全部消费后移除 "bindDb" : 事务数据库连接池名, 跟其提交或回滚, "queues" : 生产消息时会同时发给队列, 队列的磁盘配置文件路径 }, ... } 连接池配置结构(_of.com.mq.连接池.queues) : { 队列名 : { "mode" : 队列模式, null=生产及消费,false=仅生产,true=仅消费, "check" : 自动重载消息队列触发函数, true=(默认)校验"消费回调"加载的文件变动, false=仅校验队列配置文件变动, 字符串=以"@"开头的正则忽略路径(软链接使用真实路径), 如: "@/ctrl/@i" "memory" : 单个并发分配内存积累过高后自动重置, 单位M, 默认50, 0=不限制 "keys" : 消费消息时回调结构 { 消息键 : 不存在的键将被抛弃 { "lots" : 批量消费, 1=单条消费, >1=一次消费最多数量(消息变成一维数组) "cNum" : 并发数量, 数字=每台分布系统相同, 数组=指定不同分布系统并发数 { 数字键(分布系统自动排序从0开始小于等于键时生效) : 生效的并发数, 例如: 3(首先0-3四台分布系统生效) : 1(这四台分布系统启动一个并发), 6(接着4-6三台分布系统生效) : 0(这三台分布系统不启动), 7(最后仅7这台分布系统生效) : 2(这台分布系统启动两并发, 之后都不启动) } "call" : 回调结构, 返回值决定消息的处理方式 返回 true=成功删除消息 返回 false=失败稍后重试 返回 数字=五年内秒数为xx后秒重试, 其它为指定时间戳重试 返回 其它=抛出错误稍后重试 返回 接口响应结构="code" < 400为true, 反之为false }, ... } }, ... } 或当多个队列池指定到同个文件时, 可按如下方式配置 { 队列池 : { 队列名 : { "mode" : 队列模式, null=生产及消费,false=仅生产,true=仅消费, "check" : 自动重载消息队列触发函数, true=(默认)校验"消费回调"加载的文件变动, false=仅校验队列配置文件变动, 字符串=以"@"开头的正则忽略路径(软链接使用真实路径), 如: "@/ctrl/@i" "memory" : 单个并发分配内存积累过高后自动重置, 单位M, 默认50, 0=不限制 "keys" : 消费消息时回调结构 { 消息键 : 不存在的键将被抛弃 { "lots" : 批量消费, 1=单条消费, >1=一次消费最多数量(消息变成一维数组) "cNum" : 并发数量, "call" : 回调结构 返回 true=成功删除消息 返回 false=失败稍后重试 返回 数字=五年内秒数为xx后秒重试, 其它为指定时间戳重试 返回 其它=抛出错误稍后重试 返回 接口响应结构="code" < 400为true, 反之为false }, ... } }, ... } }
回调参数(_of.com.mq.连接池.queues.队列名.keys.消息键.call) : { "pool" : 指定消息队列池, "queue" : 队列名称, "key" : 消息键, "lots" : 批量消费数量, "this" : 当前并发信息 { "cMd5" : 回调唯一值 "cCid" : 当前并发值 } "count" : 调用计数, 首次为 1 lots=1时 : 调用计数 lots>1时 : [调用计数, ...] "msgId" : 消息ID列表 lots=1时 : 消息ID lots>1时 : [消息ID, ...] "data" :x消息数据, _fire 函数实现 lots=1时 : 消息数据 lots>1时 : { 消息ID : 消息数据, ... } "msgs" : 消息列表 { 消息ID : 单条消息 { "msgId" : 消息ID "count" : 调用计数, 首次为 1 "data" : 消息数据, "uTime" : 更新时间戳 }, ... } } 返回值(_of.com.mq.连接池.queues.队列名.keys.消息键.call) : 返回 true=成功删除消息 返回 false=失败稍后重试 返回 数字=五年内秒数为xx后秒重试, 其它为指定时间戳重试 返回 其它=抛出错误稍后重试 返回 接口响应结构="code" < 400为true, 反之为false 注意 回调不能使用类似exit的方式结束回调, 会导致消费进程重启, 降低消费速度
队列方式
- redis 方式的消息队列
- mysql 方式的消息队列
切换redis步骤 1. 搭建redis分布式节点(不建议集群) 2. 通过slotNum方法计算节点分槽(使消息可以分配均匀) 3. 修改mq配置中mqSlot为计算的分槽结果 4. 迁移消息队列需要配置 moveMq redis 配置结构(_of.com.mq) : { 消息队列池名 : { //配置Redis时建议配置 maxmemory, 内存满时只会拒绝服务不会导致服务崩溃 "adapter" : "redis", "params" : 调度参数 { "vHost" : 虚拟主机, 不同主机下的同名队列互不冲突, 默认="" "kvPool" : _of.com.kv 的redis连接池, 建议使用混合型持久化 "mqSlot" : 包含两项>0的一维数组, 如[[0, 1], [0, 1, 2]] 其作用是将消息均衡分布到各节点, 每项对应一个分槽 选择合适的数字或字符串让每个节点有一个分槽就是最优的方式 第0项数组代表生产分槽, 总个数代表总分槽数; 第1项数组代表消费分槽, 要覆盖生产分槽所有项, 当生产分槽调整时, 要包含调整前所有项, 待分槽自动调整完后, 改为生产分槽 注意: 真实节点变动时(含域名)不能使用此项, 必须须使用"moveMq"迁移队列, 用vHost或db防重, 否则会丢消息 }, ... } /** * 描述 : 计算分槽数字 * 参数 : * host : 测试地址, 格式 [host:port, ...] * pwd : 测试密码 * 打印 : * 输出num个数的分槽数字 * 作者 : Edgar.lee */ function slotNum($host, $pwd) { of_base_com_kv::pool($pool = uniqid(), array( 'adapter' => 'redis', 'params' => array( 'type' => 'distributed', 'host' => $host, 'auth' => $pwd, 'db' => 0, ) )); $i = 0; $num = count(array_unique($host)); $redis = of_base_com_kv::link($pool); do { $h = $redis->_target($i); isset($list[$h]) || $list[$h] = $i; ++$i; } while (count($list) < $num); echo join(', ', $list); } slotNum(array('xxx:6379', ...), '****'); /** * 描述 : 获取redis消息队列运行信息 * 参数 : * params : { * "match" :o正则匹配队列标识, 以@开头, 默认false不过滤 * "mqSlot" :o过滤队列分槽列表, 默认false=全部消息(消费分槽), true=升级中的(生产与消费分槽差集) * "total" :o是否统计消息总数, 默认false不统计, true=统计 * "overdue" :o是否统计可消费数, 默认false不统计, true=统计 * "failNo" :o是否统计失败总数, 默认false不统计, true=统计 * "failed" :o读取最大失败消息, 默认0不查询, >0为最大长度 * "recent" :o读取即将消费消息, 默认0不查询, >0为最大长度 * } * 返回 : * { * 队列标识, 队列池.队列名.消息键 : { * "mqName" : 消息名称, 虚拟机.队列名.消息键 * "mqSlot" : 队列槽列表 { * 虚拟机.队列名.消息键.槽编码 : { * "total" : 消息总数 * "overdue" : 可消费数 * "failNo" : 失败总数 * }, ... * } * "total" : 消息总数 * "overdue" : 可消费数 * "failNo" : 失败总数 * 消息ID : {}, ... * } * "recent" : 消费列表 { * 消息ID : {}, ... * } * "msgList" : 消息属性汇总列表 { * 消息ID : 消息属性 { * "msgId" : 消息ID, * "data" : 消息数据, * "syncCount" :o失败次数, * "syncLevel" : 同步等级(更新消息时重置), * "planTime" :o计划时间戳(不存在时为删除), * "updateTime" : 更新时间戳, * "lockTime" :o锁定时间戳(在此范围内不执行), * "lockMark" :o锁定标记(防止执行超时被其它消费) * }, * ... * } * } * } * 作者 : Edgar.lee */ of_accy_com_mq_redis::getMqInfo($params = array());
mysql 配置结构(_of.com.mq) : { 消息队列池名 : { "adapter" : "mysql", "params" : 调度参数 { "vHost" : 虚拟主机, 不同主机下的同名队列互不冲突, 默认="" "dbPool" : _of.db 的连接池 } }, ... } CREATE TABLE `_of_com_mq` ( `mark` char(35) NOT NULL COMMENT '消息唯一ID(虚拟主机+队列名称+消息类型+消息ID)', `vHost` char(50) NOT NULL COMMENT '虚拟主机', `queue` char(50) NOT NULL COMMENT '队列名称', `type` char(50) NOT NULL COMMENT '消息类型', `msgId` char(100) NOT NULL COMMENT '消息ID', `data` mediumtext NOT NULL COMMENT '队列数据', `syncCount` int(11) UNSIGNED NOT NULL COMMENT '已同步次数', `createTime` timestamp NOT NULL DEFAULT '2000-01-01 00:00:00' COMMENT '生成时间', `updateTime` timestamp NOT NULL DEFAULT '2000-01-01 00:00:00' COMMENT '消费时间, 2001为删除', `syncLevel` int(11) UNSIGNED NOT NULL COMMENT '同步等级, 数值越大优先级越低', `lockTime` timestamp NOT NULL DEFAULT '2000-01-01 00:00:00' COMMENT '锁定时间, 每 syncLevel * 5 分钟重试', `lockMark` char(32) NOT NULL COMMENT '锁定时生成的唯一ID', PRIMARY KEY (`type`,`mark`) USING BTREE, KEY `idx_consumer` (`lockTime`,`type`,`queue`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息队列表' PARTITION BY KEY (`type`) PARTITIONS 251; 下载 of < 200228 mysql模式消息队列升级包: 两种升级方式选其一 1. 停机升级: 停止WEB服务 -> 执行升级前SQL -> 执行升级后SQL -> 升级框架 -> 开启WEB服务 2. 无缝升级: 执行升级前SQL -> 升级框架 -> 重载队列 -> 执行升级后SQL
of_base_com_mq::set($keys, $data = null, $pool = 'default', $bind = null) 设置消息队列
-
事务操作 : keys : null=开启事务, true=提交事务, false=回滚事务 data : 指定消息队列池 pool : 指定数据库连接池 生产消息, 单条模式 : keys : 字符串=指定消息类型, 数组=[消息类型, 消息ID, 延迟秒数] data : null=删除 [消息类型, 消息ID] 指定的信息, 其它=消息数据 pool : 指定消息队列池 bind : ""=绑定到内部事务, 字符串=绑定数据池同步事务 生产消息, 批量模式 : keys : 批量消息, [{"keys" : 单条模式keys结构, "data" : 单条模式data结构}, ...] pool : 指定数据库连接池
#_of.com.mq 配置结构
array(
//消息队列池
'exchange' => array(
//适配器
'adapter' => 'mysql',
//调度参数
'params' => array(
'dbPool' => 'default'
),
//绑定事务数据库
'bindDb' => 'default',
//队列列表
'queues' => '/demo/queue/queue.php'
)
)
#/demo/queue/queue.php 配置结构
return array(
//生产消息时会同时发给多队列
'queue1' => array(
//队列模式, null=生产及消费, false=仅生产, true=仅消费
'mode' => null,
//消费消息时回调结构
'keys' => array(
//不存在的键将被抛弃
'key' => array(
'cNum' => 3,
'call' => 'demo_index::mqTest'
),
'key1' => array(
'cNum' => 1,
'call' => 'demo_index::mqTest'
)
)
),
'queue2' => array(
//队列模式, null=生产及消费, false=仅生产, true=仅消费
'mode' => null,
//消费消息时回调结构
'keys' => array(
//不存在的键将被抛弃
'key' => array(
'cNum' => 4,
'call' => 'demo_index::mqTest'
)
)
)
);
#demo_index::mqTest 代码
/**
* 描述 : 演示消息队列
*/
public function mqTest($params = null) {
//触发消息队列
if ($params) {
file_put_contents(
ROOT_DIR . OF_DATA . '/mqTest.txt',
time() . print_r($params, true),
FILE_APPEND | LOCK_EX
);
return true;
//生产消息队列
} else {
if (of::config('_of.com.mq.exchange')) {
echo '异步并发消息回调将在此文件中写入数据: ',
ROOT_DIR . OF_DATA . '/mqTest.txt';
L::sql(null);
//批量创建消息队列, queue1 与 queue2 将同时收到信息
of_base_com_mq::set(array(
array('keys' => 'key', 'data' => array(1, 2, 3)),
array('keys' => array('key'), 'data' => array(4, 5, 6)),
array('keys' => array('key1', '延迟ID', 600), 'data' => '消息信息(可传数组)'),
), 'exchange');
//因 queue2 没有 key1 键, 所以仅 queue1 会收到信息
of_base_com_mq::set(array('key1', '消息ID'), '消息信息(可传数组)', 'exchange');
L::sql(true);
} else {
echo '先取消/demo/config.php下_of.com.mq的注释';
}
}
}
#web 访问 demo_index::mqTest 将会在 OF_DATA/mqTest.txt 输出
1518646689Array
(
[pool] => exchange
[queue] => queue1
[key] => key
[data] => 消息信息
[this] => 当前并发信息 (
[cMd5] => 回调唯一值
[cCid] => 当前并发值
)
[msgId] => 消息ID
[count] => 调用计数, 首次为 1
)
1518646689Array
(
[pool] => exchange
[queue] => queue2
[key] => key
[data] => 消息信息
[this] => 当前并发信息 (
[cMd5] => 回调唯一值
[cCid] => 当前并发值
)
[msgId] => 消息ID
[count] => 调用计数, 首次为 1
)
二次开发
- 文件夹 /of/accy/com/mq 下存储着不同方式的对接文件
- 目前有已封装 mysql redis
- 可以通过配置文件 _of.com.mq 来使用不同方式, 支持多连接池
- 开发更多的存储方式, 要继承 of_base_com_mq 类并实现以下方法
abstract protected function _init($fire) 初始队列
-
firearray队列定位
{ "pool" : 消息的队列池 "bind" : 绑定的数据库池 }
abstract protected function _sets(&$msgs) 设置消息, 成功返回true, 失败返回false
-
msgsarray需要设置的消息集合
[{ "keys" : 消息定位 [消息类型, 消息主键], "data" : 消息数据, null=删除 keys 指定的信息, 其它=消息数据(包括数组) "pool" : 指定消息队列池, "bind" : ""=绑定到手动事务, 字符串=绑定数据池同步事务 "queue" : 队列名称 }, ...]
abstract protected function _fire(&$calll, $data) 触发消息队列, 根据回调响应值执行对应动作
-
callstring array符合回调结构
-
dataarray需要设置的消息集合, call的回调参数
{ "pool" : 指定消息队列池, "queue" : 队列名称, "key" : 消息键, "lots" : 批量消费数量, "this" : 当前并发信息 { "cMd5" : 回调唯一值 "cCid" : 并发ID, 从1-n }, "msgs" : 消息列表 { 消息ID : 单条消息 { "msgId" : 消息ID "count" : 调用计数, 首次为 1 "data" : 消息数据, "uTime" : 更新时间戳 }, ... }, ... } 在_fire实现的方法中调用of_base_com_mq::callback来执行消息队列 $result = of_base_com_mq::callback(_fire的$call参数, _fire的$data参数 + array( "msgs" : 消息列表 { 消息ID : 单条消息 { "msgId" : 消息ID "count" : 调用计数, 首次为 1 "data" : 消息数据, "uTime" : 更新时间戳 }, ... }, ... )); 同时 $result 的三个返回值, 要去实现 返回 true=成功删除消息 返回 false=失败稍后重试 返回 数字=五年内秒数为xx后秒重试, 其它为指定时间戳重试
返回结构: true=已匹配到消息, false=未匹配到消息
abstract protected function _quit(&$data) 触发消息队列意外退出时回调
-
dataarray回调参数结构
{ "pool" : 指定消息队列池, "queue" : 队列名称, "key" : 消息键, "this" : 当前并发信息 { "cMd5" : 回调唯一值 "cCid" : 当前并发值 } "msgs" : 消息数据列表 }
abstract protected function _begin() 开启事务, 成功返回 true, 失败返回 false
abstract protected function _commit($type) 提交事务, 成功返回 true, 失败返回 false
-
typestring"before"=提交开始回调, "after"=提交结束回调
abstract protected function _rollBack($type) 事务回滚, 成功返回 true, 失败返回 false
-
typestring"before"=回滚开始回调, "after"=回滚结束回调