<?php
// Demo driver: run three remote commands, at most two at a time, and print
// every result as soon as the corresponding command has finished.
$arr_task = [
    ['cmd' => 'ssh -i /home/phpor/.ssh/id_rsa phpor@localhost "sleep 2 && echo -n succ 1"'],
    ['cmd' => 'ssh -i /home/phpor/.ssh/id_rsa phpor@localhost "sleep 2 && echo -n succ 2"'],
    ['cmd' => 'ssh -i /home/phpor/.ssh/id_rsa phpor@localhost "sleep 3 && echo -n succ 3"'],
];

$started_at = microtime(true);

// multi_cmd() is a generator: the commands are launched and polled while this
// loop pulls results from it, so the timing below must be taken after the loop.
foreach (multi_cmd($arr_task, 2) as $task_key => $task) {
    echo "{$task_key}: {$task['cmd']}\n",
        "\tretcode: {$task['retcode']}\n",
        "\tstdout: {$task['output']['stdout']}\n",
        "\tstderr: {$task['output']['stderr']}\n";
}

$elapsed = microtime(true) - $started_at;
echo "time_used:{$elapsed}\n";
/**
 * Launch queued commands until the concurrency limit is reached.
 *
 * Every task taken from $arr_task is removed from the queue and registered in
 * $arr_result under the same key. A registered record has these fields:
 *   cmd      - the command line that was started
 *   result   - bit mask: 1 = stdout reached EOF, 2 = stderr reached EOF,
 *              3 = nothing left to read (also used for a failed launch)
 *   output   - array('stdout' => ..., 'stderr' => ...) collected so far
 *   pipes    - pipe streams filled in by proc_open()
 *   filedesc - descriptor spec handed to proc_open()
 *   handle   - process resource, or the failing return value of proc_open()
 *
 * @param array $arr_task   Pending tasks, each array('cmd' => string); consumed in place.
 * @param array $arr_result Records of launched tasks; extended in place.
 * @param int   $concurent  Maximum number of records in $arr_result, -1 for no limit.
 * @return void
 */
function add_task(&$arr_task, &$arr_result, $concurent) {
    if (empty($arr_task)) return;
    foreach ($arr_task as $k => $arr) {
        // Stop as soon as every slot is taken; -1 disables the limit.
        if ($concurent != -1 && count($arr_result) >= $concurent) return;
        echo "add task $k\n";
        $cmd = $arr['cmd'];
        unset($arr_task[$k]);
        $arr_result[$k] = array(
            "cmd" => $cmd,
            "result" => 0,
            "output" => array("stdout" => null, "stderr" => null),
            "pipes" => array(),
        );
        $arr_result[$k]['filedesc'] = array(
            0 => array("pipe", "r"), // stdin is a pipe that the child will read from
            1 => array("pipe", "w"), // stdout is a pipe that the child will write to
            2 => array("pipe", "w"), // stderr is a pipe that the child will write to
        );
        $arr_result[$k]['handle'] = proc_open($cmd, $arr_result[$k]['filedesc'], $arr_result[$k]["pipes"]);
        if (!is_resource($arr_result[$k]['handle'])) {
            // Keep the record (and its cmd) and flag both streams as done so
            // the failure is reported instead of being polled for output.
            $arr_result[$k]['result'] = 3;
            continue;
        }
        // Nothing is ever written to the child's stdin, so close it right away:
        // the child sees EOF instead of blocking on a read, and the descriptor
        // is not leaked.
        fclose($arr_result[$k]['pipes'][0]);
    }
}
/**
 * Run shell commands in parallel and yield each one as soon as it has finished.
 *
 * This is a generator: nothing is launched until the caller starts iterating.
 * Tasks are started through add_task(), their stdout/stderr pipes are polled
 * with stream_select(), and a task is considered finished once both pipes have
 * reached EOF. Each time a task finishes, add_task() is called again so the
 * freed slot can be refilled from the remaining queue.
 *
 * @param array $arr_task  Tasks to run, each array('cmd' => string); keys are preserved.
 * @param int   $concurent Maximum number of commands running at once, -1 for no limit.
 * @return Generator Yields task key => record. Besides the fields set up by
 *                   add_task() ('cmd', 'output' with 'stdout'/'stderr', ...) the
 *                   record carries 'retcode': the value of proc_close(), or -1
 *                   when the process could not be started.
 */
function multi_cmd($arr_task, $concurent = -1) {
    $arr_result = array();
    add_task($arr_task, $arr_result, $concurent);
    while (!empty($arr_result)) {
        // Pass 1: hand out finished tasks and collect the pipes still worth polling.
        $arr_read_fds = array();
        foreach ($arr_result as $k => &$v) {
            if ($v['result'] == 3) {
                unset($arr_result[$k]);
                // Close whatever pipe is still open before waiting for the
                // process, as the proc_close() documentation advises.
                foreach ($v['pipes'] as $pipe) {
                    if (is_resource($pipe)) {
                        @fclose($pipe);
                    }
                }
                // A failed launch has no process handle to wait for.
                $v['retcode'] = is_resource($v['handle']) ? proc_close($v['handle']) : -1;
                add_task($arr_task, $arr_result, $concurent);
                yield $k => $v;
                continue;
            }
            if (!($v['result'] & 1)) {
                $arr_read_fds[] = $v["pipes"][1];
            }
            if (!($v['result'] & 2)) {
                $arr_read_fds[] = $v["pipes"][2];
            }
        }
        unset($v); // drop the reference left behind by the by-reference loop

        if (empty($arr_read_fds)) break;

        // Wait, in 0.5 s slices, until at least one pipe is reported ready.
        // stream_select() rewrites its first argument, so it is reset on every try.
        $arr_write_fds = $arr_expect_fds = array();
        do {
            $arr_fds = $arr_read_fds;
            $ready = stream_select($arr_fds, $arr_write_fds, $arr_expect_fds, 0, 500000);
        } while ($ready === 0);

        // Pass 2: read one chunk from every ready pipe. The pipe index (1 = stdout,
        // 2 = stderr) doubles as the bit recorded in 'result' once that pipe hits EOF.
        foreach ($arr_result as $k => &$v) {
            foreach (array(1 => "stdout", 2 => "stderr") as $fd => $name) {
                if ($v['result'] & $fd) {
                    continue; // already drained and closed
                }
                $stream = $v["pipes"][$fd];
                if (!in_array($stream, $arr_fds, true)) {
                    continue; // not ready in this round
                }
                $v["output"][$name] .= @fread($stream, 1024);
                if (@feof($stream)) {
                    $v['result'] |= $fd;
                    @fclose($stream);
                }
            }
        }
        unset($v);
    }
}