审查视图

cmd/send_job.php 10.0 KB
1  
邓超 authored
1 2 3 4 5 6
<?php

include_once "../vendor/autoload.php";

// 这里试试不用多进程模式,用多协程模式
x  
邓超 authored
7
1  
邓超 authored
8
邓超 authored
9
class SendJob {
1  
邓超 authored
10
邓超 authored
11
    public $cnum = 0;
1  
邓超 authored
12
邓超 authored
13 14 15 16 17 18 19
    private $run_timer = 0;

    public function __construct()
    {
        $this->run_timer = time();
    }
邓超 authored
20 21 22 23 24 25 26
    /**
     * 是否停止
     * @return bool
     * @author:dc
     * @time 2024/4/10 9:12
     */
    private function isStop(){
邓超 authored
27
        // 运行超过1天的 停止
邓超 authored
28
        if($this->run_timer < (time()-43200)){
邓超 authored
29
//            @posix_kill(getmypid(), SIGTERM);
邓超 authored
30 31
            return true;
        }
邓超 authored
32 33
        return redis()->get('send_job_is_stop') == 'stop';
    }
1  
邓超 authored
34
邓超 authored
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
    /**
     * 休眠
     * @param float $sleep
     * @return bool
     * @author:dc
     * @time 2024/4/10 9:12
     */
    private function s_sleep(float $sleep):bool {
        if($sleep > 0){
            $t = microtime(1);

            while (!$this->isStop()){
                co::sleep(0.1);
                if($sleep - (microtime(1)-$t) <= 0){
                    break;
                }
            }
        }
        return true;
    }
x  
邓超 authored
55 56

邓超 authored
57 58 59 60
    public function start(){
        _echo('启动邮件群发任务 '.getmypid());
        // 删除key
        redis()->delete('send_job_is_stop');
x  
邓超 authored
61 62

邓超 authored
63 64 65 66 67
        while (1){
            // 是否要停止
            if($this->isStop()){
                break;
            }
1  
邓超 authored
68
邓超 authored
69 70
            $lists  =   db()->all(\Model\sendJobsSql::sendList(500));
            $lists = $lists?$lists:[];
1  
邓超 authored
71
邓超 authored
72 73
            if($lists){
                foreach ($lists as $list){
邓超 authored
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
                    if($list['status'] === 1) {
                        $total = db()->first(\Model\sendJobStatusSql::countSum($list['id']));
                        if ($total && $total['t'] == $list['total']) {
                            // 更新状态
                            db()->update(\Model\sendJobsSql::$table, [
                                'status' => 2,
                                'success' => $total['s'],
                                'error' => $total['e'],
                            ], dbWhere(['id' => $list['id']]));

                            continue;
                        }

                    }
邓超 authored
89 90 91
                    $this->go_($list);

1  
邓超 authored
92 93
                }
            }
邓超 authored
94 95 96
                // 休眠30秒
            $this->s_sleep(30);
1  
邓超 authored
97 98
        }
邓超 authored
99
1  
邓超 authored
100 101
        // 这个是等待所有协程退出
        while (true){
邓超 authored
102 103
            _echo('等待协程退出...');
            if(!$this->cnum){
1  
邓超 authored
104 105
                break;
            }
邓超 authored
106
            co::sleep(1);
1  
邓超 authored
107 108
        }
邓超 authored
109
    }
1  
邓超 authored
110 111

邓超 authored
112 113 114 115 116 117 118 119 120
    /**
     * @param $list
     * @throws \Lib\Err
     * @throws \PHPMailer\PHPMailer\Exception
     * @author:dc
     * @time 2024/4/10 9:25
     */
    public function go_($list){
        // 占用 id
邓超 authored
121
        if(redis()->add('send_job_run_id_'.$list['id'],$list['id'],600)){
邓超 authored
122 123 124 125 126 127 128
            go(function ($data) {
                _echo('正在执行任务 '.$data['id']);
                $this->cnum++; // 协程数+1
                // 表单数据
                $data['maildata'] = json_decode($data['maildata'],true);
                // 查询邮箱
                $email = db()->first(\Model\emailSql::first($data['email_id']));
x  
邓超 authored
129
邓超 authored
130
                // 更新状态
x  
邓超 authored
131 132
                db()->update(\Model\sendJobsSql::$table,[
                    'status'    =>  1,
x  
邓超 authored
133
                    'total'  =>  count(array_unique(array_map('strtolower',array_column($data['maildata']['tos']??[],'email'))))
x  
邓超 authored
134 135 136 137 138
                ],dbWhere([
                    'id'    =>  $data['id']
                ]));

                _echo('更新任务状态 '.$data['id']);
邓超 authored
139 140 141 142
                // 是否是单发送
                if($data['maildata']['massSuit']??0){
                    $tos    =   $data['maildata']['tos'];
                    foreach ($tos as $to){
x  
邓超 authored
143
邓超 authored
144 145 146 147 148 149 150 151 152 153 154 155 156
                        // 续时间
                        redis()->set('send_job_run_id_'.$data['id'],$data['id'],600);

                        // 是否暂停
                        $dst = db()->first(\Model\sendJobsSql::isStatus($data['id']));
                        if($dst && $dst['status'] === 3){
                            break;
                        }

                        // 是否已发送过了
                        if(db()->count(\Model\sendJobStatusSql::count($data['id'],$to['email']))){
                            continue;
                        }
x  
邓超 authored
157
                        _echo('正在执行任务 发送邮件 '.$to['email']);
邓超 authored
158 159 160 161 162 163

                        // 每个收件人单独发送
                        $data['maildata']['tos'] = [$to];
                        //替换邮件内容中的指定字段为客户名字
                        $data['maildata']['body'] = str_replace('{customer_name}', $to['name'], $data['maildata']['body']);
                        $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);
邓超 authored
164
                        _echo('邮件发送 '.json_encode($result,JSON_UNESCAPED_UNICODE));
邓超 authored
165 166 167 168
                        // 插入紫薯精
                        db()->insert(\Model\sendJobStatusSql::$table,[
                            'job_id'  =>  $data['id'],
                            'to_email'  =>  $to['email'],
169
                            'status'    =>  $result[0] ? 1 : 0,
170
                            'error'    =>  $result[1]
邓超 authored
171 172 173 174 175 176 177
                        ]);


                        // 时间距离下次的时间
                        if($data['maildata']['masssuit_interval_send']??[]){
                            $time = rand($data['maildata']['masssuit_interval_send']['start'],$data['maildata']['masssuit_interval_send']['end']);
                            if($time){
邓超 authored
178
                                _echo('进入时间等待区 '.$to['email'].' 等待:'.$time);
邓超 authored
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
                                $block = false;
                                while (true){
                                    // 没5秒循环一次
                                    if($this->isStop()){
                                        $block = true;
                                        break;
                                    }
                                    $time-=5;
                                    $this->s_sleep(5);
                                    // 执行下一次了
                                    if (!$time){
                                        $block = true;
                                        break;
                                    }
                                }

                                if($block){
                                    break;
                                }
                            }
                        }

                    }

                }
                else{
邓超 authored
205 206 207 208 209 210 211 212 213 214 215 216 217
                    // 是否已发送过了
                    if(!db()->count(\Model\sendJobStatusSql::count($data['id'],'all'))){
                        $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email);
                        // 更新状态
                        db()->update(\Model\sendJobsSql::$table,[
                            'status'    =>  2,
                            'success'   =>  $result[0] ? $data['total'] : 0,
                            'error'   =>  $result[0] ? 0 : $data['total'],
                        ],dbWhere(['id'=>$data['id']]));
                        // 插入紫薯精
                        db()->insert(\Model\sendJobStatusSql::$table,[
                            'job_id'  =>  $data['id'],
                            'to_email'  =>  'all',
218
                            'status'    =>  $result[0] ? 1 : 0,
邓超 authored
219 220 221 222 223 224 225
                            'error'    =>  $result[0] ? $result[1] : ''
                        ]);
                    }else{
                        _echo('发送过了 '.$data['id']);
                    }

邓超 authored
226 227 228 229 230 231 232
                }

                // 协程结束后
                co::defer(function ($id) use($data){
                    $this->cnum--;
                    // 验证是否完成
                    if($data['maildata']['massSuit']??0){
x  
邓超 authored
233 234 235 236 237 238 239 240 241 242 243
                        $dst = db()->first(\Model\sendJobsSql::isStatus($data['id']));
                        if($dst && $dst['status'] != 3){
                            $total = db()->first(\Model\sendJobStatusSql::countSum($data['id']));
                            if($total){
                                // 更新状态
                                db()->update(\Model\sendJobsSql::$table,[
                                    'status'    =>  $total['t'] == $data['total'] ? 2 : 0,
                                    'success'   =>  $total['s'],
                                    'error'   =>  $total['e'],
                                ],dbWhere(['id'=>$data['id']]));
                            }
邓超 authored
244 245
                        }
                    }
1  
邓超 authored
246
邓超 authored
247 248
                    // 写入日志
                    \Lib\Log::getInstance()->write();
1  
邓超 authored
249
邓超 authored
250 251 252
                    // 删除占用
                    redis()->delete('send_job_run_id_'.$data['id']);
邓超 authored
253 254 255

                    _echo('执行任务完成'.$data['id']);
邓超 authored
256 257 258 259 260 261 262 263
                });

            },$list);
        }

    }

}
1  
邓超 authored
264 265

1  
邓超 authored
266
$ps = "ps -ef | grep \"send_job.php start\" | grep -v grep | wc -l";
1  
邓超 authored
267 268 269

switch ($argv[1]??0){
    case 'start':{
邓超 authored
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289

        // 开启协程
        \Co\run(function (){

            $handler = function ($signal){
                // 可以处理其他程序
                redis()->set('send_job_is_stop','stop');

                _echo('收到退出信号 '.$signal);
            };

            \Swoole\Process::signal(SIGTERM,$handler);
            \Swoole\Process::signal(SIGINT,$handler);

            (new SendJob)->start();

            _echo('进程已退出');

        });
1  
邓超 authored
290 291 292
        break;
    }
    case 'stop':{
1  
邓超 authored
293 294
        \Co\run(function ($ps){
            echo "正在退出程序...\n非必要请不要强制kill掉进程\n";
1  
邓超 authored
295
1  
邓超 authored
296
            redis()->set('send_job_is_stop','stop');
1  
邓超 authored
297
1  
邓超 authored
298
            while (true){
1  
邓超 authored
299
1  
邓超 authored
300 301 302 303 304
                $num = exec($ps);
                if(!$num){
                    break;
                }
                co::sleep(0.2);
1  
邓超 authored
305
            }
1  
邓超 authored
306 307 308
            echo "已退出程序\n";
        },$ps);
1  
邓超 authored
309 310 311 312 313 314
        break;
    }
    default:{
        break;
    }
}