国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - PHP教程 - PHP中使用協同程序實現合作多任務

PHP中使用協同程序實現合作多任務

2020-05-07 13:32PHP教程網 PHP教程

這篇文章指導你通過使用協同程序來實施任務調度,通過實例實現對技術的理解。我將在前三節做一個簡單的背景介紹。如果你已經有了比較好的基礎,可以直接跳到“協同多任務處理”一節

PHP5.5一個比較好的新功能是實現對生成器和協同程序的支持。對于生成器,PHP的文檔和各種其他的博客文章(就像這一個或這一個)已經有了非常詳細的講解。協同程序相對受到的關注就少了,所以協同程序雖然有很強大的功能但也很難被知曉,解釋起來也比較困難。

這篇文章指導你通過使用協同程序來實施任務調度,通過實例實現對技術的理解。我將在前三節做一個簡單的背景介紹。如果你已經有了比較好的基礎,可以直接跳到“協同多任務處理”一節。

生成器

生成器最基本的思想也是一個函數,這個函數的返回值是依次輸出,而不是只返回一個單獨的值。或者,換句話說,生成器使你更方便的實現了迭代器接口。下面通過實現一個xrange函數來簡單說明:

 

復制代碼 代碼如下:


<?php
function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}

 

foreach (xrange(1, 1000000) as $num) {
    echo $num, "\n";
}

 

上面這個xrange()函數提供了和PHP的內建函數range()一樣的功能。但是不同的是range()函數返回的是一個包含屬組值從1到100萬的數組(注:請查看手冊)。而xrange()函數返回的是依次輸出這些值的一個迭代器,而且并不會真正以數組形式計算。

這種方法的優點是顯而易見的。它可以讓你在處理大數據集合的時候不用一次性的加載到內存中。甚至你可以處理無限大的數據流。

當然,也可以不同通過生成器來實現這個功能,而是可以通過繼承Iterator接口實現。通過使用生成器實現起來會更方便,而不用再去實現iterator接口中的5個方法了。

生成器為可中斷的函數
要從生成器認識協同程序,理解它們內部是如何工作的非常重要:生成器是可中斷的函數,在它里面,yield構成了中斷點。 

緊接著上面的例子,如果你調用xrange(1,1000000)的話,xrange()函數里代碼沒有真正地運行。相反,PHP只是返回了一個實現了迭代器接口的 生成器類實例: 
 

復制代碼 代碼如下:

<?php
$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)

 

你對某個對象調用迭代器方法一次,其中的代碼運行一次。例如,如果你調用$range->rewind(),那么xrange()里的代碼運行到控制流 第一次出現yield的地方。在這種情況下,這就意味著當$i=$start時yield $i才運行。傳遞給yield語句的值是使用$range->current()獲取的。

 為了繼續執行生成器中的代碼,你必須調用$range->next()方法。這將再次啟動生成器,直到yield語句出現。因此,連續調用next()和current()方法 你將能從生成器里獲得所有的值,直到某個點沒有再出現yield語句。對xrange()來說,這種情形出現在$i超過$end時。在這中情況下, 控制流將到達函數的終點,因此將不執行任何代碼。一旦這種情況發生,vaild()方法將返回假,這時迭代結束。

協程

協程給上面功能添加的主要東西是回送數據給生成器的能力。這將把生成器到調用者的單向通信轉變為兩者之間的雙向通信。
通過調用生成器的send()方法而不是其next()方法傳遞數據給協程。下面的logger()協程是這種通信如何運行的例子: 

 

復制代碼 代碼如下:


<?php

 

function logger($fileName) {
    $fileHandle = fopen($fileName, 'a');
    while (true) {
        fwrite($fileHandle, yield . "\n");
    }
}

$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar')

 

正如你能看到,這兒yield沒有作為一個語句來使用,而是用作一個表達式。即它有一個返回值。yield的返回值是傳遞給send()方法的值。 在這個例子里,yield將首先返回"Foo",然后返回"Bar"。

上面的例子里yield僅作為接收者。混合兩種用法是可能的,即既可接收也可發送。接收和發送通信如何進行的例子如下:

 

復制代碼 代碼如下:


<?php

 

function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}

$gen = gen();
var_dump($gen->current());    // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1"   (the first var_dump in gen)
                              // string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2"   (again from within gen)
                              // NULL               (the return value of ->send())

 

馬上理解輸出的精確順序有點困難,因此確定你知道為什按照這種方式輸出。我愿意特別指出的有兩點:第一點,yield表達式兩邊使用 圓括號不是偶然。由于技術原因(雖然我已經考慮為賦值增加一個異常,就像Python那樣),圓括號是必須的。第二點,你可能已經注意到 調用current()之前沒有調用rewind()。如果是這么做的,那么已經隱含地執行了rewind操作。 

多任務協作

如果閱讀了上面的logger()例子,那么你認為“為了雙向通信我為什么要使用協程呢? 為什么我不能只用常見的類呢?”,你這么問完全正確。上面的例子演示了基本用法,然而上下文中沒有真正的展示出使用協程的優點。這就是列舉許多協程例子的理由。正如上面介紹里提到的,協程是非常強大的概念,不過這樣的應用很稀少而且常常十分復雜。給出一些簡單而真實的例子很難。

在這篇文章里,我決定去做的是使用協程實現多任務協作。我們盡力解決的問題是你想并發地運行多任務(或者“程序”)。不過處理器在一個時刻只能運行一個任務(這篇文章的目標是不考慮多核的)。因此處理器需要在不同的任務之間進行切換,而且總是讓每個任務運行 “一小會兒”。 

多任務協作這個術語中的“協作”說明了如何進行這種切換的:它要求當前正在運行的任務自動把控制傳回給調度器,這樣它就可以運行其他任務了。這與“搶占”多任務相反,搶占多任務是這樣的:調度器可以中斷運行了一段時間的任務,不管它喜歡還是不喜歡。協作多任務在Windows的早期版本(windows95)和Mac OS中有使用,不過它們后來都切換到使用搶先多任務了。理由相當明確:如果你依靠程序自動傳回 控制的話,那么壞行為的軟件將很容易為自身占用整個CPU,不與其他任務共享。 

這個時候你應當明白協程和任務調度之間的聯系:yield指令提供了任務中斷自身的一種方法,然后把控制傳遞給調度器。因此協程可以運行多個其他任務。更進一步來說,yield可以用來在任務和調度器之間進行通信。

我們的目的是 對 “任務”用更輕量級的包裝的協程函數:
 

復制代碼 代碼如下:


<?php

 

class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }

    public function getTaskId() {
        return $this->taskId;
    }

    public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;
    }

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

    public function isFinished() {
        return !$this->coroutine->valid();
    }
}

 

一個任務是用 任務ID標記一個協程。使用setSendValue()方法,你可以指定哪些值將被發送到下次的恢復(在之后你會了解到我們需要這個)。 run()函數確實沒有做什么,除了調用send()方法的協同程序。要理解為什么添加beforeFirstYieldflag,需要考慮下面的代碼片段:

 

復制代碼 代碼如下:


<?php

 

function gen() {
    yield 'foo';
    yield 'bar';
}

$gen = gen();
var_dump($gen->send('something'));

// As the send() happens before the first yield there is an implicit rewind() call,
// so what really happens is this:
$gen->rewind();
var_dump($gen->send('something'));

// The rewind() will advance to the first yield (and ignore its value), the send() will
// advance to the second yield (and dump its value). Thus we loose the first yielded value!

 

通過添加 beforeFirstYieldcondition 我們可以確定 first yield 的值 被返回。 

調度器現在不得不比多任務循環要做稍微多點了,然后才運行多任務:
 

復制代碼 代碼如下:


<?php

 

class Scheduler {
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;

    public function __construct() {
        $this->taskQueue = new SplQueue();
    }

    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }

    public function schedule(Task $task) {
        $this->taskQueue->enqueue($task);
    }

    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $task->run();

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}

 

 newTask()方法(使用下一個空閑的任務id)創建一個新任務,然后把這個任務放入任務映射數組里。接著它通過把任務放入任務隊列里來實現對任務的調度。接著run()方法掃描任務隊列,運行任務。如果一個任務結束了,那么它將從隊列里刪除,否則它將在隊列的末尾再次被調度。
 讓我們看看下面具有兩個簡單(并且沒有什么意義)任務的調度器: 

 

復制代碼 代碼如下:


<?php

 

function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}

function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task1());
$scheduler->newTask(task2());

$scheduler->run();

 

 兩個任務都僅僅回顯一條信息,然后使用yield把控制回傳給調度器。輸出結果如下:

 

復制代碼 代碼如下:

 This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.
 

 

輸出確實如我們所期望的:對前五個迭代來說,兩個任務是交替運行的,接著第二個任務結束后,只有第一個任務繼續運行。  

與調度器之間通信

既然調度器已經運行了,那么我們就轉向日程表的下一項:任務和調度器之間的通信。我們將使用進程用來和操作系統會話的同樣的方式來通信:系統調用。我們需要系統調用的理由是操作系統與進程相比它處在不同的權限級別上。因此為了執行特權級別的操作(如殺死另一個進程),就不得不以某種方式把控制傳回給內核,這樣內核就可以執行所說的操作了。再說一遍,這種行為在內部是通過使用中斷指令來實現的。過去使用的是通用的int指令,如今使用的是更特殊并且更快速的syscall/sysenter指令。

我們的任務調度系統將反映這種設計:不是簡單地把調度器傳遞給任務(這樣久允許它做它想做的任何事),我們將通過給yield表達式傳遞信息來與系統調用通信。這兒yield即是中斷,也是傳遞信息給調度器(和從調度器傳遞出信息)的方法。 

為了說明系統調用,我將對可調用的系統調用做一個小小的封裝:
 

復制代碼 代碼如下:


<?php

 

class SystemCall {
    protected $callback;

    public function __construct(callable $callback) {
        $this->callback = $callback;
    }

    public function __invoke(Task $task, Scheduler $scheduler) {
        $callback = $this->callback; // Can't call it directly in PHP :/
        return $callback($task, $scheduler);
    }
}

 

它將像其他任何可調用那樣(使用_invoke)運行,不過它要求調度器把正在調用的任務和自身傳遞給這個函數。為了解決這個問題 我們不得不微微的修改調度器的run方法:

 

復制代碼 代碼如下:


<?php
public function run() {
    while (!$this->taskQueue->isEmpty()) {
        $task = $this->taskQueue->dequeue();
        $retval = $task->run();

 

        if ($retval instanceof SystemCall) {
            $retval($task, $this);
            continue;
        }

        if ($task->isFinished()) {
            unset($this->taskMap[$task->getTaskId()]);
        } else {
            $this->schedule($task);
        }
    }
}

 

第一個系統調用除了返回任務ID外什么都沒有做:

 

復制代碼 代碼如下:

<?php
function getTaskId() {
    return new SystemCall(function(Task $task, Scheduler $scheduler) {
        $task->setSendValue($task->getTaskId());
        $scheduler->schedule($task);
    });
}

 

這個函數確實設置任務id為下一次發送的值,并再次調度了這個任務。由于使用了系統調用,所以調度器不能自動調用任務,我們需要手工調度任務(稍后你將明白為什么這么做)。要使用這個新的系統調用的話,我們要重新編寫以前的例子:

 

復制代碼 代碼如下:


<?php

 

function task($max) {
    $tid = (yield getTaskId()); // <-- here's the syscall!
    for ($i = 1; $i <= $max; ++$i) {
        echo "This is task $tid iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task(10));
$scheduler->newTask(task(5));

$scheduler->run();

 

這段代碼將給出與前一個例子相同的輸出。注意系統調用同其他任何調用一樣正常地運行,不過預先增加了yield。要創建新的任務,然后再殺死它們的話,需要兩個以上的系統調用:  

復制代碼 代碼如下:


<?php

 

function newTask(Generator $coroutine) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($coroutine) {
            $task->setSendValue($scheduler->newTask($coroutine));
            $scheduler->schedule($task);
        }
    );
}

function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            $task->setSendValue($scheduler->killTask($tid));
            $scheduler->schedule($task);
        }
    );
}

 

killTask函數需要在調度器里增加一個方法:

 

復制代碼 代碼如下:


<?php

 

public function killTask($tid) {
    if (!isset($this->taskMap[$tid])) {
        return false;
    }

    unset($this->taskMap[$tid]);

    // This is a bit ugly and could be optimized so it does not have to walk the queue,
    // but assuming that killing tasks is rather rare I won't bother with it now
    foreach ($this->taskQueue as $i => $task) {
        if ($task->getTaskId() === $tid) {
            unset($this->taskQueue[$i]);
            break;
        }
    }

    return true;
}



 

 

用來測試新功能的微腳本:  

 

復制代碼 代碼如下:


<?php

 

function childTask() {
    $tid = (yield getTaskId());
    while (true) {
        echo "Child task $tid still alive!\n";
        yield;
    }
}

function task() {
    $tid = (yield getTaskId());
    $childTid = (yield newTask(childTask()));

    for ($i = 1; $i <= 6; ++$i) {
        echo "Parent task $tid iteration $i.\n";
        yield;

        if ($i == 3) yield killTask($childTid);
    }
}

$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();

 

這段代碼將打印以下信息:

 

復制代碼 代碼如下:

Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.

 

經過三次迭代以后子任務將被殺死,因此這就是"Child is still alive"消息結束的時候。可能應當指出的是這不是真正的父子關系。 因為甚至在父任務結束后子任務仍然可以運行。或者子任務可以殺死父任務。可以修改調度器使它具有更層級化的任務結構,不過 在這篇文章里我沒有這么做。 

你可以實現許多進程管理調用。例如 wait(它一直等待到任務結束運行時),exec(它替代當前任務)和fork(它創建一個 當前任務的克隆)。fork非常酷,而且你可以使用PHP的協程真正地實現它,因為它們都支持克隆。 

然而讓我們把這些留給有興趣的讀者吧,我們去看下一個議題。

幾點人
翻譯于 4天前
0人頂
頂 翻譯的不錯哦!
 

非阻塞IO
很明顯,我們的任務管理系統的真正很酷的應用是web服務器。它有一個任務是在套接字上偵聽是否有新連接,當有新連接要建立的時候  ,它創建一個新任務來處理新連接。
 web服務器最難的部分通常是像讀數據這樣的套接字操作是阻塞的。例如PHP將等待到客戶端完成發送為止。對一個WEB服務器來說,這 根本不行;這就意味著服務器在一個時間點上只能處理一個連接。

解決方案是確保在真正對套接字讀寫之前該套接字已經“準備就緒”。為了查找哪個套接字已經準備好讀或者寫了,可以使用 流選擇函數。 

首先,讓我們添加兩個新的 syscall,它們將等待直到指定 socket 準備好:

 

復制代碼 代碼如下:

 <?php     
 function waitForRead($socket) { 
     return new SystemCall( 
         function(Task $task, Scheduler $scheduler) use ($socket) { 
             $scheduler->waitForRead($socket, $task); 
         } 
     ); 
 } 

 function waitForWrite($socket) { 
     return new SystemCall( 
         function(Task $task, Scheduler $scheduler) use ($socket) { 
             $scheduler->waitForWrite($socket, $task); 
         } 
     ); 
 }


這些 syscall 只是在調度器中代理其各自的方法: 

 

 

復制代碼 代碼如下:

 <?php 

 // resourceID => [socket, tasks] 
 protected $waitingForRead = []; 
 protected $waitingForWrite = []; 

 public function waitForRead($socket, Task $task) { 
     if (isset($this->waitingForRead[(int) $socket])) { 
         $this->waitingForRead[(int) $socket][1][] = $task; 
     } else { 
         $this->waitingForRead[(int) $socket] = [$socket, [$task]]; 
     } 
 } 

 public function waitForWrite($socket, Task $task) { 
     if (isset($this->waitingForWrite[(int) $socket])) { 
         $this->waitingForWrite[(int) $socket][1][] = $task; 
     } else { 
         $this->waitingForWrite[(int) $socket] = [$socket, [$task]]; 
     } 
 }


waitingForRead 及 waitingForWrite 屬性是兩個承載等待的socket 及等待它們的任務的數組。有趣的部分在于下面的方法,它將檢查 socket 是否可用,并重新安排各自任務:

 

 

復制代碼 代碼如下:

 <?php 

 protected function ioPoll($timeout) { 
     $rSocks = []; 
     foreach ($this->waitingForRead as list($socket)) { 
         $rSocks[] = $socket; 
     } 

     $wSocks = []; 
     foreach ($this->waitingForWrite as list($socket)) { 
         $wSocks[] = $socket; 
     } 

     $eSocks = []; // dummy 

     if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) { 
         return; 
     } 

     foreach ($rSocks as $socket) { 
         list(, $tasks) = $this->waitingForRead[(int) $socket]; 
         unset($this->waitingForRead[(int) $socket]); 

         foreach ($tasks as $task) { 
             $this->schedule($task); 
         } 
     } 

     foreach ($wSocks as $socket) { 
         list(, $tasks) = $this->waitingForWrite[(int) $socket]; 
         unset($this->waitingForWrite[(int) $socket]); 

         foreach ($tasks as $task) { 
             $this->schedule($task); 
         } 
     } 
 }


stream_select 函數接受承載讀取、寫入以及待檢查的socket的數組(我們無需考慮最后一類)。數組將按引用傳遞,函數只會保留那些狀態改變了的數組元素。我們可以遍歷這些數組,并重新安排與之相關的任務。

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 色综合99 | 成人在线网址 | 国产精品一区二区不卡 | 久热中文在线 | 国产精品亚洲第一区 | 亚州中文字幕 | 国产精品无码久久久久 | 色综合中文 | 五月天婷婷国产精品 | 国产精品天天干 | 青青久在线视频 | 亚洲男人的天堂网站 | 日韩精品影院 | 欧美日韩综合精品 | 黄色网页在线 | www.av在线播放 | 亚洲精品成人18久久久久 | 欧美一区二区在线播放 | 青青在线精品视频 | www.99re | 亚洲黄网在线观看 | 日韩毛片免费视频一级特黄 | 精品日韩| 日韩欧美成人影院 | 成人在线视频一区 | 亚洲va欧美va人人爽成人影院 | 久久伊人网视频 | 日韩av一区二区在线观看 | 午夜小视频在线观看 | 国产片在线观看 | 精品网站www | 国产精品一区二区视频 | 在线无码| 一区二区三区久久久 | 97精品国产一区二区三区 | 性大毛片视频 | 国产一区日韩精品 | 丰满白嫩老熟女毛片 | 亚洲午夜免费视频 | 亚洲一区国产 | 亚洲国产成人久久 |