151 lines
4.8 KiB
PHP
151 lines
4.8 KiB
PHP
|
<?php
|
||
|
|
||
|
namespace NoccyLabs\React\CommandBus;
|
||
|
use Evenement\EventEmitterTrait;
|
||
|
use React\EventLoop\Loop;
|
||
|
use React\Promise\PromiseInterface;
|
||
|
use React\Promise\Promise;
|
||
|
use React\Promise\Deferred;
|
||
|
use React\Socket\ConnectorInterface;
|
||
|
use React\Socket\TcpConnector;
|
||
|
use React\Stream\DuplexStreamInterface;
|
||
|
use Throwable;
|
||
|
|
||
|
/**
 * Command bus client that talks to a command bus server over a stream
 * connection obtained from a React socket connector.
 *
 * Outgoing messages are JSON documents terminated by a newline. Commands
 * sent with execute() are tracked by message UUID until a matching result
 * message arrives, at which point the corresponding promise is resolved.
 *
 * Emits:
 *  - EVENT_CONNECTED    when a connection has been established
 *  - EVENT_DISCONNECTED when the underlying stream has closed
 *  - EVENT_ERROR        (message, exception) when a connection attempt fails
 */
class CommandBusClient implements CommandBusInterface
{
    use EventEmitterTrait;

    const EVENT_ERROR = 'error';
    const EVENT_CONNECTED = 'connected';
    const EVENT_DISCONNECTED = 'disconnected';

    /** Seconds to wait before retrying after a failed connection attempt. */
    private const RECONNECT_DELAY = 5;

    private ?CommandRegistry $commandRegistry = null;

    private ConnectorInterface $connector;

    /** The active stream, or null while not connected. */
    private ?DuplexStreamInterface $connection = null;

    /** @var array<string,Deferred> Deferreds awaiting a result, keyed by message UUID */
    private array $pending = [];

    private string $address;

    /**
     * @param CommandRegistry|null $commandRegistry Optional registry; when given, register/unregister
     *                                              events are forwarded to the server while connected
     * @param ConnectorInterface|null $connector Connector used to open the stream (defaults to a TcpConnector)
     */
    public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null)
    {
        $this->commandRegistry = $commandRegistry;
        if ($commandRegistry !== null) {
            $commandRegistry->on(CommandRegistry::EVENT_REGISTERED, $this->onCommandRegistered(...));
            $commandRegistry->on(CommandRegistry::EVENT_UNREGISTERED, $this->onCommandUnregistered(...));
        }
        $this->connector = $connector ?? new TcpConnector();
    }

    /**
     * Remember the address and start connecting to it.
     *
     * @param string $address Address handed as-is to the connector
     */
    public function connect(string $address): void
    {
        $this->address = $address;
        $this->reconnect();
    }

    /**
     * Close the current connection, if any, and reject all commands that are
     * still waiting for a result. Safe to call when not connected.
     */
    public function close(): void
    {
        $connection = $this->connection;
        if ($connection === null) {
            return;
        }

        // Detach first so that anything reacting to the 'close' event (which
        // fires synchronously from close()) already sees us as disconnected.
        $this->connection = null;

        $connection->close();
        $connection->removeAllListeners();

        $this->rejectPending('Connection closed');
    }

    /**
     * Drop any existing connection and open a new one to the stored address.
     * A failed attempt emits EVENT_ERROR and is retried after RECONNECT_DELAY seconds.
     */
    private function reconnect(): void
    {
        if ($this->connection !== null) {
            $this->close();
        }

        $this->connector->connect($this->address)->then(
            function (DuplexStreamInterface $connection): void {
                $this->connection = $connection;

                $connection->on('close', function () use ($connection): void {
                    // Only clear the reference if it still points at this stream;
                    // close()/reconnect() may already have replaced or cleared it.
                    if ($this->connection === $connection) {
                        $this->connection = null;
                    }
                    // No result can arrive on a closed stream, so settle the waiters.
                    $this->rejectPending('Connection closed');
                    $this->emit(self::EVENT_DISCONNECTED);
                });

                // NOTE(review): every data chunk is parsed as exactly one message. Outgoing
                // messages are newline-terminated, so inbound data is presumably line-framed
                // too and may need buffering/splitting - confirm against the server.
                $connection->on('data', function ($data) use ($connection): void {
                    try {
                        $this->onServerMessage(Message::fromString($data));
                    } catch (MessageException $e) {
                        $connection->end('{"msg":"error","data":{"error":"Bad message format"}}');
                    }
                });

                // Announce the connection only once the stream listeners are in place.
                $this->emit(self::EVENT_CONNECTED);
            },
            function (Throwable $e): void {
                $this->emit(self::EVENT_ERROR, [ $e->getMessage(), $e ]);
                Loop::addTimer(self::RECONNECT_DELAY, $this->reconnect(...));
            }
        );
    }

    /**
     * Dispatch a message received from the server based on its type.
     * Unknown message types end the connection with an error message.
     */
    private function onServerMessage(Message $message): void
    {
        switch ($message->getType()) {
            case Message::MSGTYPE_EXECUTE: // server call to execute command
                // TODO implement me
                break;

            case Message::MSGTYPE_RESULT: // result from server
                $uuid = $message->getUuid();
                // A result message without a 'result' key resolves with null
                // instead of raising an undefined-key warning.
                $result = $message->getData()['result'] ?? null;
                if (array_key_exists($uuid, $this->pending)) {
                    $deferred = $this->pending[$uuid];
                    unset($this->pending[$uuid]);
                    $deferred->resolve($result);
                }
                break;

            case Message::MSGTYPE_REGISTRY: // command registry actions
                // TODO implement me
                break;

            default:
                // The connection may already be gone by the time we get here.
                $this->connection?->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
        }
    }

    /**
     * Registry callback: tell the server that a command was registered.
     */
    private function onCommandRegistered(string $command): void
    {
        $this->sendRegistryUpdate('register', $command);
    }

    /**
     * Registry callback: tell the server that a command was unregistered.
     */
    private function onCommandUnregistered(string $command): void
    {
        $this->sendRegistryUpdate('unregister', $command);
    }

    /**
     * Send a registry message for a single command; does nothing unless a
     * writable connection exists.
     *
     * @param string $action Key of the registry message payload ('register' or 'unregister')
     * @param string $command Name of the affected command
     */
    private function sendRegistryUpdate(string $action, string $command): void
    {
        $connection = $this->connection;
        if ($connection === null || !$connection->isWritable()) {
            return;
        }

        $msg = new Message(Message::MSGTYPE_REGISTRY, [ $action => [ $command ]]);
        // Newline-terminated, consistent with the messages written by execute().
        $connection->write($msg->toJson()."\n");
    }

    /**
     * Reject and forget every command that is still waiting for a result.
     *
     * @param string $reason Message of the RuntimeException the promises are rejected with
     */
    private function rejectPending(string $reason): void
    {
        // Swap the list out first so rejection handlers that call back into
        // this client never observe (or re-reject) a half-processed list.
        $pending = $this->pending;
        $this->pending = [];

        foreach ($pending as $deferred) {
            $deferred->reject(new \RuntimeException($reason));
        }
    }

    /**
     * {@inheritDoc}
     *
     * The returned promise resolves with the 'result' value of the matching
     * result message. It is rejected with a RuntimeException when there is no
     * writable connection, or when the connection closes before a result arrives.
     */
    public function execute(string $command, array $context): PromiseInterface
    {
        $deferred = new Deferred();

        $connection = $this->connection;
        if ($connection === null || !$connection->isWritable()) {
            // Fail fast instead of dereferencing null or tracking a request
            // that was never sent and therefore can never be answered.
            $deferred->reject(new \RuntimeException('Not connected to command bus'));
            return $deferred->promise();
        }

        $message = new Message(Message::MSGTYPE_EXECUTE, [
            'command' => $command,
            'context' => $context
        ]);
        $this->pending[$message->getUuid()] = $deferred;

        $connection->write($message->toJson()."\n");

        return $deferred->promise();
    }

    /**
     * {@inheritDoc}
     *
     * Currently a no-op: notifications are not implemented by this client.
     */
    public function notify(string $event, array $data): void
    {
    }
}
|
||
|
|