swoole运行模式

Swoole跟传统的web开发有什么区别,除了传统的LAMP/LNMP同步开发模式,swoole的异步开发模式是怎么样的。

1.传统web开发模式

PHP web开发采用的方式是LAMP/LNMP架构,即Linux、Nginx,Mysql和PHP。这里以nginx来举例,大致结构为:

当请求进入时,web server将请求转交给PHP-FPM,PHP-FPM是一个进程池架构的FastCGI服务,内置PHP解释器。FPM负责解释执行PHP文件生成响应,最终返回给web server,展现至前端。PHP文件中实现了许多业务逻辑,包括Mysql和Nosql的访问,调用第三方应用等等。

这样的结构php-fpm和nginx的配合已经运行得足够好,但是由于php-fpm本身是同步阻塞进程模型,在请求结束后释放所有的资源(包括框架初始化创建的一系列对象),导致PHP进程“空转”(创建<—>销毁<—>创建)消耗大量的CPU资源,从而导致单机的吞吐能力有限。

每次请求处理的过程都意味着一次PHP文件解析,环境设置等不必要的耗时操作PHP进程处理完即销毁,无法在PHP程序中使用连接池等技术实现性能优化。

2. Swoole运行模式

针对传统架构的问题,swoole从PHP扩展出发,解决了上述问题。

相比于传统架构,Swoole进程模型最大的特点在于其多线程Reactor模式处理网络请求,使得其能轻松应对大量连接。

除此之外的优点还包括:

全异步非阻塞,占用资源开销小,程序执行效率高

程序运行只解析加载一次PHP文件,避免每次请求的重复加载

进程常驻,使得连接池和请求之间信息传递等的实现成为可能

3.使用swoole和传统php开发的缺点

1、更难上手。这要求开发人员对于多进程的运行模式有更清晰的认识 2、更容易内存泄露。在处理全局变量,静态变量的时候一定要小心,这种不会被GC清理的变量会存在整个生命周期中,如果没有正确的处理,很容易消耗完所有的内存。在php-fpm下,php代码执行完内存就会被完全释放。

http_server详解

简介

swoole 内置实现了一个简单的httpServer类。swoole版的http server相对于php-fpm,最大优势在于高性能,代码一次载入内存,后续无需再解释执行,但是swoole_http_server对Http协议的支持并不完整,实际生产环境,一般是前面加一层Nginx,由Nginx解析一些静态css、js文件,php文件请求交由swoole处理

1、http_server本质是swoole_server,不过在协议的解析部分固定使用的是http协议解析

2、完整的http协议请求会被解析并封装在swoole_http_request对象当中

3、所有的http响应都通过swoole_http_response对象进行封装和发送

创建http_server

 1<?php
 2
 3$http = new swoole_http_server("0.0.0.0", 8811);
 4$http->set(
 5    [
 6        'enable_static_handler' => true,  //处理静态文件
 7        'document_root' => "/home/www/test/crswoole/demo/data",
 8    ]
 9);
10$http->on('request', function($request, $response) {
11    //print_r($request->get);
12    $content = [
13        'date:' => date("Ymd H:i:s"),
14        'get:' => $request->get,
15        'post:' => $request->post,
16        'header:' => $request->header,
17    ];
18    swoole_async_writefile(__DIR__."/access.log", json_encode($content).PHP_EOL, function($filename){
19        // todo
20    }, FILE_APPEND);
21    $response->cookie("singwa", "xsssss", time() + 1800);
22    $response->end("sss". json_encode($request->get));
23});
24$http->start();

其中onRequest回调接收两个参数分别是swoole_http_request对象和swoole_http_response对象,分别负责request请求和response响应。

swoole_http_request,负责http请求的相关信息。我们可以在这个对象上,获取header\server\get\post\files\cookie等信息,这等同于php的超全局变量,但是这可不是超全局的。

img

注意事项:

1、Swoole的HttpServer可以接受application/x-www-form-urlencoded/form-data类型的POST参数,并且会将解析后的参数存放在swoole_server_request对象的post成员变量内。

对于application/json或者其他类型的请求参数,Swoole底层并不会自动解析。但是Swoole的swoole_server_request提供了rawContent方法可以获得原始的POST字符串,我们可以根据Content-type类型做相应的解析。

2、POST/文件上传需要设置临时文件位置(upload_tmp_dir),并且需要设置包的大小,最大尺寸受到 package_max_length 配置项限制,默认为2M,调用$response->end后会自动删除,在end之后操作上传文件会抛出文件不存在错误。

swoole_http_response

swoole_http_response,负责处理HTTP响应信息,包括响应的头信息header\响应状态等,跟传统的HTTP请求和响应差别不大,比较简单,大概了解下即可。

注意事项:

请在end()之前设置相应的响应头、状态等等,end操作后将向客户端浏览器发送内容,并销毁request/response对象

路由

Swoole的HttpServer的路由需要开发者自己手动实现

  • 1、通过server[‘path_info’]获取到路由信息
  • 2、定义好相应的规则对请求目录进行分割,比如(模块\控制器\方法)
  • 3、注册spl_autoload_register
  • 4、拼接命名空间的地址实例化时自动触发autoLoad进行加载

swoole websocket

Swoole增加了内置的WebSocket服务器支持,通过几行PHP代码就可以写出一个异步非阻塞多进程的WebSocket服务器。

demo代码

服务端代码

 1<?php
 2//继承http_server
 3$server = new swoole_websocket_server("0.0.0.0", 8812);
 4//$server->set([]);
 5$server->set(
 6    [
 7        'enable_static_handler' => true, // //处理静态文件
 8        'document_root' => "/home/www/test/crswoole/demo/data",
 9    ]
10);
11//监听websocket连接打开事件
12$server->on('open', 'onOpen');
13function onOpen($server, $request) {
14    print_r($request->fd);
15}
16//开启此函数 open事件就不触发
17$server->on('handshake', function (\swoole_http_request $request, \swoole_http_response $response) {
18    // print_r( $request->header );
19    // if (如果不满足我某些自定义的需求条件,那么返回end输出,返回false,握手失败) {
20    //    $response->end();
21    //     return false;
22    // }
23    // websocket握手连接算法验证
24    $secWebSocketKey = $request->header['sec-websocket-key'];
25    $patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#';
26    if (0 === preg_match($patten, $secWebSocketKey) || 16 !== strlen(base64_decode($secWebSocketKey))) {
27        $response->end();
28        return false;
29    }
30    echo $request->header['sec-websocket-key'];
31    $key = base64_encode(sha1(
32        $request->header['sec-websocket-key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
33        true
34    ));
35    $headers = [
36        'Upgrade' => 'websocket',
37        'Connection' => 'Upgrade',
38        'Sec-WebSocket-Accept' => $key,
39        'Sec-WebSocket-Version' => '13',
40    ];
41    // WebSocket connection to 'ws://127.0.0.1:9502/'
42    // failed: Error during WebSocket handshake:
43    // Response must not include 'Sec-WebSocket-Protocol' header if not present in request: websocket
44    if (isset($request->header['sec-websocket-protocol'])) {
45        $headers['Sec-WebSocket-Protocol'] = $request->header['sec-websocket-protocol'];
46    }
47    foreach ($headers as $key => $val) {
48        $response->header($key, $val);
49    }
50    $response->status(101);
51    $response->end();
52});
53//发送http请求会触发,然后通知所有websocket客户端
54$server->on('request',function($request,$reponse) use($server){
55    //echo 'request';
56//    $fds=$server->connection_list();
57//    foreach($fds as $value){
58//        if($server->isEstablished($value)){
59//            $server->push($value,'通知');
60//        }
61//    }
62//    $reponse->end('websocket');
63    //推荐 connections属性  TCP连接迭代器,可以使用foreach遍历服务器当前所有的连接
64    foreach($server->connections as $fd)
65    {
66        if($server->isEstablished($fd)){
67            $server->push($fd,'通知');
68        }
69    }
70    $reponse->end('websocket');
71});
72// 监听ws消息事件
73$server->on('message', function (swoole_websocket_server $server, $frame) {
74    echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
75    $server->push($frame->fd, "singwa-push-secesss");
76});
77$server->on('close', function ($ser, $fd) {
78    echo "client {$fd} closed\n";
79});
80$server->start();

客户端代码

 1<!DOCTYPE html>
 2<html lang="en">
 3<head>
 4    <meta charset="UTF-8">
 5    <title></title>
 6</head>
 7<body>
 8<h1>singwa-swoole-ws测试<h1>
 9    <script>
10        var wsUrl = "ws://192.168.100.98:8812";
11        var websocket = new WebSocket(wsUrl);
12        //实例对象的onopen属性
13        websocket.onopen = function(evt) {
14            websocket.send("hello-sinwa");
15            console.log("conected-swoole-success");
16        }
17        // 实例化 onmessage
18        websocket.onmessage = function(evt) {
19            console.log("ws-server-return-data:" + evt.data);
20        }
21        //onclose
22        websocket.onclose = function(evt) {
23            console.log("close");
24        }
25        //onerror
26        websocket.onerror = function(evt, e) {
27            console.log("error:" + evt.data);
28        }
29    </script>
30</body>
31</html>

onOpen事件

1function onOpen($server, $request) {
2    print_r($request->fd);
3}
  • WebSocket客户端与服务器建立连接并完成握手后会回调此函数。
  • $req 是一个Http请求对象,包含了客户端发来的握手请求信息
  • onOpen事件函数中可以调用push向客户端发送数据或者调用close关闭连接
  • onOpen事件回调是可选的

onMessage事件

1$server->on(‘message’, function (swoole_websocket_server $server, $frame)

当服务器收到来自客户端的数据帧时会回调此函数。

  • $frame 是swoole_websocket_frame对象,包含了客户端发来的数据帧信息
  • onMessage回调必须被设置,未设置服务器将无法启动

$frame

1$frame 共有4个属性,分别是:
2$frame->fd,客户端的socket_id,使用$server->push推送数据时需要用到。
3$frame->data,数据内容,可以是文本内容也可以是二进制数据,可以通过opcode的值来判断
4$frame->opcode,WebSocket的OpCode类型,可以参考WebSocket协议标准文档
5$frame->finish, 表示数据帧是否完整,一个WebSocket请求可能会分成多个数据帧进行发送

Push方法

1function WebSocket\Server->push(int $fd, $data, int $opcode = 1, bool $finish = true);
2
3$fd 客户端连接的ID,如果指定的$fd对应的TCP连接并非websocket客户端,将会发送失败
4$data 要发送的数据内容向websocket客户端连接推送数据,长度最大不得超过2M。
5$data 如果是文本类型,编码格式必然是UTF-8,这是WebSocket协议规定的
6$opcode,指定发送数据内容的格式,默认为文本。发送其它格式可以参考文档常量

注意:

1、连接保持+心跳

Websocket也是长连接的形式,同样支持自己实现心跳包的检测,怎么实现请参考前面的

2、校验客户端连接的有效性

我们创建的websocket_server,是对外开放的,也就是任何人都能连接过来,对于非websocket协议同样能触发,所以我们要判断当前是websocket客户端并且能够通讯才进行发送。

swoole task

task进程异步任务实现

task进程适用场景

情景一:管理员需要给100W用户发送邮件,当点击发送,浏览器会一直转圈,直到邮件全部发送完毕。

情景二:千万微博大V发送一条微博,其关注的粉丝相应的会接收到这个消息,是不是大V需要一直等待消息发送完成,才能执行其它操作

情景三:处理几TB数据

情景四: 数据库读写操作

情景五:广播消息

从我们理解的角度思考,这其实都是php进程一直被阻塞,客户端才一直在等待服务端的响应,我们的代码就是同步执行的。

对于用户而言,这就是漫长的等待。如何优雅的提高用户体验就是一个非常棘手的问题。

那么task的目的就是这个

woker进程投递到task进程

1、worker进程当中,我们调用对应的task()方法发送数据通知到task worker进程

2、task worker进程会在onTask()回调中接收到这些数据,并进行处理。

3、处理完成之后通过调用finsh()函数或者直接return返回消息给worker进程

4、worker进程在onFinsh()进程收到这些消息并进行处理

请阅读手册:https://wiki.swoole.com/wiki/page/134.html

投递一个异步任务到task_worker池中。此函数是非阻塞的,执行完毕会立即返回。Worker进程可以继续处理新的请求。使用Task功能,必须先设置 task_worker_num,并且必须设置Server的onTask和onFinish事件回调函数。

参数注意:

1$task_id不等于workerStart中的worker_id,而是swoole维护的任务自增长id
2$from_id表示来自于哪个woker投递而来的任务,id等于worker_id值
3$serv->worker_id等于workerStart中的worker_id, 此处是task_woker的id
4$serv->worker_pid 此处是task_woker的进程pid

代码举例

server代码如下

 1<?php
 2
 3$key = ftok(__DIR__, 1); //默认值就是这个
 4$taskLog = [];
 5$serv = new swoole_server("127.0.0.1", 9501);
 6$serv->set(array(
 7    'worker_num'        => 2,
 8    'task_worker_num'   => 4,
 9    'task_ipc_mode'     => 2,
10    'message_queue_key' => $key,
11    //task_ipc_mode=1  Server程序终止后 ,数据会丢失
12    //2, 使用消息队列通信 非争抢模式,进程 指定消息队列Key后Server程序终止后,消息队列中的数据不会删除,因此进程重启后仍然能取到数据
13));
14//work精彩
15$serv->on('Receive', function (swoole_server $serv, $fd, $from_id, $data) {
16    echo "接收数据" . $data . "\n";
17    $data = trim($data);
18    //$data要投递的任务数据,必须是可序列化的PHP变量
19    //$dst_worker_id可以制定要给投递给哪个Task进程,传入ID即可,
20    //范围是0 - (serv->task_worker_num -1)
21    //用成功,返回值为整数$task_id,表示此任务的ID ,
22    $task_id = $serv->task($data, 0); //投递到task进程,指定第一个taskwork 进程
23    $serv->send($fd, "分发任务,任务id为$task_id\n");
24});
25$serv->on('Task', function (swoole_server $serv, $task_id, $from_id, $data) {
26    echo "Tasker进程接收到数据";
27    //$serv->worker_id=2 第一个task的work进行id ,task进程的work_id=2,3,4,5  ,  work 进程work_id =0,1
28    echo "#{$serv->worker_id}\tonTask: [PID={$serv->worker_pid}]: task_id=$task_id, data_len=" . strlen($data) . "." . PHP_EOL;
29  //  var_dump(posix_getpid());  //进程确实是发生了变化
30  //  var_dump($serv->worker_id); //task_work_id
31    echo "任务来自于:$from_id" . ",任务id为{$task_id}" . PHP_EOL;
32    try {
33    } catch (\Exception $e) {
34        //$server->sendMessage(); 失败 通过消息管道 重新投递给work进程,work进程再触发task
35    }
36    //模拟百分制50概率 消息失败
37//    if (mt_rand(1, 2) == 2) {
38        $serv->sendMessage($data, 0);
39//    } else {
40//        //echo "{$server->worker_id}任务来自于:$form_id".",任务id为{$task_id},数据:{$data}".PHP_EOL;
41//    }
42//    $serv->finish("执行完毕");
43//    sleep(10);
44//    $serv->finish("执行完毕");
45});
46$serv->on('Finish', function (swoole_server $serv, $task_id, $data) {
47    echo "Task#$task_id finished, data_len=" . strlen($data) . PHP_EOL;
48});
49$serv->on('workerStart', function ($serv, $worker_id) {
50    if ($serv->taskworker) {
51        echo 'this is task' . $serv->worker_id . ',work_id:' . $worker_id . PHP_EOL;
52    } else {
53        echo 'this is work' . $serv->worker_id . ',work_id:' . $worker_id . PHP_EOL;
54    }
55});
56$serv->on("pipeMessage", function ($serv, $task_id, $data) {
57    echo "#{$serv->worker_id} message from #$task_id: $data\n";
58    $key = md5($serv->worker_id . '_' . $data);
59    global $taskLog;
60    var_dump($taskLog);
61    if (!isset($taskLog[$key])) {
62        $taskLog[$key] = 1;
63    }
64    //var_dump($this->taskLog);
65    if ($taskLog[$key] < 3) { ///一共执行3次,重新投放2次
66        $num = $taskLog[$key];
67        //重新投入task进程
68        $num++;
69        $taskLog[$key] = $num;
70        echo '重新  task \n'; //
71        $serv->task($data,$serv->worker_id);
72    } else {
73        //var_dump('失败超过三次,结束投放,并记录日志文件');
74        echo "任务{$task_id}被终止:{$data}" . PHP_EOL;
75        $file = __DIR__ . '/task.log';
76        $content = '失败超过三次的记录,' . date('Y-m-d H:i:s') . "\r\nfail:worker_id:{$serv->worker_id} ,task_id:$task_id: $data\r\n";
77        file_put_contents($file, $content, FILE_APPEND);
78    }
79});
80$serv->start();

client代码如下

 1<?php
 2/**
 3 * Desc:
 4 * User: maozhongyu
 5 * Date: 19/2/19
 6 * Time: 下午11:18
 7 */
 8$client = new swoole_client(SWOOLE_SOCK_TCP);
 9//建立连接
10if (!$client->connect('127.0.0.1', 9501, -1)) {
11    exit("connect failed. Error: {$client->errCode}\n");
12}
13//想服务端发送数据
14$client->send("hello world\n");
15//接受服务端发过来的数据
16$reponse = $client->recv();
17echo $reponse;
18//关闭连接
19$client->close();

Swoole的多进程模块

老的文档写的全点 https://wiki.swoole.com/wiki/page/p-process.html

Swoole的多进程模块

Swoole是有自己的一个进程管理模块,用来替代PHP的pcntl扩展。

需要注意Process进程在系统是非常昂贵的资源,创建进程消耗很大。另外创建的进程过多会导致进程切换开销大幅上升。

为什么不使用pcntl

  • 没有提供进程间通信的功能
  • 不支持重定向标准输入和输出
  • 只提供了 fork 这样原始的接口,容易使用错误

swoole是怎么解决的

  • 可以方便的实现进程间通讯
  • 基于Unix Socketsysvmsg消息队列的进程间通信,只需调用write/read或者push/pop即可
  • 支持重定向标准输入和输出,在子进程内 echo 不会打印屏幕,而是写入管道,读键盘输入可以重定向为管道读取数据
  • 提供了 exec 接口,创建的进程可以执行其他程序,与原 PHP 父进程之间可以方便的通信
  • 在协程环境中无法使用 Process 模块,可以使用 runtime hook+proc_open 实现,参考协程进程管理

创建进程

1swoole_process::__construct(callable $function, bool $redirect_stdin_stdout = false,
2    int $pipe_type = SOCK_DGRAM, bool $enable_coroutine = false);
  • $function,子进程创建成功后要执行的函数,底层会自动将函数保存到对象的callback属性上。如果希望更改执行的函数,可赋值新的函数到对象的callback属性
  • $redirect_stdin_stdout,重定向子进程的标准输入和输出。启用此选项后,在子进程内输出内容将不是打印屏幕,而是写入到主进程管道。读取键盘输入将变为从管道中读取数据。默认为阻塞读取。
  • $pipe_type,管道类型,启用$redirect_stdin_stdout后,此选项将忽略用户参数,强制为1。如果子进程内没有进程间通信,可以设置为 0
  • $enable_coroutine,默认为false,在callback function中启用协程,开启后可以直接在子进程的函数中使用协程API

管道类型

  • 0:不创建管道
  • 1:创建SOCK_STREAM类型管道
  • 2:创建SOCK_DGRAM类型管道
  • 启用$redirect_stdin_stdout 后,此选项将忽略用户参数,强制为1

Process 对象在销毁时会自动关闭管道,子进程内如果监听了管道会收到CLOSE事件 使用Process作为监控父进程,创建管理子进程时,父类必须注册信号SIGCHLD对退出的进程执行wait,否则子进程退出时会变成僵尸进程

SIGCHLD   忽略信号 当子进程停止或退出时通知父进程

 1<?php
 2  
 3$process = new swoole_process('callback_function', true);
 4
 5$pid = $process->start();
 6
 7function callback_function(swoole_process $worker)
 8{
 9    $worker->exec('/usr/local/bin/php', array(__DIR__.'/swoole_server.php'));
10}
11
12swoole_process::wait();
 1<?php
 2  //开启10个子进程
 3$processNum=10;
 4for ($workPage = 1; $workPage <= $processNum; $workPage++) {
 5  $process = new swoole_process('callback_function', true);
 6  $pid = $process->start();
 7}
 8
 9function callback_function(swoole_process $worker)
10{
11    $worker->exec('/usr/local/bin/php', array(__DIR__.'/swoole_server.php'));
12}
13
14swoole_process::wait();

进程间的通讯

如果是非常简单的多进程执行任务,那么进程间就不需要通讯了,实际情况下,很多业务是需要通讯的,比如,发邮件,如果自进程发送失败了,那么是要通知主进程的等等

swoole_process进程间支持2种通信方式:

1、管道pipe

2、消息队列

管道通讯

半双工: 数据单向流动, 一端只读, 一端只写.

同步 vs 异步: 默认为同步阻塞模式, 可以使用 swoole_event_add() 添加管道到 swoole 的 event loop 中, 实现异步IO

管道有2个方法,分别来写入数据,和读取数据。

 1<?php
 2$worker=[];
 3for ($i=0;$i<3;$i++){
 4    $process=new Swoole\Process(function ($process){
 5        //子进程空间
 6//        while (true){
 7//            sleep(1);
 8//        }
 9        //var_dump("子进程:".$process->read());
10        sleep(1);
11        echo "子进程数据";
12        //$process->write('子进程数据');
13        echo posix_getpid().PHP_EOL;
14    },true,true);
15   $pid=$process->start();
16   $worker[$pid]=$process;  //把相应的进程对象
17   //$process->write("主进程数据");
18   // var_dump("主进程:".$process->read().PHP_EOL);
19    //异步监听管道当中的数据,读事件的监听,当管道可读的时候触发
20   swoole_event_add($process->pipe,function ($pipe) use($process){
21        var_dump("主进程:".$process->read().PHP_EOL);
22   });
23}
24//先让子进程执行完毕,然后再读取阻塞
25//foreach ($worker as $w){
26//   var_dump($w->read().PHP_EOL);
27//}

消息队列的通讯

消息队列:

  • 一系列保存在内核中的消息链表
  • 有一个 msgKey, 可以通过此访问不同的消息队列
  • 有数据大小限制, 默认 8192
  • 阻塞 vs 非阻塞: 阻塞模式下 pop()空消息队列/push()满消息队列会阻塞, 非阻塞模式可以直接返回

swoole 中使用消息队列:

  • 通信模式: 默认为争抢模式, 无法将消息投递给指定子进程
  • 新建消息队列后, 主进程就可以使用
  • 消息队列不可和管道一起使用, 也无法使用 swoole event loop