<?php //error_reporting(); use Swoole\Process; function start(){ // 删除停止运行的值 // redis()->delete(SYNC_RUNNING_REDIS_KEY,'email_sync_stop_num'); // 进程管理器 $pm = new Process\Manager(); // 启动业务进程 $pm->addBatch(10,function (Process\Pool $pool, int $worker_id){ swoole_set_process_name('php-email-sync-list-'.$worker_id); include_once __DIR__."/../vendor/autoload.php"; _echo("业务进程({$worker_id})启动成功"); $run_timer = time(); // 循环阻塞 while (true){ // 运行超过1天的 停止 if($run_timer < (time()-21600)){ break; } // 需要同步的id $id = redis()->lPop('sync_email_lists'); if($id && is_numeric($id)){ // 占用当前的id,占用2小时 if(redis()->add('just_sync_'.$id,time(),600)){ // 启动一个协程 go(function () use ($id){ // 开始同步 try { sync($id); }catch (\Throwable $e){ logs( $e->getMessage().PHP_EOL.$e->getTraceAsString(), LOG_PATH.'/sync/'.$id.'.log' ); } // 协程完成后执行的函数 co::defer(function () use ($id){ // 30秒后 消除占用 redis()->expire('just_sync_'.$id,30); // 写入日志 \Lib\Log::getInstance()->write(); }); }); } }else{ co::sleep(1); } //每次都暂停1秒,防止同一时间启动太多的任务 co::sleep(0.5); } },true); // 启动一个同步内容的进程 // $pm->add(function (Process\Pool $pool, int $worker_id){ // // swoole_set_process_name('php-email-sync-body-'.$worker_id); // // include_once __DIR__."/../vendor/autoload.php"; // // _echo("业务进程({$worker_id})启动成功,body"); // $run_timer = time(); // // 循环阻塞 // while (true){ // // 运行超过1天的 停止 // if($run_timer < (time()-21600)){ // break; // } // // 需要同步的id // $id = redis()->lPop('sync_email_body'); // // if(!$id){ // co::sleep(1); // }else{ // // 占用当前的id,占用2小时 // if(redis()->add('just_sync_body_'.$id['lists_id'],time(),600)){ // // 启动一个协程 // go(function () use ($id){ // // // 开始同步 // try { // sync_body($id); // }catch (\Throwable $e){ //// _echo($e->getMessage()); // logs( // $e->getMessage().PHP_EOL.$e->getTraceAsString(), // LOG_PATH.'/'.$id['email_id'].'.log' // ); // } // // // 协程完成后执行的函数 // co::defer(function () use ($id){ //// _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')'); // // 消除占用 // redis()->delete('just_sync_body_'.$id['lists_id']); // // 写入日志 // \Lib\Log::getInstance()->write(); // // }); // // }); // } // } // // } // // },true); // 启动管理器 $pm->start(); } /** * 同步内容 body * @param $id * @param $worker_id * @return int * @author:dc * @time 2023/3/23 10:18 */ function sync_body($id){ // 是否有数据 if(db()->count(\Model\bodySql::has((int) $id['lists_id']))){ return 0; } $email = db()->first(\Model\emailSql::first($id['email_id'])); if(!$email){ return 0; } if($email['pwd_error']){ return 1; } $mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']); // 登录服务器 if($mailServer->login()!=1){ return 2; } // $mailServer->client->debug(true,LOG_PATH.'/'.$id['email_id'].'body/'); // 同步 body $mailServer->syncBody($id['folder'],$id['uid'],$id['lists_id'],db()); $mailServer = null; return 0; } /** * 开始同步, 这里是主要的业务代码 * @param $email_id * @param $worker_id * @return int * @author:dc * @time 2023/3/10 10:19 */ function sync($email_id){ $email = db()->first(\Model\emailSql::first($email_id)); if(!$email){ return 0; } if($email['pwd_error']){ return 1; } $mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']); // 登录服务器 if($mailServer->login()!==1){ return 2; } // $mailServer->client->debug(true,LOG_PATH.'/'.$email_id.'/'); // 同步文件夹 $mailServer->syncFolder($email_id); _echo('文件夹同步成功-'.$email_id); // 读取到邮箱中的文件夹 $folders = db()->all(\Model\folderSql::all($email['id'])); if(!$folders){ return 3; } $call = function ($email_id,$folder_id,$origin_folder) use ($mailServer){ // gmail 邮箱 这个是不可选的 if($origin_folder == '[Gmail]'){ return; } // 同步父文件夹 $result = $mailServer->syncMail($email_id,$folder_id,$origin_folder); if(is_array($result) && $result){ _echo($email_id.' 同步文件夹('.$origin_folder.')邮件列表 '.count($result)); } }; // $folders = list_to_tree($folders); foreach ($folders as $folder){ try { if(empty($folder['_child'])){ $call($email_id,$folder['id'],$folder['origin_folder']); }else{ foreach ($folder['_child'] as $item){ // 同步子文件夹 $call($email_id,$item['id'],$item['origin_folder']); } } }catch (\Throwable $e){ logs( $e->getMessage().$e->getTraceAsString(), LOG_PATH.'/imap/'.$email['email'].'.error.log' ); } } $email = null; $mailServer = null; } if(!function_exists("imap_8bit")){ echo '请安装imap扩展'; exit(0); } start();