还是在上家公司的时候,同事在群里突然发了这个Swoole连接,才看到原来PHP还有这么一个扩展,顺便了也发Workerman,说也是PHP新的通信框架,而swoole则重新定义了PHP,那时候项目中还没使用这些扩展,就没去看过文档。前不久,这家公司当时带我的这位小伙伴突然告诉说,他们在准备要用swoole重构下一版接口了。
刚好这段时间我们也在准备街电的财务项目,用了Laravel框架,顺便也提前重新写一下跑数平台。以前没做过类似的工具,不知道怎么设计好,就借鉴了原来项目的跑数平台,完全是迁移过来的,并没有自己多的独特创造。
1. 起因
Laravel框架提供了非常丰富的控制台Artisan 命令行工具和任务调度功能Task应用,对于测试通过和已经运行正常的脚本来说,直接添加不需要观察就可以很好的得到执行,但是线上项目我没有权限登陆控制台手动去运行命令,也不想申请这些权限,故只能依赖后台网页端来输入命令,让它自动执行,收集每一个脚本的输出结果,执行完毕后直接发邮件给我就行。
2. 期望
我想管理每一个脚本,对该脚本的输出和错误输出都单独发送邮件,同时直接在脚本中使用echo输出就能得到excel的原始数据,即是将该输出粘贴到excel中即可,不需要在程序中再次将输出写入到excel表格中再发送邮件。
但是Laravel框架的控制台把异常给屏蔽掉了,没注意到怎么去修改,这样会把错误输出也当做正常的输出,这点不是我想要的结果。我想直接看输出就知道脚本有没有执行异常,而不是打开日志来确定脚本执行是否顺利通过。
3. swoole_process多进程扩展
官网说swoole-1.7.2增加了一个进程管理模块,用来替代PHP的pcntl扩展。需要注意Process进程在系统是非常昂贵的资源,创建进程消耗很大。另外创建的进程过多会导致进程切换开销大幅上升。
原来的项目是基于多进程扩展pcntl来做的,同样也是使用信号函数来监听子进程的回收。其实对我这样简单的需求,依旧使用该扩展也是完全能够应付。这次是为了学习新的扩展,才使用swoole_process来代替。
4. 命令添加入口
提供给其它类调用的入口,只需要按格式添加命令和参数即可,目前未处理数组参数。在这里每次启动一个进程来执行,不需要监听,执行完毕后直接结束,下次再有需要时,在启动进程,如此往复。当然,如果启动进程太多,需要限制最大进程数量,以免给系统造成崩溃。
/**
* 执行命令
*
* @param $commandName
* @param array $params
* @throws \Exception
*/
public function addRun($commandName, $params = [])
{
$cmd = $this->getCmd($commandName, $params);
$process = new \swoole_process([$this, 'process'], false);
if (($pid = $process->start()) === false) {
throw new \Exception(sprintf('ErrNo:%s, Error: %s', swoole_errno(), swoole_strerror(swoole_errno())));
}
// 保存准备的工作
$this->works[$pid] = [
'cmd' => $cmd,
//'process' => $process
];
// Master写入当前工作信息到管道
$process->write(json_encode($this->works));
// 信号注册函数必须在启动进程后注册, 否则无法子进程无法退出
// 17) SIGCHLD 子进程结束时, 父进程会收到这个信号
\swoole_process::signal(SIGCHLD, [$this, 'finished']);
// 2) SIGINT 程序终止(interrupt)信号, 在用户键入INTR字符(通常是Ctrl-C)时发出, 用于通知前台进程组终止进程。
\swoole_process::signal(SIGINT, [$this, 'finished']);
// 9) SIGKILL 用来立即结束程序的运行.
// 本信号不能被阻塞, 处理和忽略。如果管理员发现某个进程终止不了, 可尝试发送这个信号
\swoole_process::signal(SIGKILL, [$this, 'finished']);
// 15) SIGTERM 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和处理
// 通常用来要求程序自己正常退出, shell命令kill缺省产生这个信号。如果进程终止不了, 我们才会尝试SIGKILL
\swoole_process::signal(SIGTERM, [$this, 'finished']);
}
原来的项目中,信号函数是在构造方法中注入的,在这调试了很久才知道信号注入差异。如果用swoole的话,在构造函数中注入信号会导致子进程无法正常执行完毕,必须在进程成功启动后在注册信号处理函数。
5. 命令运行机制
借助shell的输入输出重定向机制,将正确输出和错误输出分别重定向到对应的文件。
/**
* 调用exec执行外部命令
*
* @param $command
* @param $outFile
* @param $errorFile
* @return mixed
*/
protected function runCmd($command, $outFile, $errorFile)
{
$cmd = sprintf("%s >>%s 2>>%s", $command, $outFile, $errorFile);
$lastLine = exec($cmd, $output, $returnValue);
return $returnValue;
}
比如:
php artisan battery 2017-08-01 2017-08-02 >>/Users/zhgxun/Public/html/logs/test/20170826/4853/4872_out.txt 2>>/Users/zhgxun/Public/html/logs/test/20170826/4853/4872_error.txt
swoole提供了exec方法,说是对exec()原本系统调用的封装,但是不知道如何重定向输入输出到文件,故无法采用,而且该处不需要进程间通信,反而更高级的功能把基本功能给改造得无法使用了。
6. battery命令
battery命令,是一个真正需要运行的脚本,该脚本代码如下:
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use App\Common\DbConnection;
use App\Common\DateRange;
class Battery extends Command
{
protected $signature = 'battery {from=0} {to=0}';
protected $description = '脚本描述';
public function test(\swoole_process $w)
{
echo "程序正在计算中...\n";
echo $w->pid . PHP_EOL;
}
public function handle()
{
$from = $this->argument('from');
$to = $this->argument('to');
if (!strtotime($from) || !strtotime($to)) {
$to = date('Y-m-d');
$from = DateRange::preDay($to);
}
$processNum = 0;
$dates = DateRange::datesBetween($from, $to);
foreach ($dates as $date) {
$startDate = $date;
$nextDate = DateRange::nextDay($date);
$process = new \swoole_process(function (\swoole_process $work) use ($startDate, $nextDate) {
// 每个进程都执行当天的业务逻辑
echo "执行该天 {$startDate} 逻辑\n";
$this->test($work);
echo "该天 {$startDate} 数据执行完毕\n";
});
// 进程启动成功后会返回进程ID, 该进程ID可以进程进程间通信
$pid = $process->start();
$processNum++;
}
// 必须回收子进程
for ($i = 0; $i < $processNum; $i++) {
$ret = \swoole_process::wait();
echo "进程 {$ret['pid']} 执行完毕, 回收后正常退出\n";
}
}
}
该脚本本身也是一个多进程执行的程序,test()方法就是真正要处理的业务逻辑,对于这种没有相互依赖的进程,不需要信号监听函数,运行完毕后回收子进程,避免成为僵尸进程浪费系统资源即可。
7. 添加命令
工具类本身是不需要直接运行的,需要一个继承它的使用类,该类中直接添加需要执行的命令和参数即可。
<?php
namespace App\Console\Commands;
use App\Console\Commands\Runner\Platform;
class Test extends Platform
{
protected $signature = 'test';
protected $description = 'Test';
public function handle()
{
// battery 2017-08-01 2017-08-02
$this->addRun('battery', [
'from' => '2017-08-01',
'to' => '2017-08-05'
]);
$this->addRun('battery', [
'from' => '2017-09-01',
'to' => '2017-09-10'
]);
}
}
只需要继承该基础类,就可以直接按其它脚本需要的参数和设置的名称,添加后就可以自动运行。
8. 运行效果
8.1 启动命令
zhgxun-pro:ankerbox_finance zhgxun$
zhgxun-pro:ankerbox_finance zhgxun$ php artisan test
zhgxun-pro:ankerbox_finance zhgxun$
8.2 查看日志
当前脚本文件中,会跟踪每一个命令的启动和完成时间,比如我们在添加命令中添加的两个命令,命令名称一致,但是参数不一致。
当前机器: 192.168.1.100, 当前用户: zhgxun, 当前代码分支: * runner, 版本: * runner 3fcd287 增加日历语言包配置
当前引导命令: php artisan test
运行日志目录: /Users/zhgxun/Public/html/logs/test/20170826/5880
5899 : 启动 "php artisan battery 2017-08-01 2017-08-05"
5900 : 启动 "php artisan battery 2017-09-01 2017-09-10"
5899 : 结束: "php artisan battery 2017-08-01 2017-08-05"
[begin:2017-08-26 13:00:35 end:2017-08-26 13:00:36] 历时:01秒
5900 : 结束: "php artisan battery 2017-09-01 2017-09-10"
[begin:2017-08-26 13:00:35 end:2017-08-26 13:00:36] 历时:01秒
比如进程5899的输出内容,该内容完全是代码中的echo输出,通过shell的机制收集,所有的输出都被写入到了日志文件中。
5899 : 启动 "php artisan battery 2017-08-01 2017-08-05"
执行该天 2017-08-01 逻辑
程序正在计算中...
5939
该天 2017-08-01 数据执行完毕
执行该天 2017-08-02 逻辑
程序正在计算中...
5941
该天 2017-08-02 数据执行完毕
执行该天 2017-08-03 逻辑
程序正在计算中...
5943
该天 2017-08-03 数据执行完毕
执行该天 2017-08-04 逻辑
程序正在计算中...
5945
该天 2017-08-04 数据执行完毕
进程 5939 执行完毕, 回收后正常退出
进程 5941 执行完毕, 回收后正常退出
进程 5943 执行完毕, 回收后正常退出
进程 5945 执行完毕, 回收后正常退出
exitStatus:0
5899 : 结束: "php artisan battery 2017-08-01 2017-08-05"
[begin:2017-08-26 13:00:35 end:2017-08-26 13:00:36] 历时:01秒
如果有错误发生,还会将错误信息写入到错误日志文件中,但是因为框架的原因,比如我把命令名称故意写错,写成battery1和battery2,这两个命令明显不存在,但是框架把异常给处理为正常输出了,这点有些封装太过了。
public function handle()
{
// battery 2017-08-01 2017-08-02
$this->addRun('battery1', [
'from' => '2017-08-01',
'to' => '2017-08-05'
]);
$this->addRun('battery2', [
'from' => '2017-09-01',
'to' => '2017-09-10'
]);
}
框架对这种明显的错误记录为:
6010 : 启动 "php artisan battery1 2017-08-01 2017-08-05"
[Symfony\Component\Console\Exception\CommandNotFoundException]
Command "battery1" is not defined.
Did you mean this?
battery
exitStatus:1
6010 : 结束: "php artisan battery1 2017-08-01 2017-08-05"
[begin:2017-08-26 13:09:36 end:2017-08-26 13:09:36] 历时:00秒
虽然从退出状态等一系列信息中还是能判断出来,但是却无法直接写入到错误日志文件中。不过通过error_log()等的方式写入的日志信息,却可以正常写入到错误日志文件中,这点稍微能改造了一点点。
9. 信号函数
需要注意的是,在程序执行完毕后,如果使用了信号函数,脚本是不会自动退出的,需要直接使用exit()等方式来退出。这篇文章Swoole编程指南-2.9 Process进程中提到说:Process的signal方法是一个异步方法,其底层会开启事件循环,因此使用了signal方法的进程在主代码执行完后并不会主动退出,需要调用exit、发送信号等方式关闭。
10. 工具源码
<?php
namespace App\Console\Commands\Runner;
use Illuminate\Console\Command;
/**
* 简易跑数工具
*
* @package App\Console\Commands\Runner
*/
class Platform extends Command
{
/**
* 输出日志目录
*
* @var null
*/
protected $outputDirectory = null;
/**
* 当前php进程ID
*
* @var null
*/
protected $myPid = null;
/**
* 设置可执行文件名称, 默认为框架入口脚本php artisan
*
* @var null
*/
protected $execFile = 'php artisan';
/**
* 设置当前php进程总报告文件
*
* @var null
*/
protected $myPidReportFile = null;
/**
* 当前引导执行的命令名称
*
* @var null
*/
protected $currentCommand = null;
/**
* 保存当前正在运行的工作
*
* @var array
*/
protected $works = [];
/**
* 是否需要发送邮件通知
*
* @var bool
*/
protected $needSendEmail = false;
/**
* Platform constructor.
*/
public function __construct()
{
parent::__construct();
$this->myPid = getmypid();
// 输出目录
$this->setOutputDirectory();
array_map('unlink', glob($this->outputDirectory . '/*'));
// 设置当前运行的命令名称
$this->setCurrentCommand();
// 设置当前进程日志保存文件
$this->setMyPidReportFile();
// 当前进程总报告
$this->initMyPidReport();
}
/**
* 获取输出日志目录
*
* @return null|string
*/
protected function setOutputDirectory()
{
if ($this->outputDirectory === null) {
$_outputDirectory = env('LOG_PATH');
// 配置文件未设置日志目录时, 使用系统临时目录
if (!$_outputDirectory || !is_dir($_outputDirectory)) {
$_outputDirectory = '/tmp';
}
$this->outputDirectory = sprintf('%s/%s/%s', $_outputDirectory, date('Ymd'), $this->myPid);
if (!is_dir($this->outputDirectory) && !mkdir($this->outputDirectory, 0777, true)) {
exit(sprintf('Failed to create folders [%s]...', $this->outputDirectory));
}
unset($_outputDirectory);
}
return $this->outputDirectory;
}
/**
* 传递给该脚本的参数的数组
* 当脚本以命令行方式运行时, argv 变量传递给程序 C 语言样式的命令行参数。
*/
protected function setCurrentCommand()
{
if ($this->currentCommand === null) {
$rawParams = [];
if (isset($_SERVER['argv'])) {
$rawParams = $_SERVER['argv'];
array_shift($rawParams);
}
$params = [];
foreach ($rawParams as $param) {
if (preg_match('/^--(\w+)(=(.*))?$/', $param, $matches)) {
$name = $matches[1];
$params[$name] = isset($matches[3]) ? $matches[3] : true;
} else {
$params[] = $param;
}
}
if (!count($params)) {
throw new \Exception(__METHOD__ . ", Line: " . __LINE__ . " Params is empty");
}
$this->currentCommand = $this->execFile . ' ' . $params[0];
unset($rawParams, $params);
}
}
/**
* 设置当前php进程总报告文件
*/
protected function setMyPidReportFile()
{
if ($this->myPidReportFile === null) {
$this->myPidReportFile = sprintf('%s/%s_report.txt', $this->outputDirectory, $this->myPid);
}
}
/**
* 初始化当前php进程总报告日志
*/
protected function initMyPidReport()
{
$whoami = exec("whoami");
$branch = exec("git branch");
$version = exec("git branch -v");
file_put_contents($this->myPidReportFile, sprintf("当前机器: %s, 当前用户: %s, 当前代码分支: %s, 版本: %s\n当前引导命令: %s\n运行日志目录: %s\n\n",
$this->getLocalIp(), $whoami, $branch, $version, $this->currentCommand, $this->outputDirectory));
}
/**
* 获取本机IP地址
*
* @return null|string
*/
public function getLocalIp()
{
static $ip = null;
if (!$ip) {
// centos
$ip = exec("/sbin/ifconfig em1 2>&1 | grep -E 'inet ' | awk '{split($2,a,\":\");print a[2]}'");
// debian
if (!$ip) {
$ip = exec("/sbin/ifconfig eth0 2>&1 | grep -E 'inet ' | awk '{split($2,a,\":\");print a[2]}'");
}
// mac
if (!$ip) {
$ip = exec("/sbin/ifconfig en0 | grep -E 'inet ' | awk '{print $2}'");
}
if (!$ip) {
$ip = exec("/sbin/ifconfig en1 | grep -E 'inet ' | awk '{print $2}'");
}
}
return $ip;
}
/**
* 将命令拼接成shell可执行的格式
*
* @param $commandName
* @param $params
* @return string
* @throws \Exception
*/
protected function getCmd($commandName, $params)
{
$str = sprintf("%s %s", $this->execFile, $commandName);
foreach ($params as $key => $value) {
if (!is_string($value) && !is_numeric($value)) {
throw new \Exception(sprintf("Parameter only allows string or number:\n%s -> %s\n%s\nLine:%s",
$key, var_export($value, true), __METHOD__, __LINE__));
}
$str .= sprintf(' %s', $value);
}
return $str;
}
/**
* 调用system执行外部命令
*
* @param $command
* @param $outFile
* @param $errorFile
* @return mixed
*/
protected function runCmd($command, $outFile, $errorFile)
{
$cmd = sprintf("%s >>%s 2>>%s", $command, $outFile, $errorFile);
$lastLine = exec($cmd, $output, $returnValue);
return $returnValue;
}
/**
* 执行命令
*
* @param $commandName
* @param array $params
* @throws \Exception
*/
public function addRun($commandName, $params = [])
{
$cmd = $this->getCmd($commandName, $params);
$process = new \swoole_process([$this, 'process'], false);
if (($pid = $process->start()) === false) {
throw new \Exception(sprintf('ErrNo:%s, Error: %s', swoole_errno(), swoole_strerror(swoole_errno())));
}
// 保存准备的工作
$this->works[$pid] = [
'cmd' => $cmd,
//'process' => $process
];
// Master写入当前工作信息到管道
$process->write(json_encode($this->works));
// 信号注册函数必须在启动进程后注册, 否则无法子进程无法退出
// 17) SIGCHLD 子进程结束时, 父进程会收到这个信号
\swoole_process::signal(SIGCHLD, [$this, 'finished']);
// 2) SIGINT 程序终止(interrupt)信号, 在用户键入INTR字符(通常是Ctrl-C)时发出, 用于通知前台进程组终止进程。
\swoole_process::signal(SIGINT, [$this, 'finished']);
// 9) SIGKILL 用来立即结束程序的运行.
// 本信号不能被阻塞, 处理和忽略。如果管理员发现某个进程终止不了, 可尝试发送这个信号
\swoole_process::signal(SIGKILL, [$this, 'finished']);
// 15) SIGTERM 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和处理
// 通常用来要求程序自己正常退出, shell命令kill缺省产生这个信号。如果进程终止不了, 我们才会尝试SIGKILL
\swoole_process::signal(SIGTERM, [$this, 'finished']);
}
/**
* 所有输出日志文件名
*
* @param $pid
* @return array
*/
protected function getOutputFiles($pid = null)
{
if ($pid === null) {
$pid = getmypid();
}
return [
$this->outputDirectory . "/{$pid}_out.txt",
$this->outputDirectory . "/{$pid}_error.txt",
$this->outputDirectory . "/{$pid}_status.txt",
$this->outputDirectory . "/{$pid}_report.txt",
];
}
/**
* 往总报告中写入命令启动日志
*
* @param $content
*/
protected function pushMyPidReport($content)
{
file_put_contents($this->myPidReportFile, sprintf("%s\n", $content), FILE_APPEND);
}
/**
* 注册的回调函数
*
* @param \swoole_process $work
* @throws \Exception
*/
public function process(\swoole_process $work)
{
$pid = $work->pid;
// Worker读取Master管道信息
$receive = $work->read();
$_works = json_decode($receive, true);
if (!isset($_works[$pid])) {
throw new \Exception("empty work->pid({$pid})");
}
$command = $_works[$pid]['cmd'];
// 记录命令启动信息到日志文件中
list($outFile, $errorFile, $statusFile, $reportFile) = $this->getOutputFiles();
$titleInfo = sprintf("%s : 启动 \"%s\"\n", getmypid(), $command);
file_put_contents($outFile, $titleInfo);
file_put_contents($reportFile, $titleInfo);
file_put_contents($statusFile, date("Y-m-d H:i:s"));
// 命令启动记录到总报告中
$this->pushMyPidReport($titleInfo);
$exitStatus = $this->runCmd($command, $outFile, $errorFile);
file_put_contents($outFile, "\nexitStatus:$exitStatus\n", FILE_APPEND);
$work->exit(1);
}
/**
* 信号处理回调函数
*
* @param $signo
*/
public function finished($signo = null)
{
// $blocking 参数可以指定是否阻塞等待,默认为阻塞
while ($result = \swoole_process::wait(false)) {
$pid = $result['pid'];
$exitCode = $result['signal'];
// 信号函数能直接共享主进程的内容
$cmd = $this->works[$pid]['cmd'];
list($outFile, $errorFile, $statusFile, $reportFile) = $this->getOutputFiles($pid);
// 处理错误信息
$interrupt = '';
switch ($exitCode) {
case 2:
$interrupt .= "该进程通常是Ctrl-C结束的\n";
break;
case 9:
$interrupt .= "该进程是被kill掉的\n";
break;
case 15:
$interrupt .= "程序结束(terminate)信号\n";
break;
// 子进程结束时, 父进程会收到这个信号
case 17:
default:
}
if ($interrupt) {
file_put_contents($errorFile, $interrupt, FILE_APPEND);
}
unset($interrupt);
// 输出日志记录
$beginDate = file_get_contents($statusFile);
$endDate = date("Y-m-d H:i:s");
$beginTime = strtotime($beginDate);
$endMessage = sprintf("%s : 结束: \"%8s\" \n[begin:%s end:%s] 历时:%s", $pid, $cmd, $beginDate, $endDate, $this->getDiffTimeString($beginTime));
file_put_contents($outFile, "{$endMessage}", FILE_APPEND);
file_put_contents($reportFile, "{$endMessage}", FILE_APPEND);
// 总报告日志记录
$content = file_get_contents($errorFile);
$message = '';
if (!empty($content)) {
$message .= "---------------error---------------\n";
$message .= sprintf("pid: %s \n%s\n", $pid, $content);
$message .= "---------------error---------------\n";
}
$this->pushMyPidReport(sprintf("%s\n%s", $message, $endMessage));
// 标识命令执行完毕
file_put_contents($statusFile, 'done');
// 释放工作表
unset($this->works[$pid]);
// 发送邮件
// if ($this->needSendEmail) {
// $this->mail($pid);
// }
}
// 终止主进程
if (!count($this->works)) {
exit();
}
}
/**
* 时间段描述
*
* @param $beginTime
* @param null $endTime
* @return string
*/
protected function getDiffTimeString($beginTime, $endTime = null)
{
if ($endTime === null) {
$endTime = time();
}
$diff = $endTime - $beginTime;
$totalLeft = '';
if ($diff >= 3600) {
$totalLeft .= sprintf("%10d小时", $diff / 3600);
$diff %= 3600;
}
if ($diff >= 60) {
$totalLeft .= sprintf("%02d分钟", $diff / 60);
$diff %= 60;
}
if ($diff < 60) {
$totalLeft .= sprintf("%02d秒", $diff);
}
return $totalLeft;
}
}