模擬 Java 線程:SimpleFork
SimpleFork
simple-fork-php 是基于 PCNTL 擴展的進程管理包,接口類似與 Java 的 Thread 和 Runnable
為什么要寫 SimpleFork
多進程程序的編寫相比較多線程編寫更加復雜,需要考慮進程回收、同步、互斥、通信等問題。對于初學者來說,處理上述問題會比較困難。尤其是信號處理和進程通信這塊,很難做到不出問題。 SimpleFork提供一套類似于JAVA多線程的進程控制接口,提供回收、同步、互斥、通信等方案,開發者可以關注業務問題,不需要過多考慮進程控制。
引入
composer require jenner/simple_fork
require path/to/SimpleFork/autoload.php
依賴
必須
-
ext-pcntl 進程控制
可選
-
ext-sysvmsg 消息隊列
-
ext-sysvsem 同步互斥鎖
-
ext-sysvshm 共享內存
特性
-
提供進程池
-
自動處理僵尸進程回收,支持無阻塞調用
-
提供共享內存、System V 消息隊列、Semaphore鎖,方便IPC通信(進程通信)
-
提供Process和Runnable兩種方式實現進程
-
可以實時獲取到進程狀態
-
shutdown所有進程或單獨stop一個進程時,可以注冊覆蓋beforeExit()方法,返回true則退出,false繼續運行(在某些場景,進程不能立即退出)
-
支持子進程運行時reload
注意事項
-
System V 消息隊列由于在程序退出時可能存在尚未處理完的數據,所以不會銷毀。如果需要銷毀,請調用$queue->remove()方法刪除隊列
-
共享內存會在所有進程退出后刪除
-
Semaphore對象會在對象回收時進行銷毀
-
進程池start()后,需要調用wait()進行僵尸進程回收,可以無阻塞調用
-
獲取進程狀態(調用isAlive()方法)前,最好調用一個無阻塞的wait(false)進行一次回收,由于進程運行狀態的判斷不是原子操作,所以isAlive()方法不保證與實際狀態完全一致
-
如果你不清楚在什么情況下需要在程序的最開始加入declare(ticks=1);,那么最好默認第一行都加入這段聲明。
如何使用declare(ticks=1);
-
declare(ticks=1); 這段聲明用于進程信號處理。如果注冊了信號處理器,程序會沒執行一行代碼后自動檢查是否有尚未處理的信號。http://php.net/manual/zh/control-structures.declare.php
TODO
-
提供更多功能的進程池,模仿java
-
提供第三方進程通信機制(Redis等)
-
更多的測試及示例程序
示例程序
更多示例程序見exmples目錄
simple.php
class TestRunnable extends \Jenner\SimpleFork\Runnable{ /** * 進程執行入口 * @return mixed */ public function run() { echo "I am a sub process" . PHP_EOL; } } $process = new \Jenner\SimpleFork\Process(new TestRunnable()); $process->start();
shared_memory.php
class Producer extends \Jenner\SimpleFork\Process{ public function run(){ for($i = 0; $i<10; $i++){ $this->cache->set($i, $i); echo "set {$i} : {$i}" . PHH_EOL; } } } class Worker extends \Jenner\SimpleFork\Process{ public function run(){ sleep(5); for($i=0; $i<10; $i++){ echo "get {$i} : " . $this->cache->get($i) . PHP_EOL; } } } $memory = new \Jenner\SimpleFork\IPC\SharedMemory(); $producer = new Producer(); $producer->setCache($memory); $worker = new Worker(); $worker->setCache($memory); $pool = new \Jenner\SimpleFork\Pool(); $pool->submit($producer); $pool->submit($worker); $pool->start(); $pool->wait();