diff --git a/src/EventLoop/AwaitableInterface.php b/src/EventLoop/AwaitableInterface.php new file mode 100644 index 0000000..a1cc946 --- /dev/null +++ b/src/EventLoop/AwaitableInterface.php @@ -0,0 +1,35 @@ +main = new Task($this, $main); $this->tick = new \SplQueue(); $this->future = new \SplDoublyLinkedList(); + $this->timeouts = new \TimePriorityQueue(); $this->addFutureTask($this->main); } @@ -50,6 +58,13 @@ public function addReadStream($stream) array_push($this->readStreams, $stream); } + public function addTimeout(Process $process, $timeout) + { + $this->timeouts->insert($process, $timeout); + + // $this->timeouts-> wut wut wut; + } + /** * Starts the event loop. */ @@ -61,20 +76,14 @@ public function run() $this->waitForStreamActivity($timeout); $this->nextTick(); - $timeout = $this->tick->count() === 0 ? 1 : 0; + $timeout = $this->tick->count() === 0 ? 1 : 0; // take min from the shortest timeout and big default value $this->tick(); } } /** - * @return Task */ - private function nextTask() - { - return $this->tick->dequeue(); - } - private function nextTick() { $this->future->rewind(); @@ -93,7 +102,7 @@ private function nextTick() private function tick() { while (!$this->tick->isEmpty() && !$this->main->isFinished()) { - $task = $this->nextTask(); + $task = $this->tick->dequeue(); $task->run(); @@ -108,9 +117,14 @@ private function addFutureTask(Task $task) $this->future->push($task); } + private function getNextTimeout() + { + return + } + private function waitForStreamActivity($timeout = 0) { - $changed = @stream_select($read = $this->readStreams, $write = [], $except = [], $timeout); + $changed = @stream_select($read = $this->readStreams, $write = [], $except = [], 0, 1000 * $timeout); if ($changed > 0) { foreach ($this->future as $task) { diff --git a/src/EventLoop/Process.php b/src/EventLoop/Process.php new file mode 100644 index 0000000..6311efa --- /dev/null +++ b/src/EventLoop/Process.php @@ -0,0 +1,79 @@ +loop = $loop; + $this->func = $func; + $this->args = $args; + } + + public function shouldWait() + { + return $this->started + } + + public function isValid() + { + return !($this->generator && $this->generator->valid()); + } + + public function run() + { + if ($this->started) { + throw new \RuntimeException('Cannot start an already running process!'); + } + + $this->running = true; + + $routine = call_user_func($this->func, $this->args); + + if ($routine && $routine instanceof \Generator) { + $this-> + } + } + + public function resume() + { + if (!$this->isValid()) { + throw new \RuntimeException('Cannot resume a task that already finished execution.'); + } + + while () { + $this->resumeGenerator($this->generator); + } + } + + private function resumeGenerator(\Generator $generator) + { + if (!$generator->current() instanceof \Generator) { + return $generator->send($generator->current()); + } + + if ($generator->current()->valid()) { + $this->resume($generator->current()); + return; + } + + $generator->send($generator->current()->getReturn()); + } + + private function getCurrentValue(\Generator $generator) + { + if (!$generator->current() instanceof \Generator) { + return $generator->current(); + } + + return $this->getCurrentValue($generator->current()); + } +} diff --git a/src/Stream/StreamReader.php b/src/Stream/StreamReader.php new file mode 100644 index 0000000..c64e884 --- /dev/null +++ b/src/Stream/StreamReader.php @@ -0,0 +1,21 @@ +stream = $stream; + } + + public function setEventLoop(EventLoop $loop) + { + $this->loop = $loop; + } + + public function shouldWait() + { + return + } +} diff --git a/src/Time/Timeout.php b/src/Time/Timeout.php index 7e0d859..6bc9636 100644 --- a/src/Time/Timeout.php +++ b/src/Time/Timeout.php @@ -2,49 +2,28 @@ namespace CrystalPlanet\Redshift\Time; -use CrystalPlanet\Redshift\Channel\Channel; +use CrystalPlanet\Redshift\EventLoop\AwaitableInterface; -class Timeout +class Timeout implements AwaitableInterface { - /** - * @var Channel - */ - private $channel; - - /** - * @var int - */ private $timeout; - /** - * Creates a new timeout. - * - * @param int $milliseconds Time to wait in milliseconds. - * @param Channel $channel Channel to send the signal on. - */ - public function __construct($milliseconds, Channel $channel) + public function __construct($timeout = 0) { - $this->channel = $channel; + if (!is_int($timeout)) { + throw new \RuntimeException('$timeout must be a number!'); + } - $this->timeout = $this->now() + $milliseconds; + $this->timeout = $timeout + intval(microtime(true) * 1000); } - public function __invoke() + public function shouldWait() { - while ($this->now() < $this->timeout) { - yield; - } - - $this->channel->put(true); + return $this->timeout - intval(microtime(true) * 1000) < 1; } - /** - * Returns current time in milliseconds. - * - * @return int - */ - private function now() + public function await(Process $process) { - return round(microtime(true) * 1000); + $process->getEventLoop()->addTimeout($this->timeout, $process); } -} \ No newline at end of file +} diff --git a/src/Utility/TimePriorityQueue.php b/src/Utility/TimePriorityQueue.php new file mode 100644 index 0000000..f9c0b35 --- /dev/null +++ b/src/Utility/TimePriorityQueue.php @@ -0,0 +1,14 @@ +