作者 邓超

预热邮件

1 <?php 1 <?php
2 2
  3 +//error_reporting();
3 4
4 -swoole_set_process_name('php-email-sync-list'); 5 +use Swoole\Process;
5 6
6 -$pm = new \Swoole\Process\Manager();  
7 7
8 8
9 -$pm->addBatch(60,function ($work_id) { 9 +function start(){
10 10
11 - include_once __DIR__."/../vendor/autoload.php"; 11 +// 删除停止运行的值
  12 +// redis()->delete(SYNC_RUNNING_REDIS_KEY,'email_sync_stop_num');
12 13
13 - $number = 0;  
14 - while (true){  
15 - if($number > 500){ break; } 14 + // 进程管理器
  15 + $pm = new Process\Manager();
16 16
17 - // 需要同步的id  
18 - $id = redis()->lPop('sync_email_lists'); 17 + // 启动业务进程
  18 + $pm->addBatch(10,function (Process\Pool $pool, int $worker_id){
19 19
20 - if($id && is_numeric($id)){  
21 - // 占用当前的id,占用2小时  
22 - if(redis()->add('just_sync_'.$id,time(),600)){  
23 - $number++; 20 + swoole_set_process_name('php-email-sync-list-'.$worker_id);
24 21
25 - try{  
26 - // 开始同步  
27 - $email = db()->cache(3600)->first(\Model\emailSql::first($id));  
28 - if($email){  
29 - $sync = new \Service\SyncMail($email);  
30 - $search = new \Lib\Imap\ImapSearch();  
31 - // 第一次同步 只同步当天的  
32 - if(!db()->cache(600)->count(\Model\listsSql::first('`id` > 0'))){  
33 - $sync->search($search->dateGt($email['created_at']));  
34 - }else{ 22 + include_once __DIR__."/../vendor/autoload.php";
  23 + _echo("业务进程({$worker_id})启动成功");
35 24
36 - if(strtotime("-2 day") > strtotime($email['created_at'])){  
37 - $sync->search(  
38 - $search->dateGt(  
39 - date('Y-m-d',  
40 - strtotime("-2 day")  
41 - )  
42 - )  
43 - );  
44 - }else{ 25 + $goNum = 0;
  26 + // 循环阻塞
  27 + while (true){
  28 + while ($goNum > 50){
  29 + co::sleep(0.3);
  30 + continue;
  31 + }
  32 + // 需要同步的id
  33 + $id = redis()->lPop('sync_email_lists');
  34 +
  35 + if($id && is_numeric($id)){
  36 + // 占用当前的id,占用2小时
  37 + if(redis()->add('just_sync_'.$id,time(),600)){
  38 + // 启动一个协程
  39 + go(function () use ($id,&$goNum){
  40 + $goNum++;
  41 + try{
  42 + // 开始同步
  43 + $email = db()->cache(3600)->first(\Model\emailSql::first($id));
  44 + if($email){
  45 + $sync = new \Service\SyncMail($email);
  46 + // ai邮件只同步2天内的
45 $sync->search( 47 $sync->search(
46 - $search->dateGt($email['created_at']) 48 + (new \Lib\Imap\ImapSearch())
  49 + ->dateGt(date('Y-m-d',strtotime("-1 day")))
47 ); 50 );
  51 + $sync->sync();
  52 +
  53 + $sync = null;
  54 + unset($sync);
48 } 55 }
49 56
  57 + }catch (Throwable $e){
  58 + logs('sync : '.$e->getMessage());
50 } 59 }
51 60
52 - $sync->sync();  
53 61
54 - $sync = null;  
55 - unset($sync);  
56 - } 62 + // 协程完成后执行的函数
  63 + co::defer(function () use ($id,&$goNum){
  64 + $goNum--;
  65 + // 30秒后 消除占用
  66 + redis()->expire('just_sync_'.$id,120);
  67 + // 写入日志
  68 + \Lib\Log::getInstance()->write();
  69 + });
57 70
58 - }catch (Throwable $e){  
59 - logs('sync : '.$e->getMessage()); 71 + });
60 } 72 }
  73 + }
  74 +
  75 + //每次都暂停1秒,防止同一时间启动太多的任务
  76 + co::sleep(0.1);
  77 +
61 78
62 - // 30秒后 消除占用  
63 - redis()->expire('just_sync_'.$id,120);  
64 79
65 - }  
66 - }  
67 - else{  
68 - sleep(1);  
69 } 80 }
70 81
71 - }  
72 -}); 82 + },true);
  83 +
73 84
  85 + // 启动管理器
  86 + $pm->start();
74 87
75 -$pm->start(); 88 +}
76 89
77 90
78 91
79 92
  93 +start();
80 94
81 95
82 96
@@ -29,35 +29,53 @@ function stop(){ @@ -29,35 +29,53 @@ function stop(){
29 } 29 }
30 } 30 }
31 31
32 -while (1){ 32 +\Co\run(function (){
  33 + $goNum = 0;
  34 + while (1){
33 35
34 - $id = redis()->lPop('sync_email_lists_my');  
35 - redis()->set('sync_my_pid:'.getmypid(),time(),86400); 36 + if($goNum>=50){
  37 + co::sleep(1);
  38 + continue;
  39 + }
36 40
37 - stop(); 41 + $id = redis()->lPop('sync_email_lists_my');
  42 + redis()->set('sync_my_pid:'.getmypid(),time(),86400);
38 43
39 - // _echo('读取到'.$id);  
40 - if($id && is_numeric($id)){  
41 - // 占用当前的id,占用2小时  
42 - if(redis()->add('just_sync_'.$id,time(),600)){ 44 + stop();
43 45
44 - try{  
45 - // 开始同步  
46 - (new \Service\SyncMail($id))->sync();  
47 - }catch (Throwable $e){  
48 - _echo($e->getMessage());  
49 - }  
50 - // 30秒后 消除占用  
51 - redis()->expire('just_sync_'.$id,30); 46 + // _echo('读取到'.$id);
  47 + if($id && is_numeric($id)){
  48 + // 占用当前的id,占用2小时
  49 + if(redis()->add('just_sync_'.$id,time(),600)){
  50 +
  51 + go(function ($id) use (&$goNum){
  52 + $goNum++;
  53 + try{
  54 + // 开始同步
  55 + (new \Service\SyncMail($id))->sync();
  56 + }catch (Throwable $e){
  57 + _echo($e->getMessage());
  58 + }
  59 +
  60 + co::defer(function () use ($id,&$goNum){
  61 + $goNum--;
  62 + // 30秒后 消除占用
  63 + redis()->expire('just_sync_'.$id,30);
52 64
53 - \Lib\Log::getInstance()->write(); 65 + \Lib\Log::getInstance()->write();
54 66
  67 + });
  68 + },$id);
  69 +
  70 +
  71 + }
  72 + }else{
  73 + co::sleep(1);
55 } 74 }
56 - }else{  
57 - sleep(1); 75 +
58 } 76 }
  77 +});
59 78
60 -}  
61 79
62 80
63 81