审查视图

1  
邓超 authored
1 2 3 4 5 6 7 8 9 10 11
<?php

//error_reporting();

use Swoole\Process;



function start(){

// 删除停止运行的值
邓超 authored
12
//    redis()->delete(SYNC_RUNNING_REDIS_KEY,'email_sync_stop_num');
1  
邓超 authored
13 14 15 16 17

    // 进程管理器
    $pm = new Process\Manager();

    // 启动业务进程
邓超 authored
18
    $pm->addBatch(10,function (Process\Pool $pool, int $worker_id){
邓超 authored
19
邓超 authored
20
        swoole_set_process_name('php-email-sync-list-'.$worker_id);
1  
邓超 authored
21
邓超 authored
22
        include_once __DIR__."/../vendor/autoload.php";
邓超 authored
23
        _echo("业务进程({$worker_id})启动成功");
邓超 authored
24
邓超 authored
25
        $run_timer  = time();
1  
邓超 authored
26 27
        // 循环阻塞
        while (true){
邓超 authored
28 29

            // 运行超过1天的 停止
邓超 authored
30
            if($run_timer < (time()-21600)){
邓超 authored
31 32 33
                break;
            }
x  
邓超 authored
34 35
            // 需要同步的id
            $id = redis()->lPop('sync_email_lists');
1  
邓超 authored
36
x  
邓超 authored
37
            if($id && is_numeric($id)){
x  
邓超 authored
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
                // 占用当前的id,占用2小时
                if(redis()->add('just_sync_'.$id,time(),600)){
                    // 启动一个协程
                    go(function () use ($id){

                        // 开始同步
                        try {
                            sync($id);
                        }catch (\Throwable $e){
                            logs(
                                $e->getMessage().PHP_EOL.$e->getTraceAsString(),
                                LOG_PATH.'/sync/'.$id.'.log'
                            );
                        }

                        // 协程完成后执行的函数
                        co::defer(function () use ($id){
x  
邓超 authored
55 56
                            // 30秒后 消除占用
                            redis()->expire('just_sync_'.$id,30);
x  
邓超 authored
57 58 59
                            // 写入日志
                            \Lib\Log::getInstance()->write();
                        });
1  
邓超 authored
60
x  
邓超 authored
61 62
                    });
                }
x  
邓超 authored
63 64
            }else{
                co::sleep(1);
1  
邓超 authored
65
            }
x  
邓超 authored
66 67
            //每次都暂停1秒,防止同一时间启动太多的任务
            co::sleep(0.5);
1  
邓超 authored
68
        }
1  
邓超 authored
69 70

    },true);
1  
邓超 authored
71
1  
邓超 authored
72
    // 启动一个同步内容的进程
x  
邓超 authored
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
//    $pm->add(function (Process\Pool $pool, int $worker_id){
//
//        swoole_set_process_name('php-email-sync-body-'.$worker_id);
//
//        include_once __DIR__."/../vendor/autoload.php";
//
//        _echo("业务进程({$worker_id})启动成功,body");
//        $run_timer  = time();
//        // 循环阻塞
//        while (true){
//            // 运行超过1天的 停止
//            if($run_timer < (time()-21600)){
//                break;
//            }
//            // 需要同步的id
//            $id = redis()->lPop('sync_email_body');
//
//            if(!$id){
//                co::sleep(1);
//            }else{
//                // 占用当前的id,占用2小时
//                if(redis()->add('just_sync_body_'.$id['lists_id'],time(),600)){
//                    // 启动一个协程
//                    go(function () use ($id){
//
//                        // 开始同步
//                        try {
//                            sync_body($id);
//                        }catch (\Throwable $e){
////                            _echo($e->getMessage());
//                            logs(
//                                $e->getMessage().PHP_EOL.$e->getTraceAsString(),
//                                LOG_PATH.'/'.$id['email_id'].'.log'
//                            );
//                        }
//
//                        // 协程完成后执行的函数
//                        co::defer(function () use ($id){
////                        _echo('正常关闭进程('.$worker_id.')下的协程('.co::getCid().')');
//                            // 消除占用
//                            redis()->delete('just_sync_body_'.$id['lists_id']);
//                            // 写入日志
//                            \Lib\Log::getInstance()->write();
//
//                        });
//
//                    });
//                }
//            }
//
//        }
//
//    },true);
1  
邓超 authored
126 127 128 129 130 131

    // 启动管理器
    $pm->start();

}
1  
邓超 authored
132 133 134 135 136 137 138 139
/**
 * 同步内容 body
 * @param $id
 * @param $worker_id
 * @return int
 * @author:dc
 * @time 2023/3/23 10:18
 */
x  
邓超 authored
140
function sync_body($id){
1  
邓超 authored
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158

    // 是否有数据
    if(db()->count(\Model\bodySql::has((int) $id['lists_id']))){
        return 0;
    }

    $email = db()->first(\Model\emailSql::first($id['email_id']));
    if(!$email){
        return 0;
    }

    if($email['pwd_error']){
        return 1;
    }

    $mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);

    // 登录服务器
x  
邓超 authored
159
    if($mailServer->login()!=1){
1  
邓超 authored
160 161
        return 2;
    }
x  
邓超 authored
162
//    $mailServer->client->debug(true,LOG_PATH.'/'.$id['email_id'].'body/');
1  
邓超 authored
163 164 165 166 167 168 169 170 171 172

    // 同步 body
    $mailServer->syncBody($id['folder'],$id['uid'],$id['lists_id'],db());

    $mailServer = null;

    return 0;

}
1  
邓超 authored
173 174 175 176 177 178 179 180 181

/**
 * 开始同步, 这里是主要的业务代码
 * @param $email_id
 * @param $worker_id
 * @return int
 * @author:dc
 * @time 2023/3/10 10:19
 */
x  
邓超 authored
182
function sync($email_id){
1  
邓超 authored
183 184 185 186 187 188 189 190 191 192

    $email = db()->first(\Model\emailSql::first($email_id));
    if(!$email){
        return 0;
    }

    if($email['pwd_error']){
        return 1;
    }
1  
邓超 authored
193
    $mailServer = new Lib\Mail\Mail($email['email'],base64_decode($email['password']),$email['imap']);
1  
邓超 authored
194
x  
邓超 authored
195
    // 登录服务器
邓超 authored
196
    if($mailServer->login()!==1){
1  
邓超 authored
197 198 199
        return 2;
    }
x  
邓超 authored
200
//    $mailServer->client->debug(true,LOG_PATH.'/'.$email_id.'/');
1  
邓超 authored
201
1  
邓超 authored
202
    // 同步文件夹
x  
邓超 authored
203 204
    $mailServer->syncFolder($email_id);
    _echo('文件夹同步成功-'.$email_id);
1  
邓超 authored
205 206 207 208 209 210

    // 读取到邮箱中的文件夹
    $folders = db()->all(\Model\folderSql::all($email['id']));
    if(!$folders){
        return 3;
    }
x  
邓超 authored
211 212

    $call = function ($email_id,$folder_id,$origin_folder) use ($mailServer){
x  
邓超 authored
213 214 215 216
        // gmail 邮箱 这个是不可选的
        if($origin_folder == '[Gmail]'){
            return;
        }
x  
邓超 authored
217 218
        // 同步父文件夹
        $result = $mailServer->syncMail($email_id,$folder_id,$origin_folder);
x  
邓超 authored
219 220 221 222
        if(is_array($result) && $result){
            _echo($email_id.' 同步文件夹('.$origin_folder.')邮件列表 '.count($result));
        }
x  
邓超 authored
223 224 225
    };

//    $folders = list_to_tree($folders);
1  
邓超 authored
226 227
    foreach ($folders as $folder){
        try {
x  
邓超 authored
228 229 230 231 232
            $is = true;
            foreach ($folders as $f){
                // 是否存在下级
                if($f['pid'] == $folder['id']){
                    $is = false;
1  
邓超 authored
233 234 235
                }
            }
x  
邓超 authored
236 237
            if($is) $call($email_id,$folder['id'],$folder['origin_folder']);
1  
邓超 authored
238
        }catch (\Throwable $e){
1  
邓超 authored
239
            logs(
1  
邓超 authored
240
                $e->getMessage().$e->getTraceAsString(),
1  
邓超 authored
241 242 243 244 245 246 247 248 249 250 251
                LOG_PATH.'/imap/'.$email['email'].'.error.log'
            );
        }
    }


    $email = null;
    $mailServer = null;
}

x  
邓超 authored
252
if(!function_exists("imap_8bit")){
x  
邓超 authored
253 254 255 256
    echo '请安装imap扩展';
    exit(0);
}
1  
邓超 authored
257 258

x  
邓超 authored
259
start();
1  
邓超 authored
260 261 262 263 264 265 266 267 268