消息队列

使用说明

  1. 支持消息的增删改重试机制及键区分
  2. 每条消息可同步复制多个队列
  3. 访问 框架路径/?c=of_base_com_mq 进入控制台监控或热重启操作
  4. 异常的消息会转储到磁盘并在自动恢复, 异常恢复前控制台可查看
  5. 通过配置 _of.com.mq 来设置连接池
  6. 通过配置 _of.com.mq.连接池.queues 来设置队列
  7. 通过配置 _of.com.mq.连接池.adapter 来使用不同消息队列
  8. 通过配置 _of.com.mq.连接池.moveMq 无缝切换消息队列
  9. 连接池配置结构(_of.com.mq) : {
        消息队列池名 : {
            "adapter" : 适配器,
            "params"  : 调度参数 {
            },
            "moveMq"  :o迁移队列时设置, 原队列池P改名oP, 配置新队列池P.moveMq = oP, 直到oP全部消费后移除
            "bindDb"  : 事务数据库连接池名, 跟其提交或回滚,
            "queues"  : 生产消息时会同时发给队列, 队列的磁盘配置文件路径
        }, ...
    }
    
    连接池配置结构(_of.com.mq.连接池.queues) : {
        队列名 : {
            "mode"   : 队列模式, null=生产及消费,false=仅生产,true=仅消费,
            "check"  : 自动重载消息队列触发函数,
                true=(默认)校验"消费回调"加载的文件变动,
                false=仅校验队列配置文件变动,
                字符串=以"@"开头的正则忽略路径(软链接使用真实路径), 如: "@/ctrl/@i"
            "memory" : 单个并发分配内存积累过高后自动重置, 单位M, 默认50, 0=不限制
            "keys"   : 消费消息时回调结构 {
                消息键 : 不存在的键将被抛弃 {
                    "lots" : 批量消费, 1=单条消费, >1=一次消费最多数量(消息变成一维数组)
                    "cNum" : 并发数量, 数字=每台分布系统相同, 数组=指定不同分布系统并发数 {
                        数字键(分布系统自动排序从0开始小于等于键时生效) : 生效的并发数, 例如:
                        3(首先0-3四台分布系统生效) : 1(这四台分布系统启动一个并发),
                        6(接着4-6三台分布系统生效) : 0(这三台分布系统不启动),
                        7(最后仅7这台分布系统生效) : 2(这台分布系统启动两并发, 之后都不启动)
                    }
                    "call" : 回调结构, 返回值决定消息的处理方式
                        返回 true=成功删除消息
                        返回 false=失败稍后重试
                        返回 数字=五年内秒数为xx后秒重试, 其它为指定时间戳重试
                        返回 其它=抛出错误稍后重试
                        返回 接口响应结构="code" < 400为true, 反之为false
                }, ...
            }
        }, ...
    } 或当多个队列池指定到同个文件时, 可按如下方式配置 {
        队列池 : {
            队列名 : {
                "mode"   : 队列模式, null=生产及消费,false=仅生产,true=仅消费,
                "check"  : 自动重载消息队列触发函数,
                    true=(默认)校验"消费回调"加载的文件变动,
                    false=仅校验队列配置文件变动,
                    字符串=以"@"开头的正则忽略路径(软链接使用真实路径), 如: "@/ctrl/@i"
                "memory" : 单个并发分配内存积累过高后自动重置, 单位M, 默认50, 0=不限制
                "keys"   : 消费消息时回调结构 {
                    消息键 : 不存在的键将被抛弃 {
                        "lots" : 批量消费, 1=单条消费, >1=一次消费最多数量(消息变成一维数组)
                        "cNum" : 并发数量,
                        "call" : 回调结构
                            返回 true=成功删除消息
                            返回 false=失败稍后重试
                            返回 数字=五年内秒数为xx后秒重试, 其它为指定时间戳重试
                            返回 其它=抛出错误稍后重试
                            返回 接口响应结构="code" < 400为true, 反之为false
                    }, ...
                }
            }, ...
        }
    }
  10. 消费回调统一的回调参数与返回值
  11. 回调参数(_of.com.mq.连接池.queues.队列名.keys.消息键.call) : {
        "pool"  : 指定消息队列池,
        "queue" : 队列名称,
        "key"   : 消息键,
        "lots"  : 批量消费数量,
        "this"  : 当前并发信息 {
            "cMd5" : 回调唯一值
            "cCid" : 当前并发值
        }
        "count" : 调用计数, 首次为 1
            lots=1时 : 调用计数
            lots>1时 : [调用计数, ...]
        "msgId" : 消息ID列表
            lots=1时 : 消息ID
            lots>1时 : [消息ID, ...]
        "data"  :x消息数据, _fire 函数实现
            lots=1时 : 消息数据
            lots>1时 : {
                消息ID : 消息数据,
                ...
            }
        "msgs"  : 消息列表 {
            消息ID : 单条消息 {
                "msgId" : 消息ID
                "count" : 调用计数, 首次为 1
                "data"  : 消息数据,
                "uTime" : 更新时间戳
            }, ...
        }
    }
    返回值(_of.com.mq.连接池.queues.队列名.keys.消息键.call) :
        返回 true=成功删除消息
        返回 false=失败稍后重试
        返回 数字=五年内秒数为xx后秒重试, 其它为指定时间戳重试
        返回 其它=抛出错误稍后重试
        返回 接口响应结构="code" < 400为true, 反之为false
        注意 回调不能使用类似exit的方式结束回调, 会导致消费进程重启, 降低消费速度

队列方式

  1. redis 方式的消息队列
  2. 切换redis步骤
    1. 搭建redis分布式节点(不建议集群)
    2. 通过slotNum方法计算节点分槽(使消息可以分配均匀)
    3. 修改mq配置中mqSlot为计算的分槽结果
    4. 迁移消息队列需要配置 moveMq
    
    
    redis 配置结构(_of.com.mq) : {
        消息队列池名 : {
            //配置Redis时建议配置 maxmemory, 内存满时只会拒绝服务不会导致服务崩溃
            "adapter" : "redis",
            "params"  : 调度参数 {
                "vHost"  : 虚拟主机, 不同主机下的同名队列互不冲突, 默认=""
                "kvPool" : _of.com.kv 的redis连接池, 建议使用混合型持久化
                "mqSlot" : 包含两项>0的一维数组, 如[[0, 1], [0, 1, 2]]
                           其作用是将消息均衡分布到各节点, 每项对应一个分槽
                           选择合适的数字或字符串让每个节点有一个分槽就是最优的方式
                           第0项数组代表生产分槽, 总个数代表总分槽数;
                           第1项数组代表消费分槽, 要覆盖生产分槽所有项, 当生产分槽调整时, 要包含调整前所有项, 待分槽自动调整完后, 改为生产分槽
                           注意: 真实节点变动时(含域名)不能使用此项, 必须须使用"moveMq"迁移队列, 用vHost或db防重, 否则会丢消息
        }, ...
    }
    
    /**
     * 描述 : 计算分槽数字
     * 参数 :
     *      host : 测试地址, 格式 [host:port, ...]
     *      pwd  : 测试密码
     * 打印 :
     *      输出num个数的分槽数字
     * 作者 : Edgar.lee
     */
    function slotNum($host, $pwd) {
        of_base_com_kv::pool($pool = uniqid(), array(
            'adapter' => 'redis',
            'params'  => array(
                'type' => 'distributed',
                'host' => $host,
                'auth' => $pwd,
                'db'   => 0,
            )
        ));
    
        $i = 0;
        $num = count(array_unique($host));
        $redis = of_base_com_kv::link($pool);
    
        do {
            $h = $redis->_target($i);
            isset($list[$h]) || $list[$h] = $i;
            ++$i;
        } while (count($list) < $num);
    
        echo join(', ', $list);
    }
    slotNum(array('xxx:6379', ...), '****');
    
    /**
     * 描述 : 获取redis消息队列运行信息
     * 参数 :
     *      params : {
     *          "match"   :o正则匹配队列标识, 以@开头, 默认false不过滤
     *          "mqSlot"  :o过滤队列分槽列表, 默认false=全部消息(消费分槽), true=升级中的(生产与消费分槽差集)
     *          "total"   :o是否统计消息总数, 默认false不统计, true=统计
     *          "overdue" :o是否统计可消费数, 默认false不统计, true=统计
     *          "failNo"  :o是否统计失败总数, 默认false不统计, true=统计
     *          "failed"  :o读取最大失败消息, 默认0不查询, >0为最大长度
     *          "recent"  :o读取即将消费消息, 默认0不查询, >0为最大长度
     *      }
     * 返回 :
     *      {
     *          队列标识, 队列池.队列名.消息键 : {
     *              "mqName"  : 消息名称, 虚拟机.队列名.消息键
     *              "mqSlot"  : 队列槽列表 {
     *                  虚拟机.队列名.消息键.槽编码 : {
     *                      "total"   : 消息总数
     *                      "overdue" : 可消费数
     *                      "failNo"  : 失败总数
     *                  }, ...
     *              }
     *              "total"   : 消息总数
     *              "overdue" : 可消费数
     *              "failNo"  : 失败总数
     *                  消息ID : {}, ...
     *              }
     *              "recent"  : 消费列表 {
     *                  消息ID : {}, ...
     *              }
     *              "msgList" : 消息属性汇总列表 {
     *                  消息ID : 消息属性 {
     *                      "msgId"      : 消息ID,
     *                      "data"       : 消息数据,
     *                      "syncCount"  :o失败次数,
     *                      "syncLevel"  : 同步等级(更新消息时重置),
     *                      "planTime"   :o计划时间戳(不存在时为删除),
     *                      "updateTime" : 更新时间戳,
     *                      "lockTime"   :o锁定时间戳(在此范围内不执行),
     *                      "lockMark"   :o锁定标记(防止执行超时被其它消费)
     *                  },
     *                  ...
     *              }
     *          }
     *      }
     * 作者 : Edgar.lee
     */
    of_accy_com_mq_redis::getMqInfo($params = array());
  3. mysql 方式的消息队列
  4. mysql 配置结构(_of.com.mq) : {
        消息队列池名 : {
            "adapter" : "mysql",
            "params"  : 调度参数 {
                "vHost"  : 虚拟主机, 不同主机下的同名队列互不冲突, 默认=""
                "dbPool" : _of.db 的连接池
            }
        }, ...
    }
    
    CREATE TABLE `_of_com_mq` (
        `mark` char(35) NOT NULL COMMENT '消息唯一ID(虚拟主机+队列名称+消息类型+消息ID)',
        `vHost` char(50) NOT NULL COMMENT '虚拟主机',
        `queue` char(50) NOT NULL COMMENT '队列名称',
        `type` char(50) NOT NULL COMMENT '消息类型',
        `msgId` char(100) NOT NULL COMMENT '消息ID',
        `data` mediumtext NOT NULL COMMENT '队列数据',
        `syncCount` int(11) UNSIGNED NOT NULL COMMENT '已同步次数',
        `createTime` timestamp NOT NULL DEFAULT '2000-01-01 00:00:00' COMMENT '生成时间',
        `updateTime` timestamp NOT NULL DEFAULT '2000-01-01 00:00:00' COMMENT '消费时间, 2001为删除',
        `syncLevel` int(11) UNSIGNED NOT NULL COMMENT '同步等级, 数值越大优先级越低',
        `lockTime` timestamp NOT NULL DEFAULT '2000-01-01 00:00:00' COMMENT '锁定时间, 每 syncLevel * 5 分钟重试',
        `lockMark` char(32) NOT NULL COMMENT '锁定时生成的唯一ID',
        PRIMARY KEY (`type`,`mark`) USING BTREE,
        KEY `idx_consumer` (`lockTime`,`type`,`queue`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息队列表'
    PARTITION BY KEY (`type`) PARTITIONS 251;
    
    下载 of < 200228 mysql模式消息队列升级包: 两种升级方式选其一
        1. 停机升级: 停止WEB服务 -> 执行升级前SQL -> 执行升级后SQL -> 升级框架 -> 开启WEB服务
        2. 无缝升级: 执行升级前SQL -> 升级框架 -> 重载队列 -> 执行升级后SQL
    

of_base_com_mq::set($keys, $data = null, $pool = 'default', $bind = null) 设置消息队列

二次开发

  1. 文件夹 /of/accy/com/mq 下存储着不同方式的对接文件
  2. 目前有已封装 mysql redis
  3. 可以通过配置文件 _of.com.mq 来使用不同方式, 支持多连接池
  4. 开发更多的存储方式, 要继承 of_base_com_mq 类并实现以下方法

abstract protected function _init($fire) 初始队列

abstract protected function _sets(&$msgs) 设置消息, 成功返回true, 失败返回false

abstract protected function _fire(&$calll, $data) 触发消息队列, 根据回调响应值执行对应动作

返回结构: true=已匹配到消息, false=未匹配到消息

abstract protected function _quit(&$data) 触发消息队列意外退出时回调

abstract protected function _begin() 开启事务, 成功返回 true, 失败返回 false

abstract protected function _commit($type) 提交事务, 成功返回 true, 失败返回 false

abstract protected function _rollBack($type) 事务回滚, 成功返回 true, 失败返回 false