PHP异步编程探索
我们知道,PHP 最初是为了支持同步开发而创建的,因此大多数 PHP 开发人员习惯于仅使用php编写同步代码,那么我们能用Php来进行异步编程吗。答案是可以的。
什么是异步编程
当我们用php编写的应用程序I/O任务时,程序会在执行某个任务之前,一定要等待之前的任务完成,这时CPU会有很多时间处于空闲状态,这不仅会降低应用程序性能,还会降低硬件利用率。比如,当程序需要从数据库中读取大量的数据时,由于需要等待I/O操作完成,程序的执行速度会非常缓慢。因此,我们通过Swoole扩展或者一些如Reactphp的库,在程序执行的过程中,不需要等待某个任务完成才能执行下一个任务。这种编程模式可以极大地提高程序的效率和响应速度,尤其在处理复杂的I/O操作时表现得更为出色,而这就是异步编程。
Php pcntl扩展
通过异步编程的概念可知,异步编程的可以通过多进程来实现,在php中,pcntl扩展是首选的用来处理多进程的库,我们先测一段同步的代码
<?php
class AsyncPhp{
public function curl($url)
{
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_HEADER, 0);
$output = curl_exec($ch);
curl_close($ch);
return $output;
}
public function testSync(){
$url = 'https://www.baidu.com/?i=';
$start=microtime(true);
for($i=0;$i<5;$i++){
$this->curl($url.$i);
echo '访问第'.($i+1).'次百度'."\n";
}
$end=microtime(true);
echo "\n总用时",$end-$start."\n";
}
}
$tester = new AsyncPhp();
$tester->testSync();
当我们同步请求5次百度时,是0.9秒
这时为了提高性能,pcntl就可以派上用场。
pcntl是一个可以利用操作系统的fork系统调用在PHP中实现多线程的进程控制扩展,当使用fork系统调用后执行的代码将会是并行的。pcntl的缺点是只适用于Linux平台的CLI模式下使用
我们在刚才的测试来新增如下方法来测试
public function testAsync(){
$url = 'https://www.baidu.com/?i=';
$start=microtime(true);
$i=0;
while($i<5){
$pids[$i] = pcntl_fork(); //创建子进程
if($pids[$i] == 0){ //返回0表示在子进程
$this->curl($url.$i);; //子进程执行代码
exit(0);
}
$i++;
}
//等待进程关闭
for($i=0;$i<5;$i++){
pcntl_waitpid($pids[$i],$status,WUNTRACED);//等待进程结束
if(pcntl_wifexited($status)){
//子进程完成退出
echo '第'.$i.'次访问百度,用时:'.microtime(true)-$start."\n";
}
}
$end=microtime(true);
echo "\n总用时",$end-$start."\n";
}
在上面的代码中,当程序运行pcntl_fork时,Linux系统立fork出一个子进程,并在子进程中返回0,所以$this->curl是在子进程中运行的,exit(0)是终止了这个子进程,为什么要终止?如果不终止,子进程就会继续执行while这样最终导致产生很多次访问,而我们只要5次。pcntl_waitpid用于等待子进程结束,这样就可以计算子进程运行时间了。代码执行结果如下
通过对比可知,php使用pcntl扩展实现异步总用时比传统同步方式少用时0.7秒,性能提升了3倍多。但是pcntl只能跑在cli环境下,传统php-fpm环境下是无法使用的,这时我们可以在网络请求这边想办法,于是有了curl_multi。
使用curl_multi函数组
**curl_multi函数组允许异步处理多个 cURL 句柄,**下面是具体代码
public function getCurl($url)
{
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_HEADER, 0);
return $ch;
}
public function testCurlMulti(){
$url = 'https://www.baidu.com/?i=';
$start=microtime(true);
$mh = curl_multi_init();
$curl_array = array();
for ($i=0; $i < 5; $i++) {
$curl_array[$i] = $this->getCurl($url.$i);
curl_multi_add_handle($mh, $curl_array[$i]);
}
$running = NULL;
do {
curl_multi_exec($mh,$running);
} while($running > 0);
foreach($curl_array as $i=>$item){
$cotnent = curl_multi_getcontent($curl_array[$i]);
echo '第'.$i.'次访问百度,用时:'.microtime(true)-$start."\n";
curl_multi_remove_handle($mh, $curl_array[$i]);
}
curl_multi_close($mh);
$end=microtime(true);
echo "\n总用时",$end-$start."\n";
}
在上面的代码中,curl_multi_exec函数将并行执行每个curl句柄。最终总用时如下:
也比同步的代码快了一倍了。
curl_multi函数组在很多场景下都有使用到,特别是在网络爬虫开发中会经常使用到。
Parallel扩展
同pcntl一样,Parallel[1]也是一个php扩展,安装方法:https://github.com/krakjoe/parallel/blob/develop/INSTALL.md,如果安装时报如下错误,
则需要重新编译php,编译方式如下:
wget https://www.php.net/distributions/php-8.1.1.tar.gz
tar -zxvf ./php-8.1.1.tar.gz
cd php-8.1.1
# 编译
./configure --prefix=/usr/local/php81 --enable-debug --enable-zts --with-config-file-path=/usr/local/php81/etc --with-config-file-scan-dir=/usr/local/php81/conf.d --with-mysqli=mysqlnd --with-pdo-mysql=mysqlnd --with-iconv=/usr/local --with-freetype=/usr/local/freetype --with-jpeg --with-zlib --enable-xml --disable-rpath --enable-bcmath --enable-shmop --enable-sysvsem --with-curl --enable-mbregex --enable-mbstring --enable-intl --enable-pcntl --enable-ftp --enable-gd --with-openssl --with-mhash --enable-pcntl --enable-sockets --with-zip --enable-soap --with-gettext --enable-opcache --with-xsl --with-pear --disable-mbregex
make && make install
# 搞定
ln -s /usr/local/php81/bin/php /usr/bin/php81
php -v
以上安装过程中有可能会出错,出错的解决方法请看我的另一篇文章《为了编译启用ZTS的php,我一路披荆斩棘》。
成功安装php之后,会打印如下结果
ZTS的php环境有了,就可以安装parallel了
git clone https://github.com/krakjoe/parallel.git
cd parallel
phpize
./configure --enable-parallel [ --enable-parallel-coverage ] [ --enable-parallel-dev ]
make
make test
make install
安装成功后是这样子的
我们继续新增testParallel方法。代码如下
public function testParallel(){
$ch = new Channel();
// executed within thread
$start=microtime(true);
$task = function (Channel $channel, int $i){
$url = 'https://www.baidu.com/?i=';
$url = $url.$i;
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_HEADER, 0);
$output = curl_exec($ch);
curl_close($ch);
// echo $output;
echo "第".$i."次访问百度\n";
$channel->send($output);
};
// creating a few threads
$runtimeList = [];
for ($i = 0; $i < 5; $i++) {
$runtimeList[] = new Runtime();
}
// run all threads
$futureList = [];
foreach ($runtimeList as $i => $runtime) {
$futureList[] = $runtime->run($task, [$ch, $i]);
}
$ch->close();
// echo $queue . PHP_EOL;
}
Parallel底层用的也是多进程机制,但术语一般是说成线程,上面的代码中,我们定义了5个线程,并发运行结果如下
ReactPHP
与前面几个扩展库不同,ReactPHP[2] 是一个用PHP编写的基于事件驱动编程(EventLoop)的低级库。其核心是事件循环,在其之上提供低级实用程序,例如:流抽象、异步 DNS 解析器、网络客户端/服务器、HTTP 客户端/服务器以及与进程的交互。
安装ReactPHP很简单,直接用composer安装
composer require react/react
ReactPHP 有各种组件,例如事件循环、promise 和流。当我们安装 ReactPHP 时,这些组件和其他一些组件都会被安装,因此您不必单独安装
ReactPHP异步编程的写法采用了类Promise的设计,熟悉ES6 Promise的话上手很快。
require __DIR__ . '/vendor/autoload.php';
$browser = new React\Http\Browser();
$url = 'https://www.baidu.com/?i=';
$start = microtime(true);
for ($i=0; $i < 500; $i++) {
$browser->get($url.$i)->then(function (Psr\Http\Message\ResponseInterface $response) use($i,$start){
$result = $response->getBody();
echo "第{$i}次访问百度完成,用时:".microtime(true)-$start."\n";
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
}
上面的代码确实实现了异步编程。但我们发现用时并没有多大的提升
总用时为0.63秒?为什么?
我们把5次访问改为500次,运行程序时,我们新开个命令行窗口,输入ps -ef | grep php来观察php进程
我们发现始终都只有一个php ReactPhp2.php的进程,说明Reactphp没有采用多进程机制,但它的优点大于它的缺点。
1. ReactPHP安装非常方便,直接用composer require就行,不需要其他php扩展的支持
2. ReactPHP对异步编程进行了非常好的支持,书写语法非常友好。具体查看官方文档就知道了
Fibers
2021 年 11 月发布的 PHP 8.1 开始支持了一个新特性: Fibers[3],用于实现轻量级协程。由于它是一个非常底层的 API ,并不是直接可以使用于应用层,更多的,要使用Fibers,只需把php版本更新到8.1即可。
<?php
declare(ticks=1);//Zend引擎每执行1条低级语句就去执行一次 register_tick_function() 注册的函数
class Thread {
protected static $names = [];
protected static $fibers = [];
protected static $params = [];
public static function register(string|int $name, callable $callback, array $params)
{
self::$names[] = $name;
self::$fibers[] = new Fiber($callback);
self::$params[] = $params;
}
public static function run() {
$output = [];
while (self::$fibers) {
foreach (self::$fibers as $i => $fiber) {
try {
if (!$fiber->isStarted()) {
// Register a new tick function for scheduling this fiber
register_tick_function('Thread::scheduler');
$fiber->start(...self::$params[$i]);
} elseif ($fiber->isTerminated()) {
$output[self::$names[$i]] = $fiber->getReturn();
unset(self::$fibers[$i]);
} elseif ($fiber->isSuspended()) {
$fiber->resume();
}
} catch (Throwable $e) {
$output[self::$names[$i]] = $e;
}
}
}
return $output;
}
public static function scheduler () {
if(Fiber::getCurrent() === null) {
return;
}
// running Fiber::suspend() in this if condition will prevent an infinite loop!
if(count(self::$fibers) > 1)
{
Fiber::suspend();
}
}
}
$start = microtime(true);
function curl($url)
{
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_HEADER, 0);
$output = curl_exec($ch);
curl_close($ch);
return $output;
}
function callback($i){
$url = 'https://www.baidu.com/?i=';
return curl($url.$i);
};
// registering 6 Threads (A, B, C, D, E, and F)
foreach(range(1, 5) as $id) {
Thread::register($id, function($id,$start){
$result = callback($id);
return microtime(true) - $start;//放回用时。
}, [$id,$start]);
}
// run threads and wait until execution finishes
$outputs = Thread::run();
print_r($outputs);
以上代码通过declare(ticks=1)来实现Fibers协程的切换。通过register_tick_function函数来让程序没运行一行就执行Fiber::suspend(),这样就可以达到协程切换的目的,最后$outputs打印的时每个协程的耗时。也因为频繁suspend,耗时比较长。但我们异步编程的目录已达到了。
Amphp
前面的例子只是使用Fiber完成了一个最简单但不太合理的协程。实际上 Fiber 扩展进入内核后,它是一个非常底层的 API ,并不是直接用于应用层的。而Amphp[4]利用了Fibers的特殊,完成一个完整的协程框架,里面包含了一大堆组件,可以根据需要composer对应的组件就行。
安装:
composer require amphp/amp
Amphp的使用和Reactphp很像,使用方法大概如下:
<?php
use Amp\Future;
$httpClient = HttpClientBuilder::buildDefault();
$uris = [
"google" => "https://www.google.com",
"news" => "https://news.google.com",
"bing" => "https://www.bing.com",
"yahoo" => "https://www.yahoo.com",
];
try {
$responses = Future\await(array_map(function ($uri) use ($httpClient) {
return Amp\async(fn () => $httpClient->request(new Request($uri, 'HEAD')));
}, $uris));
foreach ($responses as $key => $response) {
printf(
"%s | HTTP/%s %d %s\n",
$key,
$response->getProtocolVersion(),
$response->getStatus(),
$response->getReason()
);
}
} catch (Exception $e) {
// If any one of the requests fails the combo will fail
echo $e->getMessage(), "\n";
}
Swoole
我们最后介绍Swoole,swoole目前应该是php中异步编程的首选框架了,在国内有很高的知名度,甚至在有些php招聘中要求必会swoole。
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\go;
use function Swoole\Coroutine\run;
run(function() {
$chan = new Channel(5);
go(function () use ($chan) {
$cli = new Swoole\Coroutine\Http\Client('www.qq.com', 80);
$ret = $cli->get('/');
$chan->push(['key' => 'www.qq.com', 'content' => '访问www.qq.com成功!']);
});
go(function () use ($chan) {
$cli = new Swoole\Coroutine\Http\Client('www.163.com', 80);
$ret = $cli->get('/');
$chan->push(['key' => 'www.163.com', 'content' => '访问www.qq.com成功!']);
});
for ($i = 0; $i < 2; $i++) {
$result = $chan->pop();
var_dump($result);
}
});
上面的代码执行结果如下:
发表评论