28. 用swoole_process写一个跑数平台

还是在上家公司的时候,同事在群里突然发了这个Swoole连接,才看到原来PHP还有这么一个扩展,顺便了也发Workerman,说也是PHP新的通信框架,而swoole则重新定义了PHP,那时候项目中还没使用这些扩展,就没去看过文档。前不久,这家公司当时带我的这位小伙伴突然告诉说,他们在准备要用swoole重构下一版接口了。

刚好这段时间我们也在准备街电的财务项目,用了Laravel框架,顺便也提前重新写一下跑数平台。以前没做过类似的工具,不知道怎么设计好,就借鉴了原来项目的跑数平台,完全是迁移过来的,并没有自己多的独特创造。

1. 起因

Laravel框架提供了非常丰富的控制台Artisan 命令行工具任务调度功能Task应用,对于测试通过和已经运行正常的脚本来说,直接添加不需要观察就可以很好的得到执行,但是线上项目我没有权限登陆控制台手动去运行命令,也不想申请这些权限,故只能依赖后台网页端来输入命令,让它自动执行,收集每一个脚本的输出结果,执行完毕后直接发邮件给我就行。

2. 期望

我想管理每一个脚本,对该脚本的输出和错误输出都单独发送邮件,同时直接在脚本中使用echo输出就能得到excel的原始数据,即是将该输出粘贴到excel中即可,不需要在程序中再次将输出写入到excel表格中再发送邮件。

但是Laravel框架的控制台把异常给屏蔽掉了,没注意到怎么去修改,这样会把错误输出也当做正常的输出,这点不是我想要的结果。我想直接看输出就知道脚本有没有执行异常,而不是打开日志来确定脚本执行是否顺利通过。

3. swoole_process多进程扩展

官网说swoole-1.7.2增加了一个进程管理模块,用来替代PHP的pcntl扩展。需要注意Process进程在系统是非常昂贵的资源,创建进程消耗很大。另外创建的进程过多会导致进程切换开销大幅上升。

原来的项目是基于多进程扩展pcntl来做的,同样也是使用信号函数来监听子进程的回收。其实对我这样简单的需求,依旧使用该扩展也是完全能够应付。这次是为了学习新的扩展,才使用swoole_process来代替。

4. 命令添加入口

提供给其它类调用的入口,只需要按格式添加命令和参数即可,目前未处理数组参数。在这里每次启动一个进程来执行,不需要监听,执行完毕后直接结束,下次再有需要时,在启动进程,如此往复。当然,如果启动进程太多,需要限制最大进程数量,以免给系统造成崩溃。

/**
 * 执行命令
 *
 * @param $commandName
 * @param array $params
 * @throws \Exception
 */
public function addRun($commandName, $params = [])
{
    $cmd = $this->getCmd($commandName, $params);
    $process = new \swoole_process([$this, 'process'], false);
    if (($pid = $process->start()) === false) {
        throw new \Exception(sprintf('ErrNo:%s, Error: %s', swoole_errno(), swoole_strerror(swoole_errno())));
    }

    // 保存准备的工作
    $this->works[$pid] = [
        'cmd' => $cmd,
        //'process' => $process
    ];
    // Master写入当前工作信息到管道
    $process->write(json_encode($this->works));

    // 信号注册函数必须在启动进程后注册, 否则无法子进程无法退出
    // 17) SIGCHLD 子进程结束时, 父进程会收到这个信号
    \swoole_process::signal(SIGCHLD, [$this, 'finished']);
    // 2) SIGINT 程序终止(interrupt)信号, 在用户键入INTR字符(通常是Ctrl-C)时发出, 用于通知前台进程组终止进程。
    \swoole_process::signal(SIGINT, [$this, 'finished']);
    // 9) SIGKILL 用来立即结束程序的运行.
    // 本信号不能被阻塞, 处理和忽略。如果管理员发现某个进程终止不了, 可尝试发送这个信号
    \swoole_process::signal(SIGKILL, [$this, 'finished']);
    // 15) SIGTERM 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和处理
    // 通常用来要求程序自己正常退出, shell命令kill缺省产生这个信号。如果进程终止不了, 我们才会尝试SIGKILL
    \swoole_process::signal(SIGTERM, [$this, 'finished']);
}

原来的项目中,信号函数是在构造方法中注入的,在这调试了很久才知道信号注入差异。如果用swoole的话,在构造函数中注入信号会导致子进程无法正常执行完毕,必须在进程成功启动后在注册信号处理函数。

5. 命令运行机制

借助shell的输入输出重定向机制,将正确输出和错误输出分别重定向到对应的文件。

/**
 * 调用exec执行外部命令
 *
 * @param $command
 * @param $outFile
 * @param $errorFile
 * @return mixed
 */
protected function runCmd($command, $outFile, $errorFile)
{
    $cmd = sprintf("%s >>%s 2>>%s", $command, $outFile, $errorFile);
    $lastLine = exec($cmd, $output, $returnValue);
    return $returnValue;
}

比如:

php artisan battery 2017-08-01 2017-08-02 >>/Users/zhgxun/Public/html/logs/test/20170826/4853/4872_out.txt 2>>/Users/zhgxun/Public/html/logs/test/20170826/4853/4872_error.txt

swoole提供了exec方法,说是对exec()原本系统调用的封装,但是不知道如何重定向输入输出到文件,故无法采用,而且该处不需要进程间通信,反而更高级的功能把基本功能给改造得无法使用了。

6. battery命令

battery命令,是一个真正需要运行的脚本,该脚本代码如下:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use App\Common\DbConnection;
use App\Common\DateRange;

class Battery extends Command
{
    protected $signature = 'battery {from=0} {to=0}';
    protected $description = '脚本描述';

    public function test(\swoole_process $w)
    {
        echo "程序正在计算中...\n";
        echo $w->pid . PHP_EOL;
    }

    public function handle()
    {
        $from = $this->argument('from');
        $to = $this->argument('to');
        if (!strtotime($from) || !strtotime($to)) {
            $to = date('Y-m-d');
            $from = DateRange::preDay($to);
        }

        $processNum = 0;

        $dates = DateRange::datesBetween($from, $to);
        foreach ($dates as $date) {
            $startDate = $date;
            $nextDate = DateRange::nextDay($date);
            $process = new \swoole_process(function (\swoole_process $work) use ($startDate, $nextDate) {
                // 每个进程都执行当天的业务逻辑
                echo "执行该天 {$startDate} 逻辑\n";
                $this->test($work);
                echo "该天 {$startDate} 数据执行完毕\n";
            });
            // 进程启动成功后会返回进程ID, 该进程ID可以进程进程间通信
            $pid = $process->start();
            $processNum++;
        }

        // 必须回收子进程
        for ($i = 0; $i < $processNum; $i++) {
            $ret = \swoole_process::wait();
            echo "进程 {$ret['pid']} 执行完毕, 回收后正常退出\n";
        }
    }
}

该脚本本身也是一个多进程执行的程序,test()方法就是真正要处理的业务逻辑,对于这种没有相互依赖的进程,不需要信号监听函数,运行完毕后回收子进程,避免成为僵尸进程浪费系统资源即可。

7. 添加命令

工具类本身是不需要直接运行的,需要一个继承它的使用类,该类中直接添加需要执行的命令和参数即可。

<?php

namespace App\Console\Commands;

use App\Console\Commands\Runner\Platform;

class Test extends Platform
{
    protected $signature = 'test';

    protected $description = 'Test';

    public function handle()
    {
        // battery 2017-08-01 2017-08-02
        $this->addRun('battery', [
            'from' => '2017-08-01',
            'to' => '2017-08-05'
        ]);
        $this->addRun('battery', [
            'from' => '2017-09-01',
            'to' => '2017-09-10'
        ]);
    }
}

只需要继承该基础类,就可以直接按其它脚本需要的参数和设置的名称,添加后就可以自动运行。

8. 运行效果

8.1 启动命令

zhgxun-pro:ankerbox_finance zhgxun$ 
zhgxun-pro:ankerbox_finance zhgxun$ php artisan test
zhgxun-pro:ankerbox_finance zhgxun$

8.2 查看日志

当前脚本文件中,会跟踪每一个命令的启动和完成时间,比如我们在添加命令中添加的两个命令,命令名称一致,但是参数不一致。

当前机器: 192.168.1.100, 当前用户: zhgxun, 当前代码分支: * runner, 版本: * runner 3fcd287 增加日历语言包配置
当前引导命令: php artisan test
运行日志目录: /Users/zhgxun/Public/html/logs/test/20170826/5880

5899 : 启动 "php artisan battery 2017-08-01 2017-08-05"

5900 : 启动 "php artisan battery 2017-09-01 2017-09-10"


5899 : 结束: "php artisan battery 2017-08-01 2017-08-05" 
[begin:2017-08-26 13:00:35 end:2017-08-26 13:00:36] 历时:01秒

5900 : 结束: "php artisan battery 2017-09-01 2017-09-10" 
[begin:2017-08-26 13:00:35 end:2017-08-26 13:00:36] 历时:01秒

比如进程5899的输出内容,该内容完全是代码中的echo输出,通过shell的机制收集,所有的输出都被写入到了日志文件中。

5899 : 启动 "php artisan battery 2017-08-01 2017-08-05"
执行该天 2017-08-01 逻辑
程序正在计算中...
5939
该天 2017-08-01 数据执行完毕
执行该天 2017-08-02 逻辑
程序正在计算中...
5941
该天 2017-08-02 数据执行完毕
执行该天 2017-08-03 逻辑
程序正在计算中...
5943
该天 2017-08-03 数据执行完毕
执行该天 2017-08-04 逻辑
程序正在计算中...
5945
该天 2017-08-04 数据执行完毕
进程 5939 执行完毕, 回收后正常退出
进程 5941 执行完毕, 回收后正常退出
进程 5943 执行完毕, 回收后正常退出
进程 5945 执行完毕, 回收后正常退出

exitStatus:0
5899 : 结束: "php artisan battery 2017-08-01 2017-08-05" 
[begin:2017-08-26 13:00:35 end:2017-08-26 13:00:36] 历时:01秒

如果有错误发生,还会将错误信息写入到错误日志文件中,但是因为框架的原因,比如我把命令名称故意写错,写成battery1和battery2,这两个命令明显不存在,但是框架把异常给处理为正常输出了,这点有些封装太过了。

public function handle()
{
    // battery 2017-08-01 2017-08-02
    $this->addRun('battery1', [
        'from' => '2017-08-01',
        'to' => '2017-08-05'
    ]);
    $this->addRun('battery2', [
        'from' => '2017-09-01',
        'to' => '2017-09-10'
    ]);
}

框架对这种明显的错误记录为:

6010 : 启动 "php artisan battery1 2017-08-01 2017-08-05"


  [Symfony\Component\Console\Exception\CommandNotFoundException]  
  Command "battery1" is not defined.                              
  Did you mean this?                                              
      battery                                                     



exitStatus:1
6010 : 结束: "php artisan battery1 2017-08-01 2017-08-05" 
[begin:2017-08-26 13:09:36 end:2017-08-26 13:09:36] 历时:00秒

虽然从退出状态等一系列信息中还是能判断出来,但是却无法直接写入到错误日志文件中。不过通过error_log()等的方式写入的日志信息,却可以正常写入到错误日志文件中,这点稍微能改造了一点点。

9. 信号函数

需要注意的是,在程序执行完毕后,如果使用了信号函数,脚本是不会自动退出的,需要直接使用exit()等方式来退出。这篇文章Swoole编程指南-2.9 Process进程中提到说:Process的signal方法是一个异步方法,其底层会开启事件循环,因此使用了signal方法的进程在主代码执行完后并不会主动退出,需要调用exit、发送信号等方式关闭。

10. 工具源码

<?php

namespace App\Console\Commands\Runner;

use Illuminate\Console\Command;

/**
 * 简易跑数工具
 *
 * @package App\Console\Commands\Runner
 */
class Platform extends Command
{
    /**
     * 输出日志目录
     *
     * @var null
     */
    protected $outputDirectory = null;

    /**
     * 当前php进程ID
     *
     * @var null
     */
    protected $myPid = null;

    /**
     * 设置可执行文件名称, 默认为框架入口脚本php artisan
     *
     * @var null
     */
    protected $execFile = 'php artisan';

    /**
     * 设置当前php进程总报告文件
     *
     * @var null
     */
    protected $myPidReportFile = null;

    /**
     * 当前引导执行的命令名称
     *
     * @var null
     */
    protected $currentCommand = null;

    /**
     * 保存当前正在运行的工作
     *
     * @var array
     */
    protected $works = [];

    /**
     * 是否需要发送邮件通知
     *
     * @var bool
     */
    protected $needSendEmail = false;

    /**
     * Platform constructor.
     */
    public function __construct()
    {
        parent::__construct();
        $this->myPid = getmypid();

        // 输出目录
        $this->setOutputDirectory();
        array_map('unlink', glob($this->outputDirectory . '/*'));

        // 设置当前运行的命令名称
        $this->setCurrentCommand();

        // 设置当前进程日志保存文件
        $this->setMyPidReportFile();

        // 当前进程总报告
        $this->initMyPidReport();
    }

    /**
     * 获取输出日志目录
     *
     * @return null|string
     */
    protected function setOutputDirectory()
    {
        if ($this->outputDirectory === null) {
            $_outputDirectory = env('LOG_PATH');
            // 配置文件未设置日志目录时, 使用系统临时目录
            if (!$_outputDirectory || !is_dir($_outputDirectory)) {
                $_outputDirectory = '/tmp';
            }
            $this->outputDirectory = sprintf('%s/%s/%s', $_outputDirectory, date('Ymd'), $this->myPid);
            if (!is_dir($this->outputDirectory) && !mkdir($this->outputDirectory, 0777, true)) {
                exit(sprintf('Failed to create folders [%s]...', $this->outputDirectory));
            }
            unset($_outputDirectory);
        }

        return $this->outputDirectory;
    }

    /**
     * 传递给该脚本的参数的数组
     * 当脚本以命令行方式运行时, argv 变量传递给程序 C 语言样式的命令行参数。
     */
    protected function setCurrentCommand()
    {
        if ($this->currentCommand === null) {
            $rawParams = [];
            if (isset($_SERVER['argv'])) {
                $rawParams = $_SERVER['argv'];
                array_shift($rawParams);
            }

            $params = [];
            foreach ($rawParams as $param) {
                if (preg_match('/^--(\w+)(=(.*))?$/', $param, $matches)) {
                    $name = $matches[1];
                    $params[$name] = isset($matches[3]) ? $matches[3] : true;
                } else {
                    $params[] = $param;
                }
            }
            if (!count($params)) {
                throw new \Exception(__METHOD__ . ", Line: " . __LINE__ . " Params is empty");
            }
            $this->currentCommand = $this->execFile . ' ' . $params[0];
            unset($rawParams, $params);
        }
    }

    /**
     * 设置当前php进程总报告文件
     */
    protected function setMyPidReportFile()
    {
        if ($this->myPidReportFile === null) {
            $this->myPidReportFile = sprintf('%s/%s_report.txt', $this->outputDirectory, $this->myPid);
        }
    }

    /**
     * 初始化当前php进程总报告日志
     */
    protected function initMyPidReport()
    {
        $whoami = exec("whoami");
        $branch = exec("git branch");
        $version = exec("git branch -v");
        file_put_contents($this->myPidReportFile, sprintf("当前机器: %s, 当前用户: %s, 当前代码分支: %s, 版本: %s\n当前引导命令: %s\n运行日志目录: %s\n\n",
            $this->getLocalIp(), $whoami, $branch, $version, $this->currentCommand, $this->outputDirectory));
    }

    /**
     * 获取本机IP地址
     *
     * @return null|string
     */
    public function getLocalIp()
    {
        static $ip = null;
        if (!$ip) {
            // centos
            $ip = exec("/sbin/ifconfig em1 2>&1 | grep -E 'inet ' | awk '{split($2,a,\":\");print a[2]}'");
            // debian
            if (!$ip) {
                $ip = exec("/sbin/ifconfig eth0 2>&1 | grep -E 'inet ' | awk '{split($2,a,\":\");print a[2]}'");
            }
            // mac
            if (!$ip) {
                $ip = exec("/sbin/ifconfig en0 | grep -E 'inet ' |  awk '{print $2}'");
            }
            if (!$ip) {
                $ip = exec("/sbin/ifconfig en1 | grep -E 'inet ' |  awk '{print $2}'");
            }
        }
        return $ip;
    }

    /**
     * 将命令拼接成shell可执行的格式
     *
     * @param $commandName
     * @param $params
     * @return string
     * @throws \Exception
     */
    protected function getCmd($commandName, $params)
    {
        $str = sprintf("%s %s", $this->execFile, $commandName);
        foreach ($params as $key => $value) {
            if (!is_string($value) && !is_numeric($value)) {
                throw new \Exception(sprintf("Parameter only allows string or number:\n%s -> %s\n%s\nLine:%s",
                    $key, var_export($value, true), __METHOD__, __LINE__));
            }
            $str .= sprintf(' %s', $value);
        }
        return $str;
    }

    /**
     * 调用system执行外部命令
     *
     * @param $command
     * @param $outFile
     * @param $errorFile
     * @return mixed
     */
    protected function runCmd($command, $outFile, $errorFile)
    {
        $cmd = sprintf("%s >>%s 2>>%s", $command, $outFile, $errorFile);
        $lastLine = exec($cmd, $output, $returnValue);
        return $returnValue;
    }

    /**
     * 执行命令
     *
     * @param $commandName
     * @param array $params
     * @throws \Exception
     */
    public function addRun($commandName, $params = [])
    {
        $cmd = $this->getCmd($commandName, $params);
        $process = new \swoole_process([$this, 'process'], false);
        if (($pid = $process->start()) === false) {
            throw new \Exception(sprintf('ErrNo:%s, Error: %s', swoole_errno(), swoole_strerror(swoole_errno())));
        }

        // 保存准备的工作
        $this->works[$pid] = [
            'cmd' => $cmd,
            //'process' => $process
        ];
        // Master写入当前工作信息到管道
        $process->write(json_encode($this->works));

        // 信号注册函数必须在启动进程后注册, 否则无法子进程无法退出
        // 17) SIGCHLD 子进程结束时, 父进程会收到这个信号
        \swoole_process::signal(SIGCHLD, [$this, 'finished']);
        // 2) SIGINT 程序终止(interrupt)信号, 在用户键入INTR字符(通常是Ctrl-C)时发出, 用于通知前台进程组终止进程。
        \swoole_process::signal(SIGINT, [$this, 'finished']);
        // 9) SIGKILL 用来立即结束程序的运行.
        // 本信号不能被阻塞, 处理和忽略。如果管理员发现某个进程终止不了, 可尝试发送这个信号
        \swoole_process::signal(SIGKILL, [$this, 'finished']);
        // 15) SIGTERM 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和处理
        // 通常用来要求程序自己正常退出, shell命令kill缺省产生这个信号。如果进程终止不了, 我们才会尝试SIGKILL
        \swoole_process::signal(SIGTERM, [$this, 'finished']);
    }

    /**
     * 所有输出日志文件名
     *
     * @param $pid
     * @return array
     */
    protected function getOutputFiles($pid = null)
    {
        if ($pid === null) {
            $pid = getmypid();
        }
        return [
            $this->outputDirectory . "/{$pid}_out.txt",
            $this->outputDirectory . "/{$pid}_error.txt",
            $this->outputDirectory . "/{$pid}_status.txt",
            $this->outputDirectory . "/{$pid}_report.txt",
        ];
    }

    /**
     * 往总报告中写入命令启动日志
     *
     * @param $content
     */
    protected function pushMyPidReport($content)
    {
        file_put_contents($this->myPidReportFile, sprintf("%s\n", $content), FILE_APPEND);
    }

    /**
     * 注册的回调函数
     *
     * @param \swoole_process $work
     * @throws \Exception
     */
    public function process(\swoole_process $work)
    {
        $pid = $work->pid;
        // Worker读取Master管道信息
        $receive = $work->read();
        $_works = json_decode($receive, true);
        if (!isset($_works[$pid])) {
            throw new \Exception("empty work->pid({$pid})");
        }
        $command = $_works[$pid]['cmd'];

        // 记录命令启动信息到日志文件中
        list($outFile, $errorFile, $statusFile, $reportFile) = $this->getOutputFiles();
        $titleInfo = sprintf("%s : 启动 \"%s\"\n", getmypid(), $command);
        file_put_contents($outFile, $titleInfo);
        file_put_contents($reportFile, $titleInfo);
        file_put_contents($statusFile, date("Y-m-d H:i:s"));

        // 命令启动记录到总报告中
        $this->pushMyPidReport($titleInfo);

        $exitStatus = $this->runCmd($command, $outFile, $errorFile);
        file_put_contents($outFile, "\nexitStatus:$exitStatus\n", FILE_APPEND);

        $work->exit(1);
    }

    /**
     * 信号处理回调函数
     *
     * @param $signo
     */
    public function finished($signo = null)
    {
        // $blocking 参数可以指定是否阻塞等待,默认为阻塞
        while ($result = \swoole_process::wait(false)) {
            $pid = $result['pid'];
            $exitCode = $result['signal'];
            // 信号函数能直接共享主进程的内容
            $cmd = $this->works[$pid]['cmd'];
            list($outFile, $errorFile, $statusFile, $reportFile) = $this->getOutputFiles($pid);

            // 处理错误信息
            $interrupt = '';
            switch ($exitCode) {
                case 2:
                    $interrupt .= "该进程通常是Ctrl-C结束的\n";
                    break;
                case 9:
                    $interrupt .= "该进程是被kill掉的\n";
                    break;
                case 15:
                    $interrupt .= "程序结束(terminate)信号\n";
                    break;
                // 子进程结束时, 父进程会收到这个信号
                case 17:
                default:
            }
            if ($interrupt) {
                file_put_contents($errorFile, $interrupt, FILE_APPEND);
            }
            unset($interrupt);

            // 输出日志记录
            $beginDate = file_get_contents($statusFile);
            $endDate = date("Y-m-d H:i:s");
            $beginTime = strtotime($beginDate);
            $endMessage = sprintf("%s : 结束: \"%8s\" \n[begin:%s end:%s] 历时:%s", $pid, $cmd, $beginDate, $endDate, $this->getDiffTimeString($beginTime));
            file_put_contents($outFile, "{$endMessage}", FILE_APPEND);
            file_put_contents($reportFile, "{$endMessage}", FILE_APPEND);

            // 总报告日志记录
            $content = file_get_contents($errorFile);
            $message = '';
            if (!empty($content)) {
                $message .= "---------------error---------------\n";
                $message .= sprintf("pid: %s \n%s\n", $pid, $content);
                $message .= "---------------error---------------\n";
            }
            $this->pushMyPidReport(sprintf("%s\n%s", $message, $endMessage));

            // 标识命令执行完毕
            file_put_contents($statusFile, 'done');

            // 释放工作表
            unset($this->works[$pid]);

            // 发送邮件
//            if ($this->needSendEmail) {
//                $this->mail($pid);
//            }
        }

        // 终止主进程
        if (!count($this->works)) {
            exit();
        }
    }

    /**
     * 时间段描述
     *
     * @param $beginTime
     * @param null $endTime
     * @return string
     */
    protected function getDiffTimeString($beginTime, $endTime = null)
    {
        if ($endTime === null) {
            $endTime = time();
        }
        $diff = $endTime - $beginTime;
        $totalLeft = '';
        if ($diff >= 3600) {
            $totalLeft .= sprintf("%10d小时", $diff / 3600);
            $diff %= 3600;
        }
        if ($diff >= 60) {
            $totalLeft .= sprintf("%02d分钟", $diff / 60);
            $diff %= 60;
        }
        if ($diff < 60) {
            $totalLeft .= sprintf("%02d秒", $diff);
        }

        return $totalLeft;
    }
}