初识swoole

分享: 胡支昊

目录

  • 使用背景
  • swoole的简介
  • 球球大作战demo
  • 如何使用swoole
  • 直播项目抓取中的应用

使用背景

直播聚合项目面临的问题

  • 项目使用多家直播站公用数据源,无法定制xml
  • 展示实时数据,需要抓取频率较高
  • 每次全量更新入库,可能需要多个worker
  • 任务按粒度拆分后如何统计汇总

使用背景

为什么选用swoole

  • swoole是纯C编写的PHP扩展,运行效率高(可以忽略)
  • 支持多进程(子进程)的工作方式,比原生的pcntl族函数更方便
  • 采用Reactor模型设计,对实现任务分发支持好
  • 支持同步和异步的I/O,某个任务阻塞或失败不会拖垮整个抓取业务
  • 支持热重启
  • 支持HTTP Server,可以通过外部访问操控daemon进程,比如让其热重启

swoole简介

官方的介绍

Swoole:重新定义PHP

imgPHP的异步、并行、高性能网络通信引擎,使用纯C语言编写,提供了PHP语言的异步多线程服务器,异步TCP/UDP网络客户端,异步MySQL,异步Redis,数据库连接池,AsyncTask,消息队列,毫秒定时器,异步文件读写,异步DNS查询。 Swoole内置了Http/WebSocket服务器端/客户端、Http2.0服务器端。Swoole可以广泛应用于互联网、移动通信、企业软件、云计算、网络游戏、物联网(IOT)、车联网、智能家居等领域。 使用PHP+Swoole作为网络通信框架,可以使企业IT研发团队的效率大大提升,更加专注于开发创新产品。

重点特性

  • 纯C语言编写,是一个扩展
  • swoole_server: 强大的TCP/UDP Server框架,多线程,EventLoop,事件驱动,异步,Worker进程组,Task异步任务,毫秒定时器,SSL/TLS隧道加密。协议上还支持HTTP和websocket
  • swoole_client: TCP/UDP客户端,支持同步并发调用,也支持异步事件驱动
  • swoole_event: EventLoop API,让用户可以直接操作底层的事件循环,将socket,stream,管道等Linux文件加入到事件循环中
  • swoole_async: 异步IO接口,提供了 异步文件系统IO,异步DNS查询,异步MySQL等API。支持异步毫秒定时器和异步文件操作接口
  • swoole_process: 进程管理模块,可以方便的创建子进程,进程间通信,进程管理。

Agar.io复刻版

使用swoole的WebSocketServer编写的后端程序

谁说PHP写不了游戏后台

如何使用swoole

swoole进程模型

  • Swoole是一个多进程模式的框架(可以类比Nginx的进程模型),当启动一个Swoole应用时,一共会创建2 + n + m + k个进程,其中n为Worker进程数,m为TaskWorker进程数,2为一个Master进程和一个Manager进程,k为自定义进程数。进程间关系如下图。

swoole进程模型

  • 其中,Master进程为主进程,该进程会创建Manager进程、Reactor线程等工作进/线程。
  • Reactor线程实际运行epoll实例,用于accept客户端连接以及接收客户端数据;
  • Manager进程为管理进程,该进程的作用是创建、管理所有的Worker进程和TaskWorker进程。

Reactor、Worker、Task的关系

Reactor线程
  • 负责维护客户端机器的TCP连接、处理网络IO、收发数据
  • 完全是异步非阻塞的模式
  • 全部为C代码,除Start/Shudown事件回调外,不执行任何PHP代码
  • 将TCP客户端发来的数据缓冲、拼接、拆分成完整的一个请求数据包
  • Reactor以多线程的方式运行
Worker进程
  • Worker进程作为Swoole的工作进程,所有的业务逻辑代码均在此进程上运行
  • 接受由Reactor线程投递的请求数据包,并执行PHP回调函数处理数据
  • 生成响应数据并发给Reactor线程,由Reactor线程发送给TCP客户端
  • 可以是异步非阻塞模式,也可以是同步阻塞模式
  • Worker以多进程的方式运行
Task进程
  • Task Worker是Swoole中一种特殊的工作进程,该进程的作用是处理一些耗时较长的任务,以达到释放Worker进程的目的
  • 接受由Worker进程通过swoole_server->task/taskwait方法投递的任务
  • 处理任务,并将结果数据返回给Worker进程
  • 完全是同步阻塞模式
  • Task以多进程的方式运行

Reactor、Worker、Task的关系

  • 可以理解为reactor就是nginx,worker就是php-fpm。reactor线程异步并行地处理网络请求,然后再转发给worker进程中去处理。reactor和worker间通过IPC方式通信。

  • swoole的reactor,worker,task_worker之间可以紧密的结合起来,提供更高级的使用方式。

  • 一个更通俗的比喻,假设Server就是一个工厂,那reactor就是销售,帮你接项目订单。而worker就是工人,当销售接到订单后,worker去工作生产出客户要的东西。而task_worker可以理解为行政人员,可以帮助worker干些杂事,让worker专心工作。

HttpServer

$serv = new Swoole\Http\Server("127.0.0.1", 9502);

$serv->on('Request', function($request, $response) {
    var_dump($request->get);
    var_dump($request->post);
    var_dump($request->cookie);
    var_dump($request->files);
    var_dump($request->header);
    var_dump($request->server);

    $response->cookie("User", "Swoole");
    $response->header("X-Server", "Swoole");
    $response->end("<h1>Hello Swoole!</h1>");
});

$serv->start();

WebSocket Server

$serv = new Swoole\Websocket\Server("127.0.0.1", 9502);

$serv->on('Open', function($server, $req) {
    echo "connection open: ".$req->fd;
});

$serv->on('Message', function($server, $frame) {
    echo "message: ".$frame->data;
    $server->push($frame->fd, json_encode(["hello", "world"]));
});

$serv->on('Close', function($server, $fd) {
    echo "connection close: ".$fd;
});

$serv->start();

TCP Server

$serv = new Swoole\Server("127.0.0.1", 9501);
$serv->set(array(
    'worker_num' => 8,   //工作进程数量
    'daemonize' => true, //是否作为守护进程
));
$serv->on('connect', function ($serv, $fd){
    echo "Client:Connect.\n";
});
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
    $serv->send($fd, 'Swoole: '.$data);
    $serv->close($fd);
});
$serv->on('close', function ($serv, $fd) {
    echo "Client: Close.\n";
});
$serv->start();

TCP Client

$client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
//设置事件回调函数
$client->on("connect", function($cli) {
    $cli->send("hello world\n");
});
$client->on("receive", function($cli, $data){
    echo "Received: ".$data."\n";
});
$client->on("error", function($cli){
    echo "Connect failed\n";
});
$client->on("close", function($cli){
    echo "Connection close\n";
});
//发起网络连接
$client->connect('127.0.0.1', 9501, 0.5);

异步MySQL

$db = new Swoole\MySQL;
$server = array(
    'host' => '127.0.0.1',
    'user' => 'test',
    'password' => 'test',
    'database' => 'test',
);

$db->connect($server, function ($db, $result) {
    $db->query("show tables", function (Swoole\MySQL $db, $result) {
        if ($r === false) {
            var_dump($db->error, $db->errno);
        } elseif ($r === true) {
            var_dump($db->affected_rows, $db->insert_id);
        } else {
            var_dump($$result);
            $db->close();
        }
    });
});

异步Redis/异步Http客户端

$redis = new Swoole\Redis;
$redis->connect('127.0.0.1', 6379, function ($redis, $result) {
    $redis->set('test_key', 'value', function ($redis, $result) {
        $redis->get('test_key', function ($redis, $result) {
            var_dump($result);
        });
    });
});

$cli = new Swoole\Http\Client('127.0.0.1', 80);
$cli->setHeaders(array('User-Agent' => 'swoole-http-client'));
$cli->setCookies(array('test' => 'value'));

$cli->post('/dump.php', array("test" => 'abc'), function ($cli) {
    var_dump($cli->body);
    $cli->get('/index.php', function ($cli) {
        var_dump($cli->cookies);
        var_dump($cli->headers);
    });
});

Async-IO

$fp = stream_socket_client("tcp://127.0.0.1:80", $code, $msg, 3);
$http_request = "GET /index.html HTTP/1.1\r\n\r\n";
fwrite($fp, $http_request);
Swoole\Event::add($fp, function($fp){
    echo fread($fp, 8192);
    //socket处理完成后,从epoll事件中移除socket
    //如果不移除 可以继续读取
    swoole_event_del($fp);
    fclose($fp);
});
Swoole\Timer::after(2000, function() {
    echo "2000ms timeout\n";
});
Swoole\Timer::tick(1000, function() {
    echo "1000ms interval\n";
});

异步任务

$serv = new Swoole\Server("127.0.0.1", 9502);
$serv->set(array('task_worker_num' => 4));
$serv->on('Receive', function($serv, $fd, $from_id, $data) {
     //使用task方法将任务异步的丢进task进程中,也可以用taskWait同步的方式
    $task_id = $serv->task("Async");
    echo "Dispath AsyncTask: id=$task_id\n";
});
$serv->on('Task', function ($serv, $task_id, $from_id, $data) {
    echo "New AsyncTask[id=$task_id]".PHP_EOL;
    //$serv->finish("$data -> OK");
    return "$data -> OK";
});
$serv->on('Finish', function ($serv, $task_id, $data) {
    echo "AsyncTask[$task_id] Finish: $data".PHP_EOL;
});
$serv->start();

自定义子进程(1)

//普通的子进程
$workers = [];
$worker_num = 3;//创建的进程数

for($i=0;$i<$worker_num ; $i++){
    $process = new swoole_process('process');
    $pid = $process->start();
    $workers[$pid] = $process;
}
foreach($workers as $process){
    //子进程也会包含此事件
    swoole_event_add($process->pipe, function ($pipe) use($process){
         $data = $process->read();
        echo "RECV: " . $data.PHP_EOL;
    });
}
function process(swoole_process $process){// 第一个处理
    $process->write($process->pid);
    echo $process->pid,"\t",$process->callback .PHP_EOL;
}

自定义子进程(2)

//在server中自定义子进程
$server = new swoole_server('127.0.0.1', 9501);
$process = new swoole_process(function($process) use ($server) {
    while (true) {
        $msg = $process->read();
        foreach($server->connections as $conn) {
            $server->send($conn, $msg);
        }
    }
});
$server->addProcess($process);
$server->on('receive', function ($serv, $fd, $from_id, $data) use ($process) {
    //群发收到的消息
    $process->write($data);
});
$server->start();

异步毫秒定时器

//原生pcntl_alarm基于时钟信号和PHP tick只支持到秒,swool支持到毫秒
swoole_timer_tick(1000, function(){
    echo "timeout\n";
});

一般情况下程序的结构

  • 接受tcp/udp/http/websocket请求,由master进程/reactor线程帮我们实现,不用关心
  • 请求会分发给多个worker进程中的一个进行处理,分配策略可以配置
  • worker进程一般是非阻塞的(也可以阻塞),可以在返回给client信息后,将非主流程操作交给异步taskworker
  • taskworker进程负责一些费时费力的操作,是同步阻塞的
  • worker进程和taskworker进程之间都可以方便的互相通信

运行流程图

进程/线程结构图

直播聚合抓取中的应用

抓取进程结构图

谢谢!