mingyun / mingyun.github.io

github主页
158 stars 92 forks source link

redis 延时队列 #111

Open mingyun opened 6 years ago

mingyun commented 6 years ago

通常一些比较耗时的任务如发邮件,短信,不需要马上获取发送状态,可以使用队列来处理,之前在 sf 公众号看了一篇关于 redis 延时队列的文章 https://mp.weixin.qq.com/s/Dkz85tZgTs7aPYSJoq4XwA 分享给大家

简单的队列处理

//入队列
while(1){  
        try{  
                $value = "value_".time();  
                $redis->LPUSH('key1', $value);  
                echo $value."\n";  
                sleep(rand(1, 3));  
        }catch(Exception $e){  
                echo $e->getMessage(). "\n";  
                exit;  
        }  
} 
//消费队列
while(1){  
        try{  
                $value = $redis->BLPOP('key1', 20); //超时 20秒 
                if(!empty($value)){  
                        var_export( $value )."\n";  
                }  

        }catch(Exception $e){  
                echo $e->getMessage(). "\n";  
                exit;  
        }  
} 

入队列

<?php 
// https://github.com/wuzhc/demo/blob/master/redis/queue/queueBlpop.php
$redis = new Redis();
$redis->connect("127.0.0.1", "6379");  //php客户端设置的ip及端口  
$redis->zAdd('goods:delay:task', time() + 50, json_encode(['id'=>1,'cid'=>1,'name'=>'php']));
$redis->zAdd('goods:delay:task', time() + 60, json_encode(['id'=>2,'cid'=>2,'name'=>'js']));
$redis->zAdd('goods:delay:task', time() + 70, json_encode(['id'=>3,'cid'=>3,'name'=>'py']));

/**
 * 尝试3秒内获取锁
 * @param string $lockName
 * @param int $timeout
 * @return bool|string
 */
function acquireLock($lockName, $timeout = 3)
{
    global $redis;
    $identifier = uniqid();
    $end = time() + $timeout;
    while ($end >= time()) {
        if ($redis->set($lockName, $identifier, array('nx'))) {
            return $identifier;
        }
        usleep(1000);
    }
    return false;
}
/**
 * 释放锁https://mp.weixin.qq.com/s/Dkz85tZgTs7aPYSJoq4XwA
 * @param $lockName
 * @param $identifier
 * @return bool
 */
function releaseLock($lockName, $identifier)
{
    global $redis;
    while (true) {
        $redis->watch($lockName);
        if ($redis->get($lockName) == $identifier) {
            $redis->multi(Redis::MULTI);
            $redis->del($lockName);
            $res = $redis->exec();
            if (isset($res[0]) && $res[0] == 1) {
                return true;
            }
        } else {
            $redis->unwatch();
            return false;
        }
    }
}
//
while (1) {
    // 因为是有序集合,只要判断第一条记录的延时时间,例如第一条未到执行时间
    // 相对说明集合的其他任务未到执行时间
    $rs = $redis->zRange('goods:delay:task', 0, 0, true);print_r($rs);//[{"id":1,"cid":1,"name":"php"}] => 1512713765
    // 集合没有任务,睡眠时间设置为5秒
    if (empty($rs)) {
        echo 'no tasks , sleep 5 seconds' . PHP_EOL;
        sleep(5);
        continue;
    }
    $taskJson = key($rs);print_r($taskJson);//{"id":1,"cid":1,"name":"php"}
    $delay = $rs[$taskJson];
    $task = json_decode($taskJson, true);
    $now = time();
    // 到时间执行延时任务
    if ($delay <= $now) {
        // 对当前任务加锁,避免移动移动延时任务到任务队列时被其他客户端修改
        if (!($identifier = acquireLock($task['id']))) {
            continue;
        }
        // 移动延时任务到任务队列
        $redis->zRem('goods:delay:task', $taskJson);
        $redis->rPush('goods:task', $taskJson);
        echo $task['id'] . ' run ' . PHP_EOL;
        // 释放锁
        releaseLock($task['id'], $identifier);
    } else {
        // 延时任务未到执行时间
        $sleep = $delay - $now;
        // 最大值设置为2秒,保证如果有新的任务(延时时间1秒)进入集合时能够及时的被处理
//        $sleep = $sleep > 2 ? 2 :$sleep;
        echo 'wait ' . $sleep . ' seconds ' . PHP_EOL;
        sleep($sleep);
    }
}
//执行效果
$ php queue.php
Array
(
    [{"id":1,"cid":1,"name":"php"}] => 1512714807
)
{"id":1,"cid":1,"name":"php"}wait 50 seconds
Array
(
    [{"id":1,"cid":1,"name":"php"}] => 1512714807
)
{"id":1,"cid":1,"name":"php"}1 run
Array
(
    [{"id":2,"cid":2,"name":"js"}] => 1512714817
)
{"id":2,"cid":2,"name":"js"}wait 6 seconds
Array
(
    [{"id":2,"cid":2,"name":"js"}] => 1512714817
)
{"id":2,"cid":2,"name":"js"}2 run
Array
(
    [{"id":3,"cid":3,"name":"py"}] => 1512714827
)
{"id":3,"cid":3,"name":"py"}wait 6 seconds
Array
(
    [{"id":3,"cid":3,"name":"py"}] => 1512714827
)
{"id":3,"cid":3,"name":"py"}3 run

消费队列

<?php
$redis = new Redis();
$redis->connect('127.0.0.1');
// 出队
while (true) {
    // 阻塞设置超时时间为3秒
    $task = $redis->blPop(array('goods:task'), 3);
    if ($task) {
        $redis->rPush('goods:success:task', $task[1]);
        $task = json_decode($task[1], true);
        echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
        echo PHP_EOL;
    } else {
        echo 'nothing' . PHP_EOL;
        sleep(5);
    }
}
//执行效果
php delay.php
nothing
nothing
nothing
1:1:handle success
nothing
2:2:handle success
nothing
3:3:handle success
nothing
nothing

laravel 延时队列

$date = \Carbon\Carbon::now()->addMinutes(3);
\Queue::later($date,new \App\Commands\Wechat(135678, 123456789));
//实际上是队列进入 zset queues:default:delayed https://www.lijinma.com/blog/2017/01/31/laravel-queue/
//源码文件
  \vendor\laravel\framework\src\Illuminate\Queue\Queue.php
  \vendor\laravel\framework\src\Illuminate\Queue\RedisQueue.php
public function later($delay, $job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data);

        $delay = $this->getSeconds($delay);

        $this->getConnection()->zadd($this->getQueue($queue).':delayed', $this->getTime() + $delay, $payload);

        return array_get(json_decode($payload, true), 'id');
    }
protected function getSeconds($delay)
    {
        if ($delay instanceof DateTime)
        {
            return max(0, $delay->getTimestamp() - $this->getTime());
        }

        return (int) $delay;
    }return $this->clients[$name ?: 'default'];
protected function getQueue($queue)
    {
        return 'queues:'.($queue ?: $this->default);
    }
protected function createPayload($job, $data = '', $queue = null)
    {
        if ($job instanceof Closure)
        {
            return json_encode($this->createClosurePayload($job, $data));
        }
        elseif (is_object($job))
        {
            return json_encode([
                'job' => 'Illuminate\Queue\CallQueuedHandler@call',
                'data' => ['command' => serialize(clone $job)],
            ]);
        }

        return json_encode($this->createPlainPayload($job, $data));
    }
protected function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);

        return json_encode(array_set($payload, $key, $value));
    }
// 消费队列
<?php
namespace App\Commands;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Bus\SelfHandling;
use Illuminate\Contracts\Queue\ShouldBeQueued;

class Wechat extends Command implements SelfHandling, ShouldBeQueued
{
    use InteractsWithQueue, SerializesModels;

    /**
     * @var int
     */
    protected $userId;

    /**
     * @var int
     */
    protected $id;

    /**
     * Create a new command instance.
     *
     * @param int $userId
     * @param int $id
     */
    public function __construct($userId, $id)
    {
        $this->userId = $userId;
        $this->id = $id;
    }

    /**
     * Execute the command.
     *
     * @return void
     */
    public function handle()
    {
        if (!$this->id || !$this->userId) {
            $this->delete();
            return;
        }

        $token = getToken();
        $json = sprintf('{
           "touser":"%s",
           "template_id":"qBnWvRl0aULE6fXuMemeD7v9Dr0JD4w",
           "url":"http://www.baidu.com",            
           "data":{
                   "title": {
                       "value":"自定义标题",
                       "color":"#173177"
                   },
                   "content":{
                       "value":"自定义内容",
                       "color":"#173177"
                   },
                   "time": {
                       "value":"时间",
                       "color":"#173177"
                   }
           }
        }', $this->userId);
        $url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=$token";
        $data = curl($url, $json);//一个 curl post 函数
        $data = json_decode($data, 1);
        $this->delete();
    }
} 

一分钟限制请求 100 次

 function limit($key){
        $maxNum = $redis->incr($key);
        if($maxNum <= 1){
            $redis->expire($key,60);
        }else if($maxNum > 100){
            return false;
        }
        return true;
    }
mingyun commented 6 years ago
 function urlsafe_base64_encode($data)
    {
        return strtr(base64_encode($data), ['+' => '-', '/' => '_', '=' => '']);
    }
function urlsafe_base64_decode($data, $strict = false)
    {
        return base64_decode(strtr($data, '-_', '+/'), $strict);
    }

use Illuminate\Queue\InteractsWithQueue; use InteractsWithQueue; if($this->attempts()>3){ $this->delete();检查这个工作已经被执行的次数 } Mac 上的文件系统 HFS+ 默认是大小写不敏感 文件名大小写变化,git 根本没有任何察觉 删除旧文件,提交代码。 添加新文件,提交代码。 git config core.ignorecase false $ git mv test.txt tmp.txt $ git mv tmp.txt Test.txt $ git commit -m "Renamed test.txt to Test.txt" https://www.lijinma.com/ 开发环境我们想测试的时候,可以把 Queue driver 设置成为 sync,这样队列就变成了同步执行,方便调试队列里面的任务

127.0.0.1:6379>monitor LRANGE "queues:default" 0 -1 获取 List 里面的所有数据。 ZRANGE ”queues:default:reserved“ 0 -1 获取 zset 里面的所有数据。 ZRANGEBYSCORE queues:default:reserved -inf +inf 通过时间来排序获取所有数据 使用 json_encode([]) 得到的结果是

那如何返回一个 {} (json) json_encode(new stdClass) 我觉得写一个接口应该考虑如下的内容:

确定 url:是否符合 Restful,是否要符合公司确定的规范? 确认操作人的权限 确定要操作的资源的权限 验证传入的参数:不要相信外面传进来的任何数据 操作失败的提示 操作成功的提示 写文档:如何按规矩写好文档是一件值得花时间和精力梳理的事情 如何测试接口,用什么数据测试 求10100001中1的数量是奇数还是偶数; 答案:1 ^ 0 ^ 1 ^ 0 ^ 0 ^ 0 ^ 0 ^ 1 = 1,结果为1就是奇数个1,结果为0就是偶数个1; 你肯定会想到判断a - b == 0,但是如果判断a ^ b == 0效率将会更高

RedisClient::getInstance()->lPush($key,json_encode($data));
        RedisClient::getInstance()->lTrim($key,0,10);//截取前10条
/**
     * 队列处理  根据房间号  self::$roomMsgSeconds 秒之内处理 self::$roomMsgMaxNum 条 如果超过 直接丢弃数据 不往nginx push stream 推了
     * @return bool
     */
    public function sendAction(){
        set_time_limit(0);
        $startTime = time();
        $duration = 300;
        while (time() - $startTime < $duration){
            $this->_currentLimiting();
            usleep(100000);
        }
    }
/**
     * 房间限流处理 60秒限制 100
     * @param $roomKey
     * @return bool
     */
    private function _currentLimiting($roomKey){
        $maxNum = RedisClient::getInstance()->incr($roomKey);
        if($maxNum <=1){
            RedisClient::getInstance()->expire($roomKey,60);
        }else if($maxNum > 100){
            //如果值大于20000 记录日志处理
            if($maxNum >= 20000 ){
                Common::writeLog('timeout key '.$roomKey.' num '.$maxNum,'error','redisTimeoutKey');
            }
            return false;
        }else{
            return true;
        }
        return true;
    }
$str = "昵称\t添加时间\t消息内容\tTo\n";
            $str = iconv('utf-8','GBK//IGNORE',$str);
            foreach($ret as $val){
                $userName = !empty($val['user_name']) ? iconv('utf-8','GBK//IGNORE',$val['user_name']) : '-';
                $addTime  = !empty($val['created_at'])? iconv('utf-8','GBK//IGNORE',$val['created_at']): '-';
                $text     = !empty($val['content'])   ? iconv('utf-8','GBK//IGNORE',$val['content'])   : '-';
                $to       = !empty($val['nick_name']) ? iconv('utf-8','GBK//IGNORE',$val['nick_name']) : '-';
                $text     = str_replace(array("\r\n", "\r", "\n"), "", $text);
                $str     .= $userName."\t".$addTime."\t".$text."\t".$to."\n";

            }
            $fileName = date('YmdHis').'_'.$room.'.xls';
            $this->_exportData($fileName,$str);
}
private function _exportData($fileName,$content){
        header("Content-type: text/html; charset=utf-8");
        header("Cache-Control: must-revalidate, post-check=0, pre-check=0");
        header("Content-Type: application/vnd.ms-execl");
        header("Content-Type: application/force-download");
        header("Content-Type: application/download");
        header("Content-Disposition: attachment; filename=".$fileName);
        header("Content-Transfer-Encoding: binary");
        header("Pragma: no-cache");
        header("Expires: 0");
        echo $content;exit;
    }
function get_ip_address(){
    foreach (array('HTTP_CLIENT_IP', 'HTTP_X_FORWARDED_FOR', 'HTTP_X_FORWARDED', 'HTTP_X_CLUSTER_CLIENT_IP', 'HTTP_FORWARDED_FOR', 'HTTP_FORWARDED', 'REMOTE_ADDR') as $key){
        if (array_key_exists($key, $_SERVER) === true){
            foreach (explode(',', $_SERVER[$key]) as $ip){
                $ip = trim($ip); // just to be safe
                if (filter_var($ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE) !== false){
                    return $ip;
                }
            }
        }
    }
}location / {
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
著作权归作者所有。
商业转载请联系作者获得授权,非商业转载请注明出处。
作者:mokeyjay
链接:https://www.mokeyjay.com/archives/1166
来源:超能小紫

// 过滤掉emoji表情
function filterEmoji($str)
{
    $str = preg_replace_callback(
            '/./u',
            function (array $match) {
                return strlen($match[0]) >= 4 ? '' : $match[0];
            },
            $str);
 
     return $str;
 }

https://www.cnblogs.com/weafer/archive/2011/09/21/2184059.html lpop命令的block版本。即当timeout为0时,若遇到名称为key i的list不存在或该list为空,则命令结束。