在PHP还没有协程(生成器 yield)的时候,我们也玩过并发多线程,但对结果的实时处理就没那么方便了(尽管也可以实现);有了协程之后,代码看起来会舒服许多,参考下面的multi_cmd.php
multi_cmd.php:
<?php
$arr_task = array(
    array('cmd'=>'ssh -i /home/phpor/.ssh/id_rsa phpor@localhost "sleep 1 && echo -n succ 1"'),
    array('cmd'=>'ssh -i /home/phpor/.ssh/id_rsa phpor@localhost "sleep 2 && echo -n succ 2"'),
    array('cmd'=>'ssh -i /home/phpor/.ssh/id_rsa phpor@localhost "sleep 3 && echo -n succ 3"'),
);

$start = microtime(true);
$arr_result = multi_cmd($arr_task);
// Each result is printed as soon as its command finishes, not after all of them.
foreach ($arr_result as $k => $v) {
    echo "$k: {$v['cmd']}\n";
    echo "\tstdout: ".$v['output']['stdout']."\n";
    echo "\tstderr: ".$v['output']['stderr']."\n";
}
$end = microtime(true);
echo "time_used:". ($end - $start) ."\n";

/**
 * Run all commands at once and yield each result as soon as it finishes.
 *
 * @param array $arr_task tasks keyed by an arbitrary id, each array('cmd' => string)
 * @return Generator yields id => array(
 *     'cmd'     => string,
 *     'result'  => int,   // bit 1: stdout reached EOF, bit 2: stderr reached EOF (3 = done)
 *     'output'  => array('stdout' => ?string, 'stderr' => ?string),
 *     'pipes'   => array, // the (now closed) pipes from proc_open()
 *     'retcode' => int,   // exit status from proc_close(), -1 if the command could not be started
 * )
 */
function multi_cmd($arr_task)
{
    $arr_result = array();
    $process = array();
    $descriptorspec = array(
        0 => array("pipe", "r"), // stdin is a pipe that the child will read from
        1 => array("pipe", "w"), // stdout is a pipe that the child will write to
        2 => array("pipe", "w"), // stderr is a pipe that the child will write to
    );

    foreach ($arr_task as $k => $arr) {
        $cmd = $arr['cmd'];
        $arr_result[$k] = array("cmd"=>$cmd, "result"=>0, "output"=>array("stdout"=>null, "stderr"=>null), "pipes"=>array());
        $process[$k] = proc_open($cmd, $descriptorspec, $arr_result[$k]["pipes"]);
        if (!is_resource($process[$k])) {
            // Mark the task as finished; the original overwrote the whole entry with int 3.
            $arr_result[$k]['result'] = 3;
            continue;
        }
        // Nothing is ever written to the child; closing stdin lets a child that
        // reads it see EOF instead of hanging forever.
        fclose($arr_result[$k]["pipes"][0]);
        stream_set_blocking($arr_result[$k]["pipes"][1], 0);
        stream_set_blocking($arr_result[$k]["pipes"][2], 0);
    }

    // NOTE: this loop busy-polls the non-blocking pipes and burns CPU while the
    // children are idle; the stream_select() based version avoids that.
    while (!empty($arr_result)) {
        // Iterate over a key snapshot because entries are unset inside the loop.
        foreach (array_keys($arr_result) as $k) {
            if ($arr_result[$k]['result'] === 3) {
                $v = $arr_result[$k];
                unset($arr_result[$k]);
                $v['retcode'] = is_resource($process[$k]) ? proc_close($process[$k]) : -1;
                yield $k => $v;
                continue;
            }
            // The pipe index doubles as the "EOF reached" bit for that pipe, so a
            // closed pipe is never read again.
            foreach (array(1 => 'stdout', 2 => 'stderr') as $bit => $name) {
                if ($arr_result[$k]['result'] & $bit) {
                    continue;
                }
                $pipe = $arr_result[$k]["pipes"][$bit];
                $arr_result[$k]["output"][$name] .= fread($pipe, 1024);
                if (feof($pipe)) {
                    $arr_result[$k]['result'] |= $bit;
                    fclose($pipe);
                }
            }
        }
    }
}
我们更常见到的可能是:并发执行多个任务,等所有任务都完成后(如: curl异步多请求)再一并处理结果。如果处理结果本身也需要时间,就会比较浪费时间;如果能完成一个就处理一个,效果会好一些
上面脚本存在瑕疵:
1、为了避免某个流阻塞整个进程,上面把管道设成了非阻塞模式;但后面的循环会不停地空转轮询(忙等待),占用大量cpu,所以改用stream_select等待管道可读会更好一些
2、最好还能控制并发数,并发太大也未必是好事
改进的脚本如下:
<?php
$arr_task = array(
    array('cmd'=>'ssh -i /home/phpor/.ssh/id_rsa phpor@localhost "sleep 2 && echo -n succ 1"'),
    array('cmd'=>'ssh -i /home/phpor/.ssh/id_rsa phpor@localhost "sleep 2 && echo -n succ 2"'),
    array('cmd'=>'ssh -i /home/phpor/.ssh/id_rsa phpor@localhost "sleep 3 && echo -n succ 3"'),
);

$start = microtime(true);
$arr_result = multi_cmd($arr_task, 2);
// Each result is printed as soon as its command finishes, not after all of them.
foreach ($arr_result as $k => $v) {
    echo "$k: {$v['cmd']}\n";
    echo "\tretcode: ".$v['retcode']."\n";
    echo "\tstdout: ".$v['output']['stdout']."\n";
    echo "\tstderr: ".$v['output']['stderr']."\n";
}
$end = microtime(true);
echo "time_used:". ($end - $start) ."\n";

/**
 * Start queued tasks until the concurrency limit is reached.
 *
 * Each started task is removed from $arr_task and added to $arr_result under
 * the same key.
 *
 * @param array $arr_task   pending tasks, each array('cmd' => string); consumed in place
 * @param array $arr_result running tasks; extended in place
 * @param int   $concurent  max number of running tasks; -1 (or any value < 1) means unlimited
 */
function add_task(&$arr_task, &$arr_result, $concurent)
{
    // Iterate over a key snapshot because $arr_task is modified inside the loop.
    foreach (array_keys($arr_task) as $k) {
        if ($concurent > 0 && count($arr_result) >= $concurent) {
            return;
        }
        echo "add task $k\n";
        $cmd = $arr_task[$k]['cmd'];
        unset($arr_task[$k]);
        $arr_result[$k] = array("cmd"=>$cmd, "result"=>0, "output"=>array("stdout"=>null, "stderr"=>null), "pipes"=>array());
        $arr_result[$k]['filedesc'] = array(
            0 => array("pipe", "r"), // stdin is a pipe that the child will read from
            1 => array("pipe", "w"), // stdout is a pipe that the child will write to
            2 => array("pipe", "w"), // stderr is a pipe that the child will write to
        );
        $arr_result[$k]['handle'] = proc_open($cmd, $arr_result[$k]['filedesc'], $arr_result[$k]["pipes"]);
        if (!is_resource($arr_result[$k]['handle'])) {
            // Mark the task as finished; the original overwrote the whole entry with int 3.
            $arr_result[$k]['result'] = 3;
            continue;
        }
        // Nothing is ever written to the child; closing stdin lets a child that
        // reads it see EOF instead of hanging forever.
        fclose($arr_result[$k]["pipes"][0]);
        // With stream_select() deciding when to read, non-blocking mode makes
        // sure a single fread() can never stall the whole loop.
        stream_set_blocking($arr_result[$k]["pipes"][1], false);
        stream_set_blocking($arr_result[$k]["pipes"][2], false);
    }
}

/**
 * Run shell commands concurrently and yield each result as soon as it finishes.
 *
 * Waits on the children's stdout/stderr pipes with stream_select() instead of
 * busy-polling, and keeps at most $concurent commands running at once.
 *
 * @param array $arr_task  tasks keyed by an arbitrary id, each array('cmd' => string)
 * @param int   $concurent max number of commands running at once; -1 (or any value < 1) = unlimited
 * @return Generator yields id => array(
 *     'cmd'     => string,
 *     'result'  => int,   // bit 1: stdout reached EOF, bit 2: stderr reached EOF (3 = done)
 *     'output'  => array('stdout' => ?string, 'stderr' => ?string),
 *     'retcode' => int,   // exit status from proc_close(), -1 if the command could not be started
 *     ...plus the 'pipes', 'filedesc' and 'handle' entries used internally
 * )
 * @throws RuntimeException if stream_select() fails
 */
function multi_cmd($arr_task, $concurent = -1)
{
    $arr_result = array();
    add_task($arr_task, $arr_result, $concurent);

    while (!empty($arr_result)) {
        // Hand back every finished task. Each one frees a slot, so the pool is
        // refilled before yielding. Iterate over a key snapshot because both
        // unset() and add_task() modify $arr_result.
        foreach (array_keys($arr_result) as $k) {
            if ($arr_result[$k]['result'] !== 3) {
                continue;
            }
            $v = $arr_result[$k];
            unset($arr_result[$k]);
            // A task that failed to start has handle === false; proc_close(false)
            // would be a TypeError on PHP 8.
            $v['retcode'] = is_resource($v['handle']) ? proc_close($v['handle']) : -1;
            add_task($arr_task, $arr_result, $concurent);
            yield $k => $v;
        }

        $arr_read_fds = array();
        foreach ($arr_result as $v) {
            if (!($v['result'] & 1)) {
                $arr_read_fds[] = $v["pipes"][1];
            }
            if (!($v['result'] & 2)) {
                $arr_read_fds[] = $v["pipes"][2];
            }
        }
        if (empty($arr_read_fds)) {
            // Only already-finished tasks remain (e.g. ones that failed to start);
            // the next pass yields them instead of silently dropping them.
            continue;
        }

        // Wait, in 0.5 s slices, until at least one pipe is readable.
        do {
            $arr_fds = $arr_read_fds;
            $arr_write_fds = array();
            $arr_except_fds = array();
            $ready = stream_select($arr_fds, $arr_write_fds, $arr_except_fds, 0, 500000);
        } while ($ready === 0);
        if ($ready === false) {
            throw new RuntimeException('stream_select() failed');
        }

        foreach (array_keys($arr_result) as $k) {
            // The pipe index doubles as the "EOF reached" bit for that pipe.
            foreach (array(1 => 'stdout', 2 => 'stderr') as $bit => $name) {
                if ($arr_result[$k]['result'] & $bit) {
                    continue;
                }
                $pipe = $arr_result[$k]["pipes"][$bit];
                if (!in_array($pipe, $arr_fds, true)) {
                    continue;
                }
                $arr_result[$k]["output"][$name] .= fread($pipe, 1024);
                if (feof($pipe)) {
                    $arr_result[$k]['result'] |= $bit;
                    fclose($pipe);
                }
            }
        }
    }
}
注意:
在使用stream_select 的时候,是否阻塞就不重要了吗? 也不完全。假如其中一个比较靠前的任务执行时间很长,就算第一批的大部分任务执行时间都很短,也会因为fread阻塞在这个执行时间长的任务上,而无法快速完成其它任务、进而加入更多的任务;所以,这里可能的办法有:
- 办法一:给stderr和stdout设置读超时。但测试发现,用 stream_set_timeout 给这两个流设置超时时间会失败,stream_set_timeout 返回false(PHP 手册说明,PHP 核心中只有基于 socket 的流支持该函数,而 proc_open 返回的管道不是 socket 流)
- 办法二:依然使用非阻塞模式。但还是不行:尽管stream_set_blocking(stdout, false) 返回true,却没有效果
- 注意:
也就是说,仅有的这两个函数都用不上;问题不在于函数本身,而在于这件事本身就行不通