Swoole入门教程(三):客户端的应用

2023-10-10 484 阅读 0评论

在前面的教程里面,我已经学习了Swoole的Server端的应用,本篇学习Swoole在Client端的应用。

在Swoole出现之前,PHP主要用socket来创建TCP/UDP Client,用CURL或者Guzzle类库来创建HTTP Client,
众所周知,php连接mysql一般都是直接用php 的PDO扩展组件来实现客户端的,本篇通过手写Mysql登陆过程来体验socket客户端的开发过程

Mysql协议分析

我们知道,TCP/Ip套接字方式是Mysql数据库在所有平台下都提供的通讯方式,也是使用最多的一种方式,要使用TCP/Ip来连接Mysql,我们首先得知道Mysql的通讯协议。

图片Mysql通讯过程

以上图片显示了mysql的通讯流程,具体流程分析请看MySQL协议分析,这篇博文详细分析了Mysql的数据结构,消息头和消息体结构如下

图片mysql报文

登陆认证的消息体报文如下:

图片握手报文


MySQL客户端与服务器的完整交互过程如下

图片Mysql通讯流程


在分析完报文结构和交互流程后,我们就可以开始着手写代码了。

用原生PHP来实现Mysql客户端

php进行socket编译主要使用socket_create等socket[1]函数和位运算。实现登陆用到了2个数据结构如下:

图片服务器握手包


图片客户端登陆包


具体实现代码如下:

$conf = include_once './conf.php';
$host = $conf['mysql']['HOST'];
$port = $conf['mysql']['PORT'];
$user = $conf['mysql']['USER'];
$db = $conf['mysql']['DATABASE'];
$password = $conf['mysql']['PASS'];

//
//$client->connect($host,$port,$db,$user,$password);

$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if ($socket === false) {
    echo "socket_create() 创建socket失败 reason: " . socket_strerror(socket_last_error()) . "\n";
} else {
    echo "成功创建socket\n";
}

$result = socket_connect($socket, $host, $port);
if ($result === false) {
    echo "连接mysql失败,原因: ($result) " . socket_strerror(socket_last_error($socket)) . "\n";
} else {
    echo "成功连接mysql";

    //开始解析
    $data = socket_read($socket, 8192, PHP_BINARY_READ);
    //解析服务器发来的握手信息
    $greeting_arr = decodeData($data);
//    var_dump($greeting_arr);
    $resp = formatLogin($greeting_arr);
    //开始请求登陆
    socket_write($socket, $resp, mb_strlen($resp, 'ASCII'));//发送登陆认证,

    //获取服务端响应报文,先读取消息头
    $data = socket_read($socket, 8192, PHP_BINARY_READ);
    $byteArr = unpack('C*', $data);//解包

    $type = $byteArr[5];
//    var_dump(sprintf('%02x',$type));
    $type_str = '';
    if ($type == 0x00) {
        $type_str = "OK";//响应命令
    } elseif ($type == 0xff) {
        $type_str = "ERR";
    } elseif ($type == 0xfe) {
        $type_str = "EOF";//EOF 报文表示登陆成功
    } elseif ($type <= 250) {
    }
    echo $type_str;
    socket_close($socket);
    exit;
}


function decodeData($data){

    $byteArr = unpack('C*',$data);//解包

    //获取消息长度
    $package_length  = $byteArr[1] | ($byteArr[2] << 8)  | ($byteArr[3] << 16);//前面3个字节表示长度
    $package_number = $byteArr[4];//消息序号
    //获取报文类型
    $type = $byteArr[5];

    $protocol = $type_str = '';
    if($type == 0x00 ){
        $type_str = "OK";//响应命令
    }elseif ($type == 0xff){
        $type_str = "ERR";
    }elseif($type == 0xfe){
        $type_str = "EOF";
    }elseif($type <= 250 ) {
        $type_str = "DATA";//响应数据
        $protocol = $type;//协议版本号为响应类型
    }

    $nextString = mb_substr($data,4);
    var_dump($nextString);
    //nullpos
    for($nullpos=6;$nullpos<count($byteArr);$nullpos++){
        if($byteArr[$nullpos] == 0x00){
            break;
        }
    }
    $shuzu = array_slice($byteArr,5,($nullpos-5));
    $version = pack('C*',...$shuzu);

    $leftPacket = array_slice($byteArr,$nullpos);
//    var_dump($leftPacket);
    //服务器线程id
    $thread_id = $leftPacket[0] | ($leftPacket[1] << 8)  | ($leftPacket[2] << 16) | ($leftPacket[3] << 24);//前面3个字节表示长度
    //第一个挑战随机数
    $salt1 = pack('C*',...array_slice($leftPacket,4,8));
    $fill = sprintf('%02x',$leftPacket[12]);//一位填充位

    //服务器权能标志
    $server_capability = sprintf('%02x%02x',$leftPacket[13],$leftPacket[14]);
    var_dump($server_capability);
    $charset = sprintf('%02x',$leftPacket[15]);//字符编码
    $status = sprintf('%02x%02x',$leftPacket[16],$leftPacket[17]);//服务器状态
    var_dump($status);
    $server_capability_ext = sprintf('%02x%02x',$leftPacket[18],$leftPacket[19]);//服务器权能标志(高16位)
    $auth_plugin_data_len = $leftPacket[20];//随机挑战数长度
    var_dump($auth_plugin_data_len);
    $fill2 = array_slice($leftPacket,21,10);//填充值
    for($nullpos=31;$nullpos<count($leftPacket);$nullpos++){
        if($leftPacket[$nullpos] == 0x00){
            break;
        }
    }
    $salt2 = array_slice($leftPacket,31,($nullpos-31));//第二部分挑战数
    $salt2 = pack('C*',...$salt2);//第二部分挑战数

    $client_auth_plugin = array_slice($leftPacket,$nullpos);
    $client_auth_plugin =pack('C*',...$client_auth_plugin);//登陆组件名
    var_dump($client_auth_plugin);

    return compact('package_length','package_number','type_str','protocol',
        'version','thread_id','salt1','fill','server_capability','charset',
        'status','server_capability_ext','auth_plugin_data_len','fill2','salt2','client_auth_plugin');
}

//登录
function formatLogin($greeting){
    global $user;
    global $password;
    global $db;
    //客户端配置项 类似bitMap 具体每个值的权限可以参考mysql的文档
    $body['capability'] = bin2hex(pack('v', 0b1010001010001101));
    $body['capability_ext'] = bin2hex(pack('v', 0b0000000000001010));
    $body['message_max_length'] = '000000c0';//最大消息长度
    $body['charset'] = $greeting['charset'];//字符编码
    $body['unused'] = str_repeat('00', 23);//填充数

    //00分隔符
    $body['user_name'] = bin2hex($user) . '00';
    $salt = $greeting['salt1'] . $greeting['salt2'];
    $part1 = sha1($password, true);
    $part2 = sha1($salt . sha1(sha1($password, true), true), true);
    //挑战认证数据,\x14表示后面有20位的密码 如果不传database的话要再结尾加上00表结束
    $body['passwd'] ='14'. bin2hex($part1 ^ $part2);
    $body['database'] = bin2hex($db) . '00';

    $body['client_auth_plugin'] = bin2hex($greeting['client_auth_plugin']);

    $body['connection_attributes'] = '150c' . bin2hex('_client_name') . '07' . bin2hex('mysqlnd');

//    var_dump($body);
    $stream = '';
    foreach ($body as $v) {
        $stream .= $v;
    }
    //消息体封包
    $stream = _packageBody($stream, 1);
    return _hex2ascii($stream);
}

//16进制转换成ascii  反过来是bin2hex
function _hex2ascii($hex_str) {
    $send_msg = "";
    foreach (str_split($hex_str, 2) as $v) {
        $send_msg .= chr(hexdec($v));
    }
    return $send_msg;
}

/*----------  格式化发送的数据  ----------*/
/**
 * 给消息体封装包头信息 (body数据长度数据(小端序3字节).第几个包(小端序1字节).消息体)
 * @param $hex_body
 * @param int $package_number
 * @return string
 */
function _packageBody($hex_body, $package_number = 0) {
    $byte_len = mb_strlen($hex_body, 'ASCII') / 2;//两个16进制位=1个字节
    //转换成24bit 小端字节序,3个字节
    $package_length = substr(bin2hex(pack('V', $byte_len)), 0, 6);
    //转换成8bit 小端字节序
    $package_number = bin2hex(pack('h', $package_number));
    return $package_length . $package_number . $hex_body;
}

上面的代码只是简单的实现了一个mysql登陆过程,距离完成整个mysql客户端还差了99.9%。你还要实现增删改查,你要实现事务功能,日志功能、读写锁等等,很复杂,但Swoole团队实现了

用Swoole来实现Mysql客户端

Swoole团队实现了一个协程 MySQL 客户端。使用方法如下:

use Swoole\Coroutine\MySQL;
use function Swoole\Coroutine\run;

run(function () {
    $swoole_mysql = new MySQL();
    $swoole_mysql->connect([
        'host'     => '127.0.0.1',
        'port'     => 3306,
        'user'     => 'user',
        'password' => 'pass',
        'database' => 'test',
    ]);
    $res = $swoole_mysql->query('select sleep(1)');
    var_dump($res);
});

具体使用文档:https://wiki.swoole.com/#/coroutine_client/mysql

一键协程化

起初,为了解决这些客户端的协程支持问题 Swoole 开发组做了大量的工作:比如完成了mysql客户端,redis客户端,针对每种类型的客户端都完成对应的适配。但慢慢的,swoole发现了这样做有一些问题:

  • • 实现复杂,团队需要针对每个客户端都需要很了解,想要完成完美的支持,那需要投入巨大的工作量。

  • • 增加了用户的技术负担,比如用户都习惯了PHP原生的PDO,那么现在需要用陌生的 Swoole\Coroutine\MySQL[2] 的方法,增加了学习成本。

  • • 很难覆盖到所有的操作,比如 proc_open()、sleep() 函数等等也可能阻塞住导致程序变成同步阻塞的

  • • phper一般都会安装好了php_pdo,phpredis等扩展,没有好好利用真的很可惜。

针对上述问题,Swoole 开发组换了实现思路,采用 Hook 原生 PHP 函数的方式实现协程客户端,通过一行代码就可以让原来的同步 IO 的代码变成可以协程调度[3]异步 IO[4],即一键协程化
实现方式如下:

Co::set(['hook_flags' => SWOOLE_HOOK_ALL]); //不包括CURL
Co::set(['hook_flags' => SWOOLE_HOOK_ALL | SWOOLE_HOOK_CURL]); //真正的协程化所有类型,包括CURL

通过一行代码,就能把原来的同步Io变成了异步IO,文档地址:https://wiki.swoole.com/#/runtime
有了这个特性,在hyperf框架的StartServer这个Command中,通过一句Coroutine::set(['hook_flags' => swoole_hook_flags()]);来实现了整个框架的协程化

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->checkEnvironment($output);

        $serverFactory = $this->container->get(ServerFactory::class)
            ->setEventDispatcher($this->container->get(EventDispatcherInterface::class))
            ->setLogger($this->container->get(StdoutLoggerInterface::class));

        $serverConfig = $this->container->get(ConfigInterface::class)->get('server', []);
        if (! $serverConfig) {
            throw new InvalidArgumentException('At least one server should be defined.');
        }

        $serverFactory->configure($serverConfig);

        Coroutine::set(['hook_flags' => swoole_hook_flags()]);

        $serverFactory->start();

        return 0;
    }


TCP/UDP客户端

Coroutine\Client 提供了 TCP、UDP、unixSocket[5] 传输协议的 Socket 客户端[6]封装代码,使用时仅需 new Swoole\Coroutine\Client 即可。
使用示例:

use Swoole\Coroutine\Client;
use function Swoole\Coroutine\run;

run(function () {
    $client = new Client(SWOOLE_SOCK_TCP);
    if (!$client->connect('127.0.0.1', 9501, 0.5))
    {
        echo "connect failed. Error: {$client->errCode}\n";
    }
    $client->send("hello world\n");
    echo $client->recv();
    $client->close();
});

协程客户端也支持长度和 EOF 协议处理,支持通过协议解析来处理TCP 数据包边界问题[7]
结束符检测

$client->set(array(
    'open_eof_check' => true,
    'package_eof' => "\r\n\r\n",
    'package_max_length' => 1024 * 1024 * 2,
));

长度检测

$client->set(array(
    'open_length_check' => true,
    'package_length_type' => 'N',
    'package_length_offset' => 0, //第N个字节是包长度的值
    'package_body_offset' => 4, //第几个字节开始计算长度
    'package_max_length' => 2000000, //协议最大长度
));

启用mqtt协议

$client->set(array(
    'open_mqtt_protocol' => true,
));


Socket客户端来

Swoole\Coroutine\Socket 模块相比协程风格服务端和协程客户端相关模块 Socket 可以实现更细粒度的一些 IO 操作。
示例:

use Swoole\Coroutine;
use function Swoole\Coroutine\run;

run(function () {
    $socket = new Coroutine\Socket(AF_INET, SOCK_STREAM, 0);

    $retval = $socket->connect('127.0.0.1', 9601);
    while ($retval)
    {
        $n = $socket->send('hello');
        var_dump($n);

        $data = $socket->recv();
        var_dump($data);

        //发生错误或对端关闭连接,本端也需要关闭
        if ($data === '' || $data === false) {
            echo "errCode: {$socket->errCode}\n";
            $socket->close();
            break;
        }

        Coroutine::sleep(1.0);
    }

    var_dump($retval, $socket->errCode, $socket->errMsg);
});

Coroutine\Socket 模块提供的 IO 操作接口均为同步编程风格,底层自动使用协程调度器实现异步 IO。
Socket是比较底层了一个类了,前面的TCP/UDP客户端都是Socket的封装,它们都内置了一个socket属性成员。

FastCGI 客户端

Swoole应用发布时一般都是通过nginx做方向代理来实现服务的发布。那么有没有可能不用做反向代理直接运行呢,答案是可以的。
我们知道php自带的php-fpm启动后,会创建一个master进程,监听9000端口(可配置),master进程又会根据fpm.conf/www.conf去创建若干子进程,子进程用于处理实际的业务。当有客户端来连接9000端口时,空闲子进程会自己去accept,如果子进程全部处于忙碌状态,新进的待accept的连接会被master放进队列里,等待fpm子进程空闲;[8]
我们看nginx的配置文件,一般都会有如下配置:

    location ~ [^/]\.php(/|$)
    {
        try_files $uri =404;
        fastcgi_pass  unix:/tmp/php-cgi.sock;
        fastcgi_index index.php;
        include fastcgi.conf;
        include pathinfo.conf;
    }

在看php-fpm.conf的配置可以知道,unix:/tmp/php-cgi.sock;其实就是127.0.0.1:9000 的unixsocket地址,说明nginx把php文件的请求转发给我php-fpm去处理了,而php-fpm收到请求就执行原生php运算了,swoole利用Socket客户端类对接了127.0.0.1:9000,接管了php原生部分的工作,从而实现了一个FastCGI 客户端,通过 FastCGI 客户端,可以直接与 PHP-FPM 服务进行交互而无需通过任何 HTTP 反向代理。
FastCGI客户端使用示例如下:

try {
    $client = new \Swoole\Coroutine\FastCGI\Client('127.0.0.1', 9000);
    $request = (new \Swoole\FastCGI\HttpRequest())
        ->withDocumentRoot(__DIR__)
        ->withScriptFilename(__DIR__ . '/var.php')
        ->withScriptName('var.php')
        ->withMethod('POST')
        ->withUri('/var?foo=bar&bar=char')
        ->withHeader('X-Foo', 'bar')
        ->withHeader('X-Bar', 'char')
        ->withBody(['foo' => 'bar', 'bar' => 'char']);
    $response = $client->execute($request);
    echo "Result: \n{$response->getBody()}";
} catch (\Swoole\Coroutine\FastCGI\Client\Exception $exception) {
    echo "Error: {$exception->getMessage()}\n";
}

引用链接

[1] socket: https://www.php.net/manual/en/ref.sockets.php
[2] Swoole\Coroutine\MySQL: https://wiki.swoole.com/#/coroutine_client/mysql
[3] 协程调度: https://wiki.swoole.com/#/coroutine?id=%e5%8d%8f%e7%a8%8b%e8%b0%83%e5%ba%a6
[4] 异步 IO: https://wiki.swoole.com/#/learn?id=%e5%90%8c%e6%ad%a5io%e5%bc%82%e6%ad%a5io
[5] unixSocket: https://wiki.swoole.com/#/learn?id=%e4%bb%80%e4%b9%88%e6%98%afipc
[6] Socket 客户端: https://wiki.swoole.com/#/coroutine_client/socket
[7] TCP 数据包边界问题: https://wiki.swoole.com/#/learn?id=tcp%e6%95%b0%e6%8d%ae%e5%8c%85%e8%be%b9%e7%95%8c%e9%97%ae%e9%a2%98
[8] www.conf去创建若干子进程,子进程用于处理实际的业务。当有客户端来连接9000端口时,空闲子进程会自己去accept,如果子进程全部处于忙碌状态,新进的待accept的连接会被master放进队列里,等待fpm子进程空闲;: http://www.conf去创建若干子进程,子进程用于处理实际的业务。当有客户端来连接9000端口时,空闲子进程会自己去accept,如果子进程全部处于忙碌状态,新进的待accept的连接会被master放进队列里,等待fpm子进程空闲;

喜欢就支持以下吧
点赞 0

发表评论

快捷回复: 表情:
aoman baiyan bishi bizui cahan ciya dabing daku deyi doge fadai fanu fendou ganga guzhang haixiu hanxiao zuohengheng zhuakuang zhouma zhemo zhayanjian zaijian yun youhengheng yiwen yinxian xu xieyanxiao xiaoku xiaojiujie xia wunai wozuimei weixiao weiqu tuosai tu touxiao tiaopi shui se saorao qiudale qinqin qiaoda piezui penxue nanguo liulei liuhan lenghan leiben kun kuaikule ku koubi kelian keai jingya jingxi jingkong jie huaixiao haqian aini OK qiang quantou shengli woshou gouyin baoquan aixin bangbangtang xiaoyanger xigua hexie pijiu lanqiu juhua hecai haobang caidao baojin chi dan kulou shuai shouqiang yangtuo youling
提交
评论列表 (有 0 条评论, 484人围观)

最近发表

热门文章

最新留言

热门推荐

标签列表