Skip to content

Commit

Permalink
update v3
Browse files Browse the repository at this point in the history
  • Loading branch information
millken committed Sep 18, 2018
1 parent f340eee commit c3bb4d5
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 12 deletions.
21 changes: 19 additions & 2 deletions src/Application/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ class Swoole

public function build()
{
$address = '127.0.0.1';
$port = 7000;
$container = Application::getContainer();
$swoole = $container->has('swoole') ? $container->get('swoole') : [];
$address = $swoole['server']['address'] ?? '127.0.0.1';
$port = $swoole['server']['port'] ?? $this->getRandomPort($address);
$options = $swoole['options'] ?? [];
$this->server = new SwooleHttpServer($address, $port, SWOOLE_PROCESS, SWOOLE_TCP);

$this->server->set($options);
static::$instances = &$this;

return $this;
Expand Down Expand Up @@ -136,4 +140,17 @@ public function run(): void
$this->server->on('Start', [$this, 'onStart']);
$this->server->start();
}

private function getRandomPort($address): int
{
while (true) {
$port = mt_rand(1025, 65000);
$fp = @fsockopen($address, $port, $errno, $errstr, 0.1);
if (!$fp) {
break;
}
}

return $port;
}
}
17 changes: 17 additions & 0 deletions src/Controller/CronWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Ypf\Controller;

use Ypf\Application;

abstract class CronWorker
{
public function __get($name)
{
return Application::getContainer()->get($name);
}

abstract protected function run();
}
47 changes: 37 additions & 10 deletions src/Swoole/CronManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,70 @@

namespace Ypf\Swoole;

use Exception;
use ReflectionClass;
use Ypf\Application;
use Ypf\Controller\CronWorker;
use Cron\CronExpression;
use Swoole\Process as SwooleProcess;

class CronManager
{
private $queue;
private $ready;
private $job;

public function process()
{
echo 'cronmanager';
$process = new SwooleProcess([$this, 'start'], false, 1);
$pid = $process->start();
$process->start();
}

public function start(SwooleProcess $worker)
{
global $argv;
$processName = "php {$argv[0]}: cron manager";
\swoole_set_process_name($processName);
$this->queue = Application::getContainer()->get('workers');
$workers = Application::getContainer()->get('workers');
foreach ($workers as $worker) {
$className = $worker[0];
$classReflection = new ReflectionClass($className);
if (!$classReflection->isSubclassOf(CronWorker::class)) {
throw new Exception('cron worker mustbe extends '.CronWorker::class);
}
if (!isset($worker[1])) {
go(function () use ($classReflection) {
$classReflection->newInstance()->run();
});
} else {
$this->queue[] = [$classReflection->newInstance(), $worker[1]];
}
}
\swoole_timer_tick(1000, [$this, 'tick']);
}

public function tick()
{
$queue = $this->queue;
foreach ($queue as $key => $val) {
\swoole_timer_tick(1000 * $val['cron'], function () use ($key, $val) {
$crontab = CronExpression::isValidExpression($val[1]);
if (!$crontab) {
$timeSecond = intval($val[1]);
} else {
$cron = CronExpression::factory($val[1]);
$nextRunTime = $cron->getNextRunDate()->getTimestamp();
$timeSecond = intval($nextRunTime - time());
}
if ($timeSecond < 1) {
continue;
}

\swoole_timer_after(1000 * $timeSecond, function () use ($key, $val) {
$this->queue[$key] = $val;
unset($this->ready[$key]);
$obj = new $val['class']();
$obj->$val['method']();
unset($this->job[$key]);
go(function () use ($val) {$val[0]->run(); });
});
unset($this->queue[$key]);
$this->ready[$key] = $val;
$this->job[$key] = $val;
}
echo 'tick'.PHP_EOL;
}
}

0 comments on commit c3bb4d5

Please sign in to comment.