swoft-cloud / swoft

🚀 PHP Microservice Full Coroutine Framework
https://swoft.org
Apache License 2.0
5.58k stars 786 forks source link

chunk 分块获取数据中增加子协程部分数据丢失 #1262

Open lizhanghu opened 4 years ago

lizhanghu commented 4 years ago

StatsCommand 代码块

$queue = $input->getInt('queue');
$startId = $input->getInt('start_id');
$endId = $input->getInt('end_id');
$this->StartTime = time();
$count = $queue??40;
unset($args);
unset($input);
echo "开始执行,并发数量: ".$count."\n";
if ($startId && $endId) {
            $where = [
                'total_count'=>0,
                'pass_push_count'=>0,
                ['id', '>=', $startId],
                ['id', '<=', $endId]
            ];
 } else {
            $where = [
                'total_count'=>0,
                'pass_push_count'=>0
            ];
 }
$channel = new Channel(1000);
Stats::where($where)->orderBy('id','asc')->chunk($count, function ($words) use ($channel){
                echo count($words)." while\n";
                foreach ($words as $key=>$wordObj) {
                    sgo(function () use ($channel, $wordObj) {
                        $word = ArrayHelper::toArray($wordObj);
                        if ($channel->isFull()) {
                            \co::sleep(1);
                        }
                        $result = $channel->push([
                            'id' => $word['id'],
                            'total' => 1 
                        ]);
                        if ($result == false) {
                            echo $channel->errCode."\n";
                        }
                    });
                }

                $count = count($words);
                $i = 1;
                while(true) {
                    if($i>=$count) {
                        break;
                    } else {
                        $data = $channel->pop();
                        if ($data) {
                            echo "ID:".$data['id']."start \n";
                            $i++;
                        }
                        if ($data['total']>0) {
                            $co = sgo(function () use ($data) {
                                $flag = Stats::find($data['id'])->update([
                                    'total_count' => $data['total'],
                                ]);
                                if ($flag == false) {
                                    echo "ID:".$data['id']." total:".($data['total'])." update失败! \n";
                                } else {
                                    echo "ID:".$data['id']." total:".($data['total'])."\n";
                                }
                            });
                            if ($co ==false) {
                                echo "ID:".$data['id']." 协程创建失败 \n";
                            }
                        }
                    }

                }
            echo "使用内存: ".$this->getFilesize(memory_get_usage())."\n";
});

命令行执行
php bin/swoft stats:run queue=50 start_id=1 end_id=500

问题描述

如果执行这样的代码发现会少update近200条数据,也没有报错
$flag = Stats::find($data['id'])->update([
      'total_count' => $data['total'],
]);
如果把上面的这段注释掉就日志就会echo出500条记录
我不清楚是因为协程的问题还是生成的实体连接池问题

版本

swoft2.0 swoole 4.4.7
sakuraovq commented 4 years ago

打开S QL事件监听看看 有没有执行 sql

lizhanghu commented 4 years ago

打开S QL事件监听看看 有没有执行 sql 是swoft里面的事件吗

lizhanghu commented 4 years ago

打开S QL事件监听看看 有没有执行 sql

class ModelSavedListener implements EventHandlerInterface
{
    /**
     * @param EventInterface $event
     */
    public function handle(EventInterface $event): void
    {
        /** @var Model $modelStatic */
        $modelStatic = $event->getTarget();
        if ($modelStatic instanceof KeywordStats) {
            $data = $modelStatic->getModelOriginal();
            $modify = $modelStatic->getModelChanges();
            echo $data['id']."修改内容为".json_encode($modify)."\n";
        }
    }
}

我添加了事件监听发现没有检测到丢失数据的日志,我在下面这段代码中打上了断点测试

if ($data['total']>0) {
                            $co = sgo(function () use ($data) {

                                --------------start---------------
                                $flag = Stats::find($data['id'])->update([
                                    'total_count' => $data['total'],
                                ]);
                                ----------------end--------------

                                if ($flag == false) {
                                    echo "ID:".$data['id']." total:".($data['total'])." update失败! \n";
                                } else {
                                    echo "ID:".$data['id']." total:".($data['total'])."\n";
                                }
                            });
                            if ($co ==false) {
                                echo "ID:".$data['id']." 协程创建失败 \n";
                            }
}

发现 chunk($count, function (\Swoft\Stdlib\Collection $words) 的count输入多少这个数据就执行到哪,比如我输入的是50(每次返回50条),id范围在1-55,id 51就已经断掉了但是也不报错。如果我把这个model的代码注释掉,51-55的id就能打印出来

lizhanghu commented 4 years ago

打开S QL事件监听看看 有没有执行 sql

您好 我还要等几天

sakuraovq commented 4 years ago

🥺 这个问题不好查询 不是连接问题吧 这种可以考虑使用 生产消费模型 去处理不要在 chunk 里面去直接消费