Swoole入门教程(二):搭建物联网服务
多端口监听
在开发服务器的时候,我们常常要同时对外开放很多端口,比如开放80端口提供http服务,开放9501端口提供Tcp服务,开放9502端口提供websocket服务,这时我们不需要new多个server,只需要用listen方法新增端口监听和协议即可
由于协程服务类Swoole\Coroutine\Http\Server没有多端口函数,我们用异步服务类Swoole\Http\Server来编写,代码如下
$server = new \Swoole\Http\Server('0.0.0.0',88);
$server->on('request',function (\Swoole\Http\Request $request,\Swoole\Http\Response $response){
$route = $request->server['request_uri'];
echo $route;
$response->end("<h1>Hello 你来自{$route}</h1>");
});
$server->on('start',function(){
echo '服务启动';
});
$tcp = $server->listen('0.0.0.0',9501,SWOOLE_SOCK_TCP);
$webSocket = $server->listen('0.0.0,0',9502,SWOOLE_SOCK_TCP);
$webSocket->set(['open_websocket_protocol'=>true]);
$webSocket->on('receive',function(){
echo 'Websocket收到了数据';
});
$server->start();
$webSocket端口对象要添加on方法,不然会报 Swoole\Server::start(): require onMessage callback 错误
关于Server的构造参数,请自行阅读官方文档:https://wiki.swoole.com/#/server/tcp_init
通过上面的代码我们就建立了3个服务88端口的http服务、9501端口的TCP服务,9502端口的websocket服务
那么如何接收请求呢,会不会混乱因为已经用端口区分开了,让每个端口各自处理请求,用的是Port对象的on方法,listen函数会返回一个\Swoole\Server\Port对象,根据协议不同,Port对象可以处理的事件不同,具体如下
TCP 服务器[1]
• onConnect
• onClose
• onReceive
UDP 服务器[2]
• onPacket
• onReceive
HTTP 服务器[3]
• onRequest
WebSocket 服务器[4]
• onMessage
• onOpen
• onHandshake
Swoole在物联网的应用
MODBUS通讯
用Swoole可以开发物联网应用,假设我们手上有一个环境监测设备,设备采用标准 MODBUS-RTU 通信协议,RS485 信号输出,现在我们就来开发一个服务器和这个设备通讯,
1、首先,我们一般会拿到设备的对接文档,大概长这样子的
2、然后我们随便找一个网络调试工具,用于模拟设备端来开发调试
3、我们就开始写服务端代码
假如我们要向03号设备查询当前环境的温度值,根据文档,我们需要构造的发送端数据是
地址码 功能码 起始地址 数据长度 校验码低位 校验码高位 0x03 0x03 num2hex(501)(2个字节) num2hex(2)(2个字节) crc16低位 crc16高位
function num2hex($num,$len){
$hex = base_convert($num,10,16);
//一个字节=2个16进制位
$len = $len*2;
if(strlen($hex)<$len){
$hex = str_repeat('0',$len-strlen($hex)).$hex;
}
return $hex;
}
function crc16($string)
{
$hex = pack('H*', $string);
$crc = 0xFFFF;
for ($x = 0; $x < strlen($hex); $x++) {
$crc = $crc ^ ord($hex[$x]);
for ($y = 0; $y < 8; $y++) {
if (($crc & 0x0001) == 0x0001) {
$crc = (($crc >> 1) ^ 0xA001);
} else {
$crc = $crc >> 1;
}
}
}
return $crc;
}
$low = sprintf('%02s',dechex($crc %256));// 低八位
$high = sprintf('%02s',dechex(floor($crc /256)));// 高8位
通过函数的处理,便可得出发送数据为
03 03 01 f5 00 02 d4 27
go(function(){
$server = new \Swoole\Coroutine\Server('0.0.0.0',9501,false,true);
$server->handle(function (\Swoole\Coroutine\Server\Connection $conn){
echo '您好,欢迎进入聊天室'."\n";
$socket = $conn->exportSocket();
//向设备发送数据
$command = '030301f50002d427';
$conn->send(hex2bin($command));
while (true){
$data = $conn->recv();
if ($data === '' || $data === false) {
// $errCode = swoole_last_error();
// $errMsg = socket_strerror($errCode);
// echo "协程风格下链接是动态创建和销毁的,可以设置5秒没收到数据自动超时关闭\n";
// $conn->close();
break;
}
else{
echo '收到数据:'.bin2hex($data);
}
}
//向设备发送请求数据
echo '客户端:'.$socket->fd.'的链接已经关闭';
});
$server->start();
});
在设备端点击连接后就收到了数据请求了
这时设备端需要回复应答帧给服务端去解析
解析的过程刚好是刚才组装数据的逆向,由于设备发来的数据可能存在丢包,所以要先做一下crc校验,校验非常简单,去掉最后2个字节,用0303040292ff9b通过crc16函数算出crc的低位和高位,然后对比79fd看是否对应上,如果对应上了,那数据就是正确的了。拿到数据后,再通过进制换算就可以拿到温度数据了。
0xFF9B (十六进制)= -101 => 温度 = -10.1℃
TCP 粘包问题
Swoole在没有并发的情况下快速启动中的代码可以正常运行,但是并发高了就会有 TCP 数据包边界问题,TCP 协议在底层机制上解决了 UDP 协议的顺序和丢包重传问题,但相比 UDP 又带来了新的问题,TCP 协议是流式的,数据包没有边界,应用程序使用 TCP 通信就会面临这些难题,俗称** TCP 粘包问题**。
因为 TCP 通信是流式的,在接收 1 个大数据包时,可能会被拆分成多个数据包发送。多次 Send 底层也可能会合并成一次进行发送。这里就需要 2 个操作来解决:
• 分包:Server 收到了多个数据包,需要拆分数据包
• 合包:Server 收到的数据只是包的一部分,需要缓存数据,合并成完整的包
所以 TCP 网络通信时需要设定通信协议。常见的 TCP 通用网络通信协议有 HTTP、HTTPS、FTP、SMTP、POP3、IMAP、SSH、Redis、Memcache、MySQL 。
值得一提的是,Swoole 内置了很多常见通用协议的解析,来解决这些协议的服务器的 TCP 数据包边界问题,只需要简单的配置即可,参考 open_http_protocol[5]/open_http2_protocol[6]/open_websocket_protocol[7]/open_mqtt_protocol[8]
前面的例子中,我们用的modbus不在通用协议里面,那么如何处理呢,其实,除了通用协议外还可以自定义协议,Swoole 支持了 2 种类型的自定义网络通信协议
• EOF 结束符协议
此协议处理的原理是每个数据包结尾加一串特殊字符表示包已结束,所以才用此处理方式的话要保证每个数据段里面不能有EOF结束符,否则会造成分包错误,设置的代码如下
$server->set(array(
'open_eof_check' => true,
'package_eof' => "\r\n",
));
$client->set(array(
'open_eof_check' => true,
'package_eof' => "\r\n",
));
• 固定包头 + 包体协议
固定包头的方法是最通用的,我们前面用的modbus可以通过这种方式来解决粘包问题。在服务器端程序中经常能看到。这种协议的特点是一个数据包总是由包头 + 包体 2 部分组成。包头由一个字段指定了包体或整个包的长度,长度一般是使用 2 字节 /4 字节整数来表示。服务器收到包头后,可以根据长度值来精确控制需要再接收多少数据就是完整的数据包。Swoole 的配置可以很好的支持这种协议,可以灵活地设置 4 项参数应对所有情况,具体文档在这:https://wiki.swoole.com/#/server/setting?id=open_length_check,这对前面的modbus,我们的需要的配置如下
$server->set(array(
'open_length_check' => true,
'package_length_func' => function ($data) {
//提取前面3个字节,第3个字节为数据区长度,然后+5(包括2个字节校验码)就是总长度
if (strlen($data) < 3) {
return 0;
}
$length = hexdec(bin2hex($data[2]));
if ($length <= 0) {
return -1;
}
$total = $length + 5;
echo $total;
return $total;
},
'package_max_length' => 2000000, //协议最大长度
));
搭建Mqtt服务
关于什么是MQTT,请看《什么是MQTT,物联网MQTT协议详解》[9]
通过设置 open_mqtt_protocol[10] 选项,启用后Swoole会解析 MQTT 包头,Worker 进程的 onReceive[11] 事件每次会返回一个完整的 MQTT 数据包。
可以使用 Swoole 作为 MQTT 服务端(broker)或客户端(client),实现一套完整物联网(IOT)解决方案
以下是实现一个MQTT服务端的代码
<?php
function decodeValue($data)
{
return 256 * ord($data[0]) + ord($data[1]);
}
function decodeString($data)
{
$length = decodeValue($data);
return substr($data, 2, $length);
}
function mqttGetHeader($data)
{
$byte = ord($data[0]);
$header['type'] = ($byte & 0xF0) >> 4;
$header['dup'] = ($byte & 0x08) >> 3;
$header['qos'] = ($byte & 0x06) >> 1;
$header['retain'] = $byte & 0x01;
return $header;
}
function eventConnect($header, $data)
{
$connect_info['protocol_name'] = decodeString($data);
$offset = strlen($connect_info['protocol_name']) + 2;
$connect_info['version'] = ord(substr($data, $offset, 1));
$offset += 1;
$byte = ord($data[$offset]);
$connect_info['willRetain'] = ($byte & 0x20 == 0x20);
$connect_info['willQos'] = ($byte & 0x18 >> 3);
$connect_info['willFlag'] = ($byte & 0x04 == 0x04);
$connect_info['cleanStart'] = ($byte & 0x02 == 0x02);
$offset += 1;
$connect_info['keepalive'] = decodeValue(substr($data, $offset, 2));
$offset += 2;
$connect_info['clientId'] = decodeString(substr($data, $offset));
return $connect_info;
}
$server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_BASE);
$server->set([
'open_mqtt_protocol' => true, // 启用 MQTT 协议
'worker_num' => 1,
]);
$server->on('Connect', function ($server, $fd) {
echo "mqtt客户端连接进来.\n";
});
$server->on('Receive', function ($server, $fd, $reactor_id, $data) {
$header = mqttGetHeader($data);//swoole默认为什么解析了
var_dump($header);
if ($header['type'] == 1) {
$resp = chr(32) . chr(2) . chr(0) . chr(0);
eventConnect($header, substr($data, 2));
$server->send($fd, $resp);
} elseif ($header['type'] == 3) {
$offset = 2;
$topic = decodeString(substr($data, $offset));
$offset += strlen($topic) + 2;
$msg = substr($data, $offset);
echo "client msg: {$topic}\n----------\n{$msg}\n";
//file_put_contents(__DIR__.'/data.log', $data);
}
echo "收到消息,长度为" . strlen($data) . "\n";
});
$server->on('Close', function ($server, $fd) {
echo "客户端关闭连接.\n";
});
$server->on('start',function (){
echo "Mqtt Server启动了!";
});
$server->start();
启动后,我们通过Mqtt工具MQTTX来测试
异步任务
异步任务在任何系统中都至关重要,在 Server 程序中如果需要执行很耗时的操作,比如一个聊天服务器发送广播,比如文件传输,比如Web 服务器中发送邮件。如果直接去执行这些函数就会阻塞当前进程,导致服务器响应变慢,因此,我们需要异步任务
下面演示用代码演示,下载一本8M多的MQTT pdf书籍,书籍地址:https://www.laojunsay.com/wp-content/uploads/2023/04/Gaston-C.-Hillar-MQTT-Essentials-A-Lightweight-IoT-Protocol-2017-Packt-Publishing-libgen.li_.pdf
$server = new \Swoole\Server('0.0.0.0',9501,SWOOLE_PROCESS,SWOOLE_SOCK_TCP);
$server->set(array(
'worker_num' => 4, // 进程数
'task_worker_num'=>4
));
//此回调函数在worker进程中执行。
$server->on('Receive', function($serv, $fd, $reactor_id, $data) {
//投递异步任务
$url = $data;
echo "收到一个下载地址:地址为:{$url}\n";
$taskInfo = ['fd'=>$fd,'url'=>$url];
$task_id = $serv->task($taskInfo);
echo "立即指派给异步任务去下载: 任务id={$task_id},我要去处理别的事情了\n";
});
//处理异步任务(此回调函数在task进程中执行)。
$server->on('Task', function ($serv, $task_id, $reactor_id, $data) {
echo "异步开始下载,[id={$task_id}]".PHP_EOL;
$url = $data['url'];
$ch = curl_init();
$timeout = 60;
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $timeout);//在需要用户检测的网页里需要增加下面两行
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
//curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_ANY);
//curl_setopt($ch, CURLOPT_USERPWD, US_NAME.”:”.US_PWD);
$contents = curl_exec($ch);
curl_close($ch);
file_put_contents('./mqtt.pdf',$contents);
//返回任务执行的结果
$finishInfo = ['msg'=>"资源:{$data['url']} -> 下载完成!",'fd'=>$data['fd']];
$serv->finish($finishInfo);
});
//处理异步任务的结果(此回调函数在worker进程中执行)。
$server->on('Finish', function ($serv, $task_id, $data) {
echo "异步任务[{$task_id}] 处理完成: {$data['msg']}".PHP_EOL;
$serv->send($data['fd'],iconv('utf-8','gb2312',$data['msg']));
});
$server->on('start',function (){
echo '服务器启动!'."\r\n";
});
$server->start();
演示结果
发表评论