作者 邓超

x

正在显示 1 个修改的文件 包含 64 行增加92 行删除
1 <?php 1 <?php
2 2
3 //error_reporting(); 3 //error_reporting();
  4 +include_once __DIR__."/../vendor/autoload.php";
4 5
5 -use Swoole\Process;  
6 6
  7 +swoole_set_process_name('php-email-sync-list');
7 8
  9 +\Co\run(function (){
8 10
9 -function start(){ 11 + $goNum = 0;
  12 +// 循环阻塞
  13 + while (true){
10 14
11 -// 删除停止运行的值  
12 -// redis()->delete(SYNC_RUNNING_REDIS_KEY,'email_sync_stop_num');  
13 -  
14 - // 进程管理器  
15 - $pm = new Process\Manager();  
16 -  
17 - // 启动业务进程  
18 - $pm->addBatch(10,function (Process\Pool $pool, int $worker_id){  
19 -  
20 - swoole_set_process_name('php-email-sync-list-'.$worker_id);  
21 -  
22 - include_once __DIR__."/../vendor/autoload.php";  
23 - _echo("业务进程({$worker_id})启动成功");  
24 -  
25 - $goNum = 0;  
26 - $start_time = time();  
27 - // 循环阻塞  
28 - while (true){  
29 - // 每执行 12小时 就重启任务  
30 - if((time()-$start_time) > 43200){  
31 - if($goNum < 1 || (time()-$start_time) > 43300) break;  
32 - co::sleep(1);  
33 - continue;  
34 - }  
35 - if($goNum > 50){  
36 - co::sleep(0.5);  
37 - continue;  
38 -// break;  
39 - } 15 + if($goNum > 50){
  16 + co::sleep(0.5);
  17 + continue;
  18 + }
40 19
41 - // 需要同步的id  
42 - $id = redis()->lPop('sync_email_lists');  
43 -  
44 - if($id && is_numeric($id)){  
45 - // 占用当前的id,占用2小时  
46 - if(redis()->add('just_sync_'.$id,time(),600)){  
47 - // 启动一个协程  
48 - go(function () use ($id,&$goNum){  
49 - $goNum++;  
50 - try{  
51 - // 开始同步  
52 - $email = db()->cache(3600)->first(\Model\emailSql::first($id));  
53 - if($email){  
54 - $sync = new \Service\SyncMail($email);  
55 - $search = new \Lib\Imap\ImapSearch();  
56 - // 第一次同步 只同步当天的  
57 - if(!db()->cache(600)->count(\Model\listsSql::first('`id` > 0'))){  
58 - $sync->search($search->dateGt($email['created_at']));  
59 - }else{  
60 - // 是否是ai邮箱  
61 - if(db()->count("select count(*) from `hot_mail` where `email` = '{$email['email']}'")){  
62 - // ai邮件只同步2天内的  
63 - if(strtotime("-2 day") > strtotime($email['created_at'])){  
64 - $sync->search(  
65 - $search->dateGt(  
66 - date('Y-m-d',  
67 - strtotime("-2 day")  
68 - ) 20 + // 需要同步的id
  21 + $id = redis()->lPop('sync_email_lists');
  22 +
  23 + if($id && is_numeric($id)){
  24 + // 占用当前的id,占用2小时
  25 + if(redis()->add('just_sync_'.$id,time(),600)){
  26 + // 启动一个协程
  27 + go(function () use ($id,&$goNum){
  28 + $goNum++;
  29 + try{
  30 + // 开始同步
  31 + $email = db()->cache(3600)->first(\Model\emailSql::first($id));
  32 + if($email){
  33 + $sync = new \Service\SyncMail($email);
  34 + $search = new \Lib\Imap\ImapSearch();
  35 + // 第一次同步 只同步当天的
  36 + if(!db()->cache(600)->count(\Model\listsSql::first('`id` > 0'))){
  37 + $sync->search($search->dateGt($email['created_at']));
  38 + }else{
  39 + // 是否是ai邮箱
  40 + if(db()->count("select count(*) from `hot_mail` where `email` = '{$email['email']}'")){
  41 + // ai邮件只同步2天内的
  42 + if(strtotime("-2 day") > strtotime($email['created_at'])){
  43 + $sync->search(
  44 + $search->dateGt(
  45 + date('Y-m-d',
  46 + strtotime("-2 day")
69 ) 47 )
70 - );  
71 - }else{  
72 - $sync->search(  
73 - $search->dateGt($email['created_at'])  
74 - );  
75 - }  
76 - 48 + )
  49 + );
  50 + }else{
  51 + $sync->search(
  52 + $search->dateGt($email['created_at'])
  53 + );
77 } 54 }
78 - }  
79 -  
80 - $sync->sync();  
81 55
82 - $sync = null;  
83 - unset($sync); 56 + }
84 } 57 }
85 58
86 - }catch (Throwable $e){  
87 - logs('sync : '.$e->getMessage()); 59 + $sync->sync();
  60 +
  61 + $sync = null;
  62 + unset($sync);
88 } 63 }
89 64
  65 + }catch (Throwable $e){
  66 + logs('sync : '.$e->getMessage());
  67 + }
90 68
91 - // 协程完成后执行的函数  
92 - co::defer(function () use ($id,&$goNum){  
93 - $goNum--;  
94 - // 30秒后 消除占用  
95 - redis()->expire('just_sync_'.$id,120);  
96 - // 写入日志  
97 - \Lib\Log::getInstance()->write();  
98 - });  
99 69
  70 + // 协程完成后执行的函数
  71 + co::defer(function () use ($id,&$goNum){
  72 + $goNum--;
  73 + // 30秒后 消除占用
  74 + redis()->expire('just_sync_'.$id,120);
  75 + // 写入日志
  76 + \Lib\Log::getInstance()->write();
100 }); 77 });
101 - }  
102 - }  
103 -  
104 - //每次都暂停1秒,防止同一时间启动太多的任务  
105 - co::sleep(0.1);  
106 -  
107 78
  79 + });
  80 + }
108 } 81 }
109 82
110 - },true); 83 + //每次都暂停1秒,防止同一时间启动太多的任务
  84 + co::sleep(0.1);
111 85
112 86
113 - // 启动管理器  
114 - $pm->start(); 87 + }
115 88
116 -}  
117 89
  90 +});
118 91
119 92
120 93
121 -start();  
122 94
123 95
124 96