swoole / rfc

Swoole 提案
116 stars 3 forks source link

RFC-1029 支持多线程运行模式 #85

Closed matyhtf closed 5 months ago

matyhtf commented 8 months ago

目标

引入多线程运行模式,使 Swoole 实现多线程+协程的运行方式。

背景

Swoole 服务器编程开发中,协程的出现已经解决了大部分难题,但是我们发现跨进程读写数据依然很难,需要借助进程间通信(IPC)、RedisSwoole\Table 或其他共享内存实现。

RedisIPC 进程间通信方式性能较差。而 Swoole\Table 的问题是需要固定分配内存,无法扩容,存在诸多限制。

除此之外,多进程的调试非常麻烦,例如我们要使用 gdb 就需要 gdb -p 逐个进程去追踪,而 JavaGolang 这样的多线程模型,只有一个进程,调试更简单。实现一些底层的工具也会更容易。

实现方式

创建新线程时,隔离全局变量

const char *script_file = "index.php";
std::thread newThread1([ce]() {
        ts_resource(0);
        TSRMLS_CACHE_UPDATE();

        zend_file_handle file_handle{};
        if (php_request_startup() != SUCCESS) {
            EG(exit_status) = 1;
            goto _startup_error;
        }
        file_handle.filename = zend_string_init(script_file, strlen(script_file), 0);
        zend_first_try {
            php_execute_script(&file_handle);
        }
        zend_end_try();

        php_request_shutdown(NULL);
        file_handle.filename = NULL;
    _startup_error:
        ts_free_thread();
});

创建线程

$thread = Swoole\Thread::exec('index.php', $arg1, $arg2, ...$argv);
# 等待线程退出
$thread->join();
# index.php

echo "begin\n";
var_dump(Swoole\Thread::getId());
$args = Swoole\Thread::getArguments();
var_dump($args);

if ($args[0] == 'thread-2') {
    $t3 = Swoole\Thread::exec('mt.php', 'thread-3', PHP_OS);
    $t3->join();
}

sleep(5);
echo "end\n";

与可以在协程中创建协程相似,在子线程中依然可以创建新线程

由于 ZTS 的机制,实际上 Swoole\ThreadSwoole\Process 是一致的,无法共享任何对象资源。

实际上 Thread::exec()Process::exec() 更接近,ZTS 线程反而比 fork() 隔离得更为干净,fork() 是可以从父进程继承已创建的对象和资源,而 ZTS 新的线程不会从父线程继承任何资源,相当于是一个全新的进程。

虽然通过底层的技术手段可以实现线程之间传递对象和资源,例如 ext-pthreads 等扩展,但涉及到并行操作同一个文件句柄和内存指针等复杂的问题。再加上 Swoole 的异步 IO 和协程机制带来的复杂性。应用层代码正确地使用锁,同时兼顾性能和数据一致性是一件极其困难的事情,错误的使用方法导致严重的 BUG,因此 Swoole 不考虑提供这方面的支持。

在线程中创建协程

# index.php 
Co\run(function () {
    echo "begin\n";
    sleep(10);
    echo "end\n";    
});

在线程中可以使用 Co\run 创建新的协程调度器,使用 Co\go 创建新的协程。不同线程之间的协程无任何关联,包括 Channel 也只能在当前线程中使用。

运行结果

# index.php
[thread-0]  TheadId=140327446116480, mainThread=1, CG(compiled_filename)=(nil)
[thread-1]  TheadId=140327442708032, mainThread=0, CG(compiled_filename)=(nil)
begin
[thread-2]  TheadId=140327434315328, mainThread=0, CG(compiled_filename)=(nil)
begin

使用 ps aux 可以看到只有一个进程:

ps aux|grep php
htf       783550  0.2  0.0 171424 12028 pts/11   Sl+  10:51   0:00 php index.php

Server

$http = new Swoole\Http\Server('127.0.0.1', 9501);

# 在主线程中执行
$http->on('start', function ($server) {
    echo "Swoole http server is started at http://127.0.0.1:9501\n";
});

# 在 Worker 线程中执行
$http->on('request', function ($request, $response) {
    $response->header('Content-Type', 'text/plain');
    $response->end('Hello World');
});

# 在 Worker 线程启动时执行
$http->on('workerStart', function ($server, $workerId) {
    echo "worker thread #$workerId is started\n";
});

$http->start();

缺点

线程 API

并发 Map

use Swoole\Thread;
use Swoole\Thread\Map;

$map = new Map;

# 写入
$map[time()] = 'value';
$map['hello'] = 3.1415926;

# 读取
echo $map['hello'];

# 删除
unset($map['hello']);

# 获取长度
count($map);

# 获取所有 Key ,若 Map 过大可能会长时间占用锁,导致其他线程全部阻塞,建议只在 `shutdown` 阶段使用
$map->keys();

并发 List

use Swoole\Thread;
use Swoole\Thread\ArrayList;

$list = new ArrayList();

# 追加元素
$list[] = time();
$list[] = 99999;
$list[2] = 'test';

# 获取长度
count($list);

# 抛出异常 unsupported behavior,不支持随机删除
unset($list[1]);

# 赋值
$list[0] = 0;

# 抛出 out of range 异常,错误的赋值
$list[1000] = 0;

线程安全


$list = new Swoole\Thread\ArrayList();
$list[] = base64_encode(random_bytes(32));
$list[1] = uniqid();

$t1 = Swoole\Thread::exec('mt.php', 'thread-1', $list);

其他更新

jingjingxyk commented 8 months ago

Swoole v6 要来了!即将增加多线程支持 https://github.com/swoole/swoole-src/pull/5281/files https://github.com/swoole/swoole-src/tree/v6.0 https://github.com/swoole/swoole-src/commit/908644c72b2de0407e62a3d79f04f35fef09c98f https://cplusplus.com/reference/multithreading/

以后是不是可以写成类似python这样,无论是使用多进程、还是多线程,换个单词即可


import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_time(sleep_seconds: int) -> int:
    time.sleep(sleep_seconds)
    print(f'sleep {sleep_seconds} seconds')
    return sleep_seconds

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=3) as executor:
        result = executor.map(sleep_time, sleep_list)
        print(result)
        print(list(result))

    with ProcessPoolExecutor(max_workers=3) as executor:
        result = executor.map(sleep_time, sleep_list)
        print(result)
        print(list(result))
matyhtf commented 8 months ago

Swoole v6 要来了!即将增加多线程支持 https://github.com/swoole/swoole-src/pull/5281/files https://github.com/swoole/swoole-src/tree/v6.0 swoole/swoole-src@908644c https://cplusplus.com/reference/multithreading/

以后是不是可以写成类似python这样,无论是使用多进程、还是多线程,换个单词即可

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_time(sleep_seconds: int) -> int:
    time.sleep(sleep_seconds)
    print(f'sleep {sleep_seconds} seconds')
    return sleep_seconds

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=3) as executor:
        result = executor.map(sleep_time, sleep_list)
        print(result)
        print(list(result))

    with ProcessPoolExecutor(max_workers=3) as executor:
        result = executor.map(sleep_time, sleep_list)
        print(result)
        print(list(result))

这应该是 php 层面的工作