用php socket 实现网络服务器,逐步演进,升级模型。
php socket 实现单进程阻塞的网络服务器
说明:
1、创建一个socket,绑定服务器端口(bind),监听端口(listen),在PHP中用stream_socket_server一个函数就能完成上面3个步骤
2、进入while循环,阻塞在accept操作上,等待客户端连接进入。此时程序会进入睡眠状态,直到有新的客户端发起connect到服务器,操作系统会唤醒此进程。accept函数返回客户端连接的socket
3、利用fread读取客户端socket当中的数据收到数据后服务器程序进行处理然后使用fwrite向客户端发送响应。长连接的服务会持续与客户端交互,而短连接服务一般收到响应就会close。
缺点:
1、一次只能处理一个连接,不支持多个连接同时处理
代码实现
1<?php
2//多个客户端发起请求,观察服务器状态
3//ab -n 请求数 -c 并发数 -k 长连接
4//ab -n 100 -c 10 -k http://192.168.100.98:9800
5//阻塞等待一个请求等待1秒,
6class Worker{
7 //监听socket
8 protected $socket = NULL;
9 //连接事件回调
10 public $onConnect = NULL;
11 //接收消息事件回调
12 public $onMessage = NULL;
13 public function __construct($socket_address) {
14 //监听地址+端口
15 $this->socket=stream_socket_server($socket_address);
16 }
17 public function start() {
18 //创建多个子进程阻塞接收服务端socket
19 while (true){
20 $clientSocket=stream_socket_accept($this->socket); //阻塞监听
21 //触发事件的连接的回调
22 if(!empty($clientSocket) && is_callable($this->onConnect)){
23 call_user_func($this->onConnect,$clientSocket);
24 }
25 //从连接当中读取客户端的内容
26 $buffer=fread($clientSocket,65535);
27 //正常读取到数据,触发消息接收事件,响应内容
28 if(!empty($buffer) && is_callable($this->onMessage)){
29 call_user_func($this->onMessage,$clientSocket,$buffer);
30 }
31 fclose($clientSocket);
32 }
33 //连接建立成功触发事件
34 //call_user_func($this->onConnect,"参数");
35 }
36}
37$worker = new Worker('tcp://0.0.0.0:9800');
38//连接事件
39$worker->onConnect = function ($fd) {
40 echo '连接事件触发',(int)$fd,PHP_EOL;
41};
42//消息接收
43$worker->onMessage = function ($conn, $message) {
44 //事件回调当中写业务逻辑
45 //var_dump($conn,$message);
46 $content="我是mao";
47 $http_resonse = "HTTP/1.1 200 OK\r\n";
48 $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
49 $http_resonse .= "Connection: keep-alive\r\n"; //连接保持
50 $http_resonse .= "Server: php socket server\r\n";
51 $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
52 $http_resonse .= $content;
53 sleep(1);
54 fwrite($conn, $http_resonse);
55};
56$worker->start(); //启动
预派生子进程模式服务器
1、程序启动后就会创建N个进程。每个子进程进入Accept,等待新的连接进入。当客户端连接到服务器时,其中一个子进程会被唤醒,开始处理客户端请求,并且不再接受新的TCP连接。当此连接关闭时,子进程会释放,重新进入Accept,参与处理新的连接。
这个模型的优势是完全可以复用进程,不需要太多的上下文切换,比如php-fpm基于此模型的。
缺点:
1、这种模型严重依赖进程的数量解决并发问题,一个客户端连接就需要占用一个进程,工作进程的数量有多少,并发处理能力就有多少。操作系统可以创建的进程数量是有限的。
例如:即时聊天程序,一台服务器可能要维持数十万的连接,那么就要启动数十万的进程来维持。这显然不可能
基于上面的模式我们发现我们只能通过每次(accept)处理单个请求,没办法一次性处理多个请求
代码实现
1<?php
2class Worker{
3 //监听socket
4 protected $socket = NULL;
5 //连接事件回调
6 public $onConnect = NULL;
7 //接收消息事件回调
8 public $onMessage = NULL;
9 public $workerNum=4; //子进程个数
10 public function __construct($socket_address) {
11 //监听地址+端口
12 $this->socket=stream_socket_server($socket_address);
13 }
14 public function start() {
15 //获取配置文件
16 $this->fork(); //用来创建多个助教老师,创建多个子进程负责接收请求的
17 }
18 public function fork(){
19 $pids=[];
20 for ($i=0;$i<$this->workerNum;$i++){
21 $pid=pcntl_fork(); //创建成功会返回子进程id
22 if($pid<0){
23 exit('创建失败');
24 }else if($pid>0){
25 //父进程空间,返回子进程id
26 $pids[$i] = $pid;
27 }else{ //返回为0子进程空间
28 $this->accept();//子进程负责接收客户端请求
29 exit; //必须加
30 }
31 }
32 while (count($pids) > 0) {
33 foreach ($pids as $key => $pid) {
34 //等待子进程中断,防止子进程成为僵尸进程。
35 // pcntl_wait($status);
36 $res = pcntl_waitpid($pid, $status, WNOHANG);
37 // If the process has already exited
38 if ($res == -1 || $res > 0) {
39 unset($pids[$key]);
40 }
41 }
42 }
43 }
44 public function accept(){
45 //创建多个子进程阻塞接收服务端socket
46 while (true){
47 $clientSocket=stream_socket_accept($this->socket); //阻塞监听
48 var_dump(posix_getpid());
49 //触发事件的连接的回调
50 if(!empty($clientSocket) && is_callable($this->onConnect)){
51 call_user_func($this->onConnect,$clientSocket);
52 }
53 //从连接当中读取客户端的内容
54 $buffer=fread($clientSocket,65535);
55 //正常读取到数据,触发消息接收事件,响应内容
56 if(!empty($buffer) && is_callable($this->onMessage)){
57 call_user_func($this->onMessage,$clientSocket,$buffer);
58 }
59 fclose($clientSocket); //必须关闭,子进程不会释放不会成功拿下进入accpet
60 }
61 }
62}
63$worker = new Worker('tcp://0.0.0.0:9800');
64//连接事件
65$worker->onConnect = function ($fd) {
66 //echo '连接事件触发',(int)$fd,PHP_EOL;
67};
68//消息接收
69$worker->onMessage = function ($conn, $message) {
70 //事件回调当中写业务逻辑
71 //var_dump($conn,$message);
72 $content="我是mao";
73 $http_resonse = "HTTP/1.1 200 OK\r\n";
74 $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
75 $http_resonse .= "Connection: keep-alive\r\n"; //连接保持
76 $http_resonse .= "Server: php socket server\r\n";
77 $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
78 $http_resonse .= $content;
79 fwrite($conn, $http_resonse);
80};
81$worker->start(); //启动
单进程阻塞复用的网络服务器(select)
服务监听流程如上
1、保存所有的socket,通过select系统调用,监听socket描述符的可读事件
2、Select会在内核空间监听一旦发现socket可读,会从内核空间传递至用户空间,在用户空间通过逻辑判断是服务端socket可读,还是客户端的socket可读
3、如果是服务端的socket可读,说明有新的客户端建立,将socket保留到监听数组当中
4、如果是客户端的socket可读,说明当前已经可以去读取客户端发送过来的内容了,读取内容,然后响应给客户端。
缺点:
1、select模式本身的缺点(1、循环遍历处理事件、2、内核空间传递数据的消耗)
2、单进程对于大量任务处理乏力
代码实现
1<?php
2class Worker{
3 //监听socket
4 protected $socket = NULL;
5 //连接事件回调
6 public $onConnect = NULL;
7 //接收消息事件回调
8 public $onMessage = NULL;
9 public $workerNum=4; //子进程个数
10 public $allSocket; //存放所有socket
11 public function __construct($socket_address) {
12 //监听地址+端口
13 $this->socket=stream_socket_server($socket_address);
14 stream_set_blocking($this->socket,0); //设置非阻塞
15 $this->allSocket[(int)$this->socket]=$this->socket;
16 }
17 public function start() {
18 //获取配置文件
19 $this->fork();
20 }
21 public function fork(){
22 $this->accept();//子进程负责接收客户端请求
23 }
24 public function accept(){
25 //进程阻塞接收服务端socket
26 while (true){
27 $write=$except=[];
28 //需要监听socket
29 $read=$this->allSocket;
30 //状态谁改变,传指 ,改变过的 & 传指
31 stream_select($read,$write,$except,60);//会阻塞
32 //怎么区分服务端跟客户端
33 foreach ($read as $index=>$val){
34 //当前发生改变的是服务端,有连接进入
35 if($val === $this->socket){
36 $clientSocket=stream_socket_accept($this->socket); //阻塞监听
37 //触发事件的连接的回调
38 if(!empty($clientSocket) && is_callable($this->onConnect)){
39 call_user_func($this->onConnect,$clientSocket);
40 }
41 $this->allSocket[(int)$clientSocket]=$clientSocket;
42 }else{
43 //从连接当中读取客户端的内容
44 $buffer=fread($val,1024);
45 //如果数据为空,或者为false,不是资源类型
46 if(empty($buffer)){
47 if(feof($val) || !is_resource($val)){
48 //触发关闭事件
49 fclose($val);
50 unset($this->allSocket[(int)$val]);
51 continue;
52 }
53 }
54 //正常读取到数据,触发消息接收事件,响应内容
55 if(!empty($buffer) && is_callable($this->onMessage)){
56 call_user_func($this->onMessage,$val,$buffer);
57 }
58 }
59 }
60 }
61 }
62}
63$worker = new Worker('tcp://0.0.0.0:9805');
64//连接事件
65$worker->onConnect = function ($fd) {
66 //echo '连接事件触发',(int)$fd,PHP_EOL;
67};
68//消息接收
69$worker->onMessage = function ($conn, $message) {
70 //事件回调当中写业务逻辑
71 //var_dump($conn,$message);
72 $content="我是cr-mao";
73 $http_resonse = "HTTP/1.1 200 OK\r\n";
74 $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
75 $http_resonse .= "Connection: keep-alive\r\n"; //连接保持
76 $http_resonse .= "Server: php socket server\r\n";
77 $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
78 $http_resonse .= $content;
79 fwrite($conn, $http_resonse);
80};
81$worker->start(); //启动
event服务器实现
IO复用/EventLoop
1、IO复用是什么?
IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程,目前支持I/O多路复用有 select,poll,epoll,I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
2、Select跟poll
Select介绍:
监视并等待多个文件描述符的属性变化(可读、可写或错误异常)。select函数监视的文件描述符分 3 类,分别是writefds、readfds、和 exceptfds。调用后 select会阻塞,直到有描述符就绪(有数据可读、可写、或者有错误异常),或者超时( timeout 指定等待时间),函数才返回。当 select()函数返回后,可以通过遍历 fdset,来找到就绪的描述符,并且描述符最大不能超过1024
poll 介绍:
poll的机制与select类似,与select在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是poll没有最大文件描述符数量的限制。poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
问题:
select/poll问题很明显,它们需要循环检测连接是否有事件。如果服务器有上百万个连接,在某一时间只有一个连接向服务器发送了数据,select/poll需要做循环100万次,其中只有1次是命中的,剩下的99万9999次都是无效的,白白浪费了CPU资源。
epoll:
epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制,无需轮询。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中。
简单点来说就是当连接有I/O流事件产生的时候,epoll就会去告诉进程哪个连接有I/O流事件产生,然后进程就去处理这个事件。
高效的事件处理模式Reactor 模式
Reactor模型,Reactor顾名思义就是反应堆的意思,它本身不处理任何数据收发。只是可以监视一个socket句柄的事件变化。 1) 主进程/线程往epoll内核亊件中注册socket上的读就绪亊件。
2) 主进程/线程调用epoll_wait等待socket上有数据可读。
3) 当socket上有数据可读时,epoll_wait通知主进程/线程。主进程/线程则将socket可读事件放人请求队列。
4) 睡眠在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求, 然后往epoll内核事件表中注册该socket上的写就绪事件。
5) 主线程调用epoll_wait等待socket可写。
6) 当socket可写时,epoll_wait通知主进程/线程将socket可写亊件放人清求队列。
7) 睡眠在请求队列上的某个工作线程被唤醒,它往socket上写人服务器处理客户淸求
代码实现
实现的是属于一种变种,是类似于nginx采用的Reactor多进程的模式,具体差异表现为主进程中仅仅创建了监听,并没有创建 mainReactor 来“accept”连接,而是由子进程的 Reactor 来“accept”连接,通过负载均衡,一次只有一个子进程进行“accept”,子进程“accept”新连接后就放到自己的 Reactor中进行处理,不会再分配给其他子进程
1<?php
2class Worker{
3 //监听socket
4 protected $socket = NULL;
5 //连接事件回调
6 public $onConnect = NULL;
7 //接收消息事件回调
8 public $onMessage = NULL;
9 public $workerNum=4; //子进程个数
10 public $allSocket; //存放所有socket
11 public function __construct($socket_address) {
12 //监听地址+端口
13 $this->socket=stream_socket_server($socket_address);
14 }
15 public function start() {
16 //获取配置文件
17 $this->fork();
18 }
19 public function fork(){
20 $this->accept();//子进程负责接收客户端请求
21 }
22 public function accept(){
23 //第一个需要监听的事件(服务端socket的事件),一旦监听到可读事件之后会触发
24 swoole_event_add($this->socket,function ($fd){
25 $clientSocket=stream_socket_accept($fd);
26 //触发事件的连接的回调
27 if(!empty($clientSocket) && is_callable($this->onConnect)){
28 call_user_func($this->onConnect,$clientSocket);
29 }
30 //监听客户端可读
31 swoole_event_add($clientSocket,function ($fd){
32 //从连接当中读取客户端的内容
33 $buffer=fread($fd,1024);
34 //如果数据为空,或者为false,不是资源类型
35 if(empty($buffer)){
36 if(!is_resource($fd) || feof($fd)){
37 //触发关闭事件
38 fclose($fd);
39 }
40 }
41 //正常读取到数据,触发消息接收事件,响应内容
42 if(!empty($buffer) && is_callable($this->onMessage)){
43 call_user_func($this->onMessage,$fd,$buffer);
44 }
45 });
46 });
47 echo "非阻塞";
48 }
49}
50$worker = new Worker('tcp://0.0.0.0:9805');
51//连接事件
52$worker->onConnect = function ($fd) {
53 //echo '连接事件触发',(int)$fd,PHP_EOL;
54};
55//消息接收
56$worker->onMessage = function ($conn, $message) {
57 //事件回调当中写业务逻辑
58 //var_dump($conn,$message);
59 $content="我是crmao";
60 $http_resonse = "HTTP/1.1 200 OK\r\n";
61 $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
62 $http_resonse .= "Connection: keep-alive\r\n"; //连接保持
63 $http_resonse .= "Server: php socket server\r\n";
64 $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
65 $http_resonse .= $content;
66 fwrite($conn, $http_resonse);
67};
68$worker->start(); //启动
多进程网络服务器
代码实现
1<?php
2class Worker{
3 //监听socket
4 protected $socket = NULL;
5 //连接事件回调
6 public $onConnect = NULL;
7 public $reusePort=1;
8 //接收消息事件回调
9 public $onMessage = NULL;
10 public $workerNum=3; //子进程个数
11 public $allSocket; //存放所有socket
12 public $addr;
13 public function __construct($socket_address) {
14 //监听地址+端口
15 $this->addr=$socket_address;
16 }
17 public function start() {
18 //获取配置文件
19 $this->fork();
20 }
21 public function fork(){
22 for ($i=0;$i<$this->workerNum;$i++){
23 $pid=pcntl_fork(); //创建成功会返回子进程id
24 if($pid<0){
25 exit('创建失败');
26 }else if($pid>0){
27 //父进程空间,返回子进程id
28 }else{ //返回为0子进程空间
29 $this->accept();//子进程负责接收客户端请求
30 exit;
31 }
32 }
33 //放在父进程空间,结束的子进程信息,阻塞状态
34 $status=0;
35 for ($i=0;$i<$this->workerNum;$i++) {
36 $pid = pcntl_wait($status);
37 }
38 }
39 public function accept(){
40 $opts = array(
41 'socket' => array(
42 'backlog' =>10240, //成功建立socket连接的等待个数
43 ),
44 );
45 $context = stream_context_create($opts);
46 //开启多端口监听,并且实现负载均衡
47 stream_context_set_option($context,'socket','so_reuseport',1);//端口重用
48 stream_context_set_option($context,'socket','so_reuseaddr',1);//地址重用
49 $this->socket=stream_socket_server($this->addr,$errno,$errstr,STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,$context);
50 //第一个需要监听的事件(服务端socket的事件),一旦监听到可读事件之后会触发
51 swoole_event_add($this->socket,function ($fd){
52 $clientSocket=stream_socket_accept($fd);
53 //触发事件的连接的回调
54 if(!empty($clientSocket) && is_callable($this->onConnect)){
55 call_user_func($this->onConnect,$clientSocket);
56 }
57 //监听客户端可读
58 swoole_event_add($clientSocket,function ($fd){
59 //从连接当中读取客户端的内容
60 $buffer=fread($fd,1024);
61 //如果数据为空,或者为false,不是资源类型
62 if(empty($buffer)){
63 if(!is_resource($fd) || feof($fd) ){
64 //触发关闭事件
65 fclose($fd);
66 }
67 }
68 //正常读取到数据,触发消息接收事件,响应内容
69 if(!empty($buffer) && is_callable($this->onMessage)){
70 call_user_func($this->onMessage,$fd,$buffer);
71 }
72 });
73 });
74 }
75}
76$worker = new Worker('tcp://0.0.0.0:9810');
77//开启多进程的端口监听
78$worker->reusePort = true;
79//连接事件
80$worker->onConnect = function ($fd) {
81 //echo '连接事件触发',(int)$fd,PHP_EOL;
82};
83//消息接收
84$worker->onMessage = function ($conn, $message) {
85 //事件回调当中写业务逻辑
86 // $a=include 'index.php';
87 // var_dump($a);
88 //var_dump($conn,$message);
89 $content="我是cr-mao";
90 $http_resonse = "HTTP/1.1 200 OK\r\n";
91 $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
92 $http_resonse .= "Connection: keep-alive\r\n"; //连接保持
93 $http_resonse .= "Server: php socket server\r\n";
94 $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
95 $http_resonse .= $content;
96 fwrite($conn, $http_resonse);
97};
98$worker->start(); //启动
信号处理,inotify实现热重启
1<?php
2class Worker{
3 //监听socket
4 protected $socket = NULL;
5 //连接事件回调
6 public $onConnect = NULL;
7 public $reusePort=1;
8 //接收消息事件回调
9 public $onMessage = NULL;
10 public $workerNum=3; //子进程个数
11 public $allSocket; //存放所有socket
12 public $addr;
13 protected $worker_pid; //子进程pid
14 protected $master_pid;//主进程id
15 protected $watch_fd;//文件监视的句柄
16 public function __construct($socket_address) {
17 //监听地址+端口
18 $this->addr=$socket_address;
19 $this->master_pid=posix_getpid();
20 }
21 public function start() {
22 //获取配置文件
23 $this->watch();
24 $this->fork($this->workerNum);
25 $this->monitorWorkers(); //监视程序,捕获信号,监视worker进程
26 }
27 /**
28 * 文件监视,自动重启
29 */
30 protected function watch(){
31 $this->watch_fd=inotify_init(); //初始化
32 $files=get_included_files();
33 foreach ($files as $file){
34 inotify_add_watch($this->watch_fd,$file,IN_MODIFY); //监视相关的文件
35 }
36 //监听
37 swoole_event_add($this->watch_fd,function ($fd){
38 $events=inotify_read($fd);
39 if(!empty($events)){
40 posix_kill($this->master_pid,SIGUSR1);
41 }
42 });
43 }
44 /**
45 * 捕获信号
46 * 监视worker进程.拉起进程
47 */
48 public function monitorWorkers (){
49 //注册信号事件回调,是不会自动执行的
50 // reload
51 pcntl_signal(SIGUSR1, array($this, 'signalHandler'),false); //重启woker进程信号
52 //ctrl+c
53 pcntl_signal(SIGINT, array($this, 'signalHandler'),false); //重启woker进程信号
54 //
55 $status=0;
56 while (1){
57 // 当发现信号队列,一旦发现有信号就会触发进程绑定事件回调
58 pcntl_signal_dispatch();
59 $pid = pcntl_wait($status); //当信号到达之后就会被中断
60 //会去查询我们的子进程id
61 $index=array_search($pid,$this->worker_pid);
62 //如果进程不是正常情况下的退出,重启子进程,我想要维持子进程个数
63 if($pid>1 && $pid != $this->master_pid && $index!=false && !pcntl_wifexited($status)){
64 $index=array_search($pid,$this->worker_pid);
65 $this->fork(1);
66 var_dump('拉起子进程');
67 unset($this->worker_pid[$index]);
68 }
69 pcntl_signal_dispatch();
70 //进程重启的过程当中会有新的信号过来,如果没有调用pcntl_signal_dispatch,信号不会被处理
71 }
72 }
73 public function signalHandler($sigo){
74 switch ($sigo){
75 case SIGUSR1:
76 $this->reload();
77 echo "收到重启信号";
78 break;
79 case SIGINT:
80 $this->stopAll();
81 echo "按下ctrl+c,关闭所有进程";
82 swoole_event_del($this->watch_fd);
83 exit();
84 break;
85 }
86 }
87 public function fork($worker_num){
88 for ($i=0;$i<$worker_num;$i++){
89 $test=include 'index.php';
90 var_dump($test);
91 $pid=pcntl_fork(); //创建成功会返回子进程id
92 if($pid<0){
93 exit('创建失败');
94 }else if($pid>0){
95 //父进程空间,返回子进程id
96 $this->worker_pid[]=$pid;
97 }else{ //返回为0子进程空间
98 $this->accept();//子进程负责接收客户端请求
99 exit;
100 }
101 }
102 //放在父进程空间,结束的子进程信息,阻塞状态
103 }
104 public function accept(){
105 $opts = array(
106 'socket' => array(
107 'backlog' =>10240, //成功建立socket连接的等待个数
108 ),
109 );
110 $context = stream_context_create($opts);
111 //开启多端口监听,并且实现负载均衡
112 stream_context_set_option($context,'socket','so_reuseport',1);
113 stream_context_set_option($context,'socket','so_reuseaddr',1);
114 $this->socket=stream_socket_server($this->addr,$errno,$errstr,STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,$context);
115 //第一个需要监听的事件(服务端socket的事件),一旦监听到可读事件之后会触发
116 swoole_event_add($this->socket,function ($fd){
117 $clientSocket=stream_socket_accept($fd);
118 //触发事件的连接的回调
119 if(!empty($clientSocket) && is_callable($this->onConnect)){
120 call_user_func($this->onConnect,$clientSocket);
121 }
122 //监听客户端可读
123 swoole_event_add($clientSocket,function ($fd){
124 //从连接当中读取客户端的内容
125 $buffer=fread($fd,1024);
126 //如果数据为空,或者为false,不是资源类型
127 if(empty($buffer)){
128 if(!is_resource($fd) || feof($fd) ){
129 //触发关闭事件
130 fclose($fd);
131 }
132 }
133 //正常读取到数据,触发消息接收事件,响应内容
134 if(!empty($buffer) && is_callable($this->onMessage)){
135 call_user_func($this->onMessage,$fd,$buffer);
136 }
137 });
138 });
139 }
140 /**
141 * 重启worker进程
142 */
143 public function reload(){
144 foreach ($this->worker_pid as $index=>$pid){
145 posix_kill($pid,SIGKILL); //结束进程
146 var_dump("杀掉的子进程",$pid);
147 unset($this->worker_pid[$index]); //删除之前的pid了所以worker_pid当中没有记录了
148 $this->fork(1); //重新拉起worker
149 }
150 }
151 //捕获信号之后重启worker进程
152 public function stopAll(){
153 foreach ($this->worker_pid as $index=>$pid){
154 posix_kill($pid,SIGKILL); //结束进程
155 unset($this->worker_pid[$index]);
156 }
157 }
158}
159//ps -ef | grep php | grep -v grep | awk '{print $2}' | xargs kill -s 9
160$worker = new Worker('tcp://0.0.0.0:9800');
161//开启多进程的端口监听
162$worker->reusePort = true;
163//连接事件
164$worker->onConnect = function ($fd) {
165 //echo '连接事件触发',(int)$fd,PHP_EOL;
166};
167$worker->onTask = function ($fd) {
168 //echo '连接事件触发',(int)$fd,PHP_EOL;
169};
170//消息接收
171$worker->onMessage = function ($conn, $message) {
172 //事件回调当中写业务逻辑
173 // $a=include 'index.php';
174 // var_dump($a);
175 //$server->task(); //在worker进程当中能够向task进程发送消息
176 //var_dump($conn,$message);
177 $content="我是cr-mao";
178 $http_resonse = "HTTP/1.1 200 OK\r\n";
179 $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
180 $http_resonse .= "Connection: keep-alive\r\n"; //连接保持
181 $http_resonse .= "Server: php socket server\r\n";
182 $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
183 $http_resonse .= $content;
184 fwrite($conn, $http_resonse);
185};
186$worker->start(); //启动