<?php include_once "../vendor/autoload.php"; // 这里试试不用多进程模式,用多协程模式 class SendJob { public $cnum = 0; private $run_timer = 0; public function __construct() { $this->run_timer = time(); } /** * 是否停止 * @return bool * @author:dc * @time 2024/4/10 9:12 */ private function isStop(){ // 运行超过1天的 停止 if($this->run_timer < (time()-43200)){ // @posix_kill(getmypid(), SIGTERM); return true; } return redis()->get('send_job_is_stop') == 'stop'; } /** * 休眠 * @param float $sleep * @return bool * @author:dc * @time 2024/4/10 9:12 */ private function s_sleep(float $sleep):bool { if($sleep > 0){ $t = microtime(1); while (!$this->isStop()){ co::sleep(0.1); if($sleep - (microtime(1)-$t) <= 0){ break; } } } return true; } public function start(){ _echo('启动邮件群发任务 '.getmypid()); // 删除key redis()->delete('send_job_is_stop'); while (1){ // 是否要停止 if($this->isStop()){ break; } $lists = db()->all(\Model\sendJobsSql::sendList(500)); $lists = $lists?$lists:[]; if($lists){ foreach ($lists as $list){ if($list['status'] === 1) { $total = db()->first(\Model\sendJobStatusSql::countSum($list['id'])); if ($total && $total['t'] == $list['total']) { // 更新状态 db()->update(\Model\sendJobsSql::$table, [ 'status' => 2, 'success' => $total['s'], 'error' => $total['e'], ], dbWhere(['id' => $list['id']])); continue; } } $this->go_($list); } } // 休眠30秒 $this->s_sleep(30); } // 这个是等待所有协程退出 while (true){ _echo('等待协程退出...'); if(!$this->cnum){ break; } co::sleep(1); } } /** * @param $list * @throws \Lib\Err * @throws \PHPMailer\PHPMailer\Exception * @author:dc * @time 2024/4/10 9:25 */ public function go_($list){ // 占用 id if(redis()->add('send_job_run_id_'.$list['id'],$list['id'],600)){ go(function ($data) { _echo('正在执行任务 '.$data['id']); $this->cnum++; // 协程数+1 // 表单数据 $data['maildata'] = json_decode($data['maildata'],true); // 查询邮箱 $email = db()->first(\Model\emailSql::first($data['email_id'])); // 更新状态 db()->update(\Model\sendJobsSql::$table,[ 'status' => 1, 'total' => count(array_unique(array_map('strtolower',array_column($data['maildata']['tos']??[],'email')))) ],dbWhere([ 'id' => $data['id'] ])); _echo('更新任务状态 '.$data['id']); // 是否是单发送 if($data['maildata']['massSuit']??0){ $tos = $data['maildata']['tos']; foreach ($tos as $to){ // 续时间 redis()->set('send_job_run_id_'.$data['id'],$data['id'],600); // 是否暂停 $dst = db()->first(\Model\sendJobsSql::isStatus($data['id'])); if($dst && $dst['status'] === 3){ break; } // 是否已发送过了 if(db()->count(\Model\sendJobStatusSql::count($data['id'],$to['email']))){ continue; } _echo('正在执行任务 发送邮件 '.$to['email']); // 每个收件人单独发送 $data['maildata']['tos'] = [$to]; //替换邮件内容中的指定字段为客户名字 $data['maildata']['body'] = str_replace('{customer_name}', $to['name'], $data['maildata']['body']); $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email); _echo('邮件发送 '.json_encode($result,JSON_UNESCAPED_UNICODE)); // 插入紫薯精 db()->insert(\Model\sendJobStatusSql::$table,[ 'job_id' => $data['id'], 'to_email' => $to['email'], 'status' => $result[0] ? 1 : 0, 'error' => $result[1] ]); // 时间距离下次的时间 if($data['maildata']['masssuit_interval_send']??[]){ $time = rand($data['maildata']['masssuit_interval_send']['start'],$data['maildata']['masssuit_interval_send']['end']); if($time){ _echo('进入时间等待区 '.$to['email'].' 等待:'.$time); $block = false; while (true){ // 没5秒循环一次 if($this->isStop()){ $block = true; break; } $time-=5; $this->s_sleep(5); // 执行下一次了 if (!$time){ $block = true; break; } } if($block){ break; } } } } } else{ // 是否已发送过了 if(!db()->count(\Model\sendJobStatusSql::count($data['id'],'all'))){ $result = \Lib\Mail\MailFun::sendEmail($data['maildata'],$email); // 更新状态 db()->update(\Model\sendJobsSql::$table,[ 'status' => 2, 'success' => $result[0] ? $data['total'] : 0, 'error' => $result[0] ? 0 : $data['total'], ],dbWhere(['id'=>$data['id']])); // 插入紫薯精 db()->insert(\Model\sendJobStatusSql::$table,[ 'job_id' => $data['id'], 'to_email' => 'all', 'status' => $result[0] ? 1 : 0, 'error' => $result[0] ? $result[1] : '' ]); }else{ _echo('发送过了 '.$data['id']); } } // 协程结束后 co::defer(function ($id) use($data){ $this->cnum--; // 验证是否完成 if($data['maildata']['massSuit']??0){ $dst = db()->first(\Model\sendJobsSql::isStatus($data['id'])); if($dst && $dst['status'] != 3){ $total = db()->first(\Model\sendJobStatusSql::countSum($data['id'])); if($total){ // 更新状态 db()->update(\Model\sendJobsSql::$table,[ 'status' => $total['t'] == $data['total'] ? 2 : 0, 'success' => $total['s'], 'error' => $total['e'], ],dbWhere(['id'=>$data['id']])); } } } // 写入日志 \Lib\Log::getInstance()->write(); // 删除占用 redis()->delete('send_job_run_id_'.$data['id']); _echo('执行任务完成'.$data['id']); }); },$list); } } } $ps = "ps -ef | grep \"send_job.php start\" | grep -v grep | wc -l"; switch ($argv[1]??0){ case 'start':{ // 开启协程 \Co\run(function (){ $handler = function ($signal){ // 可以处理其他程序 redis()->set('send_job_is_stop','stop'); _echo('收到退出信号 '.$signal); }; \Swoole\Process::signal(SIGTERM,$handler); \Swoole\Process::signal(SIGINT,$handler); (new SendJob)->start(); _echo('进程已退出'); }); break; } case 'stop':{ \Co\run(function ($ps){ echo "正在退出程序...\n非必要请不要强制kill掉进程\n"; redis()->set('send_job_is_stop','stop'); while (true){ $num = exec($ps); if(!$num){ break; } co::sleep(0.2); } echo "已退出程序\n"; },$ps); break; } default:{ break; } }