作者 邓超

x

@@ -3,76 +3,49 @@ @@ -3,76 +3,49 @@
3 //error_reporting(); 3 //error_reporting();
4 include_once __DIR__."/../vendor/autoload.php"; 4 include_once __DIR__."/../vendor/autoload.php";
5 5
6 -@file_put_contents('sync_run_my.log',"进程启动 ".getmypid(),FILE_APPEND);  
7 -  
8 swoole_set_process_name('php-email-sync-list-my'); 6 swoole_set_process_name('php-email-sync-list-my');
9 -//TODO:: 不知道为什么,隔断时间mysql会连不上  
10 -\Co\run(function (){  
11 - $start_time = time();  
12 - $goNum = 0;  
13 -// 循环阻塞  
14 - while (true){  
15 -  
16 - if (time() - $start_time > 43200){  
17 - @file_put_contents('sync_run_my.log',"进程停止 ".system("kill ".getmypid()),FILE_APPEND);  
18 - break;  
19 - }  
20 7
21 - if($goNum > 50){  
22 - co::sleep(0.5);  
23 - continue;  
24 - } 8 +$pm = new \Swoole\Process\Manager();
  9 +
25 10
26 - // 需要同步的id 11 +$pm->addBatch(30,function ($work_id){
  12 +
  13 + $number = 0;
  14 +
  15 + while (1){
  16 + if($number>500){
  17 + break;
  18 + }
27 $id = redis()->lPop('sync_email_lists_my'); 19 $id = redis()->lPop('sync_email_lists_my');
28 20
29 if($id && is_numeric($id)){ 21 if($id && is_numeric($id)){
30 // 占用当前的id,占用2小时 22 // 占用当前的id,占用2小时
31 if(redis()->add('just_sync_'.$id,time(),600)){ 23 if(redis()->add('just_sync_'.$id,time(),600)){
32 - // 启动一个协程  
33 - go(function () use ($id,&$goNum){  
34 - $goNum++; 24 + $number++;
35 try{ 25 try{
36 // 开始同步 26 // 开始同步
37 $email = db()->cache(3600)->first(\Model\emailSql::first($id)); 27 $email = db()->cache(3600)->first(\Model\emailSql::first($id));
38 if($email){ 28 if($email){
39 (new \Service\SyncMail($email))->sync(); 29 (new \Service\SyncMail($email))->sync();
40 -  
41 } 30 }
42 31
43 }catch (Throwable $e){ 32 }catch (Throwable $e){
44 logs('sync : '.$e->getMessage()); 33 logs('sync : '.$e->getMessage());
45 } 34 }
46 35
47 -  
48 - // 协程完成后执行的函数  
49 - co::defer(function () use ($id,&$goNum){  
50 - $goNum--;  
51 // 30秒后 消除占用 36 // 30秒后 消除占用
52 redis()->expire('just_sync_'.$id,120); 37 redis()->expire('just_sync_'.$id,120);
53 - // 写入日志  
54 - \Lib\Log::getInstance()->write();  
55 - });  
56 -  
57 - });  
58 } 38 }
  39 + }else{
  40 + sleep(1);
59 } 41 }
60 -  
61 - //每次都暂停1秒,防止同一时间启动太多的任务  
62 - co::sleep(0.1);  
63 -  
64 -  
65 } 42 }
66 43
67 -// while ($goNum > 0){  
68 -// if (time() - $start_time > 24060){ break; }  
69 -// co::sleep(1);  
70 -// }  
71 - 44 + _echo('子进程即将推出');
72 45
73 }); 46 });
74 47
75 - 48 +$pm->start();
76 49
77 50
78 51