«

php cli 多进程编程

时间:2024-11-19 11:46     作者:聆听岁月     分类:


前言
php cli 命令模式我想在日常开发中,大家用的都比较少。其实,在某些场景,cli命令真的很有作用,

我举个例子

在mysql数据库的某个表tab1中数据量有3000W+条数据,现在需要对这张表中的每一条数据做计算处理。将处理的结果放置在新表tab2上,因为数据量很大,我想大家不会采用php-fpm模式去处理这个吧,肯定还是要cli,并将执行时间设置为长时执行(set_time_limit(0))。经测试,单条数据处理耗时3s左右。那么3000W+ * 3 = 9000W 多秒的时间才能计算完全部的数据,这个时间是运营无法接受的。那么我们的想法肯定是单进程处理改为多进程处理。将计算量分摊到多个进程上处理达到优化的目的。 那么我们怎么来设计呢。

程序设计
封装 - php - swoole_proccess 多进程模块
`<?php

class ProcessPool{
private $process;

/**
 * Worker 进程数组
 * @var array
 */
private $process_list = [];

/**
 * 正在被使用的进程
 * @var array
 */
private $process_use = [];

/**
 * 最少进程数量
 * @var int
 */
private $min_worker_num = 3;

/**
 * 最多进程数量
 * @var int
 */
private $max_worker_num = 6;

/**
 * 当前进程数量
 * @var int
 */
private $current_num;

/**
 * @var callable 闭包待执行函数
 */
public $callable;

public function __construct($callable,$max_worker_num = 6)
{
    $this->callable = $callable;
    $this->max_worker_num = $max_worker_num;

    $this->process = new swoole_process(array($this, 'run'), false, 2);
    $this->process->start();
    swoole_process::wait();
}

public function run()
{
    $this->current_num = $this->min_worker_num;
    //建立全部的worker进程
    for($i = 0; $i < $this->current_num; $i++){
        $process = new swoole_process(array($this, 'task_run'), false, 2);
        $pid = $process->start();
        $this->process_list[$pid] = $process;
        $this->process_use[$pid] = 0;
    }

    foreach($this->process_list as $process){
        swoole_event_add($process->pipe, function ($pipe) use ($process){
            $data = $process->read();

// var_dump($data . '空闲');
//接收子进程处理完成的信息,而且重置为空闲
$this->process_use[$data] = 0;
});
}

    //每秒定时向worker管道投递任务
    swoole_timer_tick(1000 ,function ($timer_id){
        static $index = 0;
        $index = $index + 1;
        $flag = true; //是否新建worker
        foreach ($this->process_use as $pid => $used){
            if($used == 0){
                $flag = false;
                //标记为正在使用
                $this->process_use[$pid] = 1;
                // 在父进程内调用write,子进程能够调用read接收此数据
                $this->process_list[$pid]->write($index);
                break;
            }
        }

        if($flag && $this->current_num < $this->max_worker_num){
            //没有闲置worker,新建worker来处理
            $process = new swoole_process(array($this, 'task_run'), false, 2);
            $pid = $process->start();
            $this->process_list[$pid] = $process;
            $this->process_use[$pid] = 1;
            $this->process_list[$pid]->write($index);
            $this->current_num++;
        }

// var_dump('第' .$index. '个任务');
if($index >= ($this->max_worker_num + 1)){
foreach($this->process_list as $process){
$process->write("exit");
}
swoole_timer_clear($timer_id);
$this->process->exit();
}

    });
}

/**
 * 子进程处理
 * @param $worker
 */
public function task_run($worker)
{
    swoole_event_add($worker->pipe, function($pipe) use($worker){
        $data = $worker->read();

// var_dump($worker->pid . ':' . $data);
if($data == 'exit'){
$worker->exit();
exit;
}
//模拟耗时任务

// sleep(5);

        call_user_func($this->callable,$worker,$data);

        //告诉主进程处理完成
        //在子进程内调用write,父进程能够调用read接收此数据
        $worker->write($worker->pid);
    });
}

}

`

对数据量进行分摊,交给不同的进程处理。比如进程1处理0-200W的数据,进程2处理200W-300W的数据,依次类推。并做好日志记录,记录哪些处理失败了,原因是什么,方便后期针对性处理。

`<?php
$count = 30000000;//3000W 数据总量
$secNum = 2000000; //200W 每个进程要处理的数据量
$maxWorkNum = intval(ceil($count / $secNum));
$processPool = new ProcessPool(function(\Swoole\Process $worker,$index) use($maxWorkNum,$secNum){
//index 为进程编号从1开始计数;work为进程对象
$start = (($index-1) $secNum) + 1;
$end = $index > $maxWorkNum ? 0 : $index
$secNum;

    //开始任务
    $companyJob = new CompanyRelationshipJob($start,$end,$index);
    $companyJob->run();
},$maxWorkNum);

总结
当遇到数据量大的业务场景,我们需要考虑怎么将数据量均摊给多个进程处理,利用多核cpu的优势加快数据的处理满足业务需求。`