react-command-bus/src/CommandBusClient.php

151 lines
4.8 KiB
PHP

<?php
namespace NoccyLabs\React\CommandBus;
use Evenement\EventEmitterTrait;
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use React\Promise\Promise;
use React\Promise\Deferred;
use React\Socket\ConnectorInterface;
use React\Socket\TcpConnector;
use React\Stream\DuplexStreamInterface;
use Throwable;
class CommandBusClient implements CommandBusInterface
{
use EventEmitterTrait;
const EVENT_ERROR = 'error';
const EVENT_CONNECTED = 'connected';
const EVENT_DISCONNECTED = 'disconnected';
private ?CommandRegistry $commandRegistry = null;
private ConnectorInterface $connector;
private ?DuplexStreamInterface $connection = null;
/** @var array<string,Deferred> */
private array $pending = [];
private string $address;
public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null)
{
$this->commandRegistry = $commandRegistry;
if ($commandRegistry) {
$commandRegistry->on(CommandRegistry::EVENT_REGISTERED, $this->onCommandRegistered(...));
$commandRegistry->on(CommandRegistry::EVENT_UNREGISTERED, $this->onCommandUnregistered(...));
}
$this->connector = $connector??new TcpConnector();
}
public function connect(string $address): void
{
$this->address = $address;
$this->reconnect();
}
public function close(): void
{
$this->connection->close();
$this->connection->removeAllListeners();
$this->connection = null;
}
private function reconnect(): void
{
if ($this->connection) {
$this->close();
}
$this->connector->connect($this->address)->then(
function (DuplexStreamInterface $connection) {
$this->connection = $connection;
$this->emit(self::EVENT_CONNECTED);
$connection->on('close', function () {
$this->emit(self::EVENT_DISCONNECTED);
});
$connection->on('data', function ($data) use ($connection) {
try {
$message = Message::fromString($data);
$this->onServerMessage($message);
} catch (MessageException $e) {
$connection->end('{"msg":"error","data":{"error":"Bad message format"}}');
}
});
},
function (Throwable $e) {
$this->emit(self::EVENT_ERROR, [ $e->getMessage(), $e ]);
Loop::addTimer(5, $this->reconnect(...));
}
);
}
private function onServerMessage(Message $message): void
{
// fprintf(STDERR, "onServerMessage: %s\n", $message->toJson());
switch ($message->getType()) {
case Message::MSGTYPE_EXECUTE: // server call to execute command
// TODO implement me
break;
case Message::MSGTYPE_RESULT: // result from server
$uuid = $message->getUuid();
$result = $message->getData()['result'];
if (array_key_exists($uuid, $this->pending)) {
$this->pending[$uuid]->resolve($result);
unset($this->pending[$uuid]);
}
break;
case Message::MSGTYPE_REGISTRY: // command registry actions
// TODO implement me
break;
default:
$this->connection->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
}
}
private function onCommandRegistered(string $command): void
{
if ($this->connection && $this->connection->isWritable()) {
$msg = new Message(Message::MSGTYPE_REGISTRY, [ 'register' => [ $command ]]);
$this->connection->write($msg->toJson());
}
}
private function onCommandUnregistered(string $command): void
{
if ($this->connection && $this->connection->isWritable()) {
$msg = new Message(Message::MSGTYPE_REGISTRY, [ 'unregister' => [ $command ]]);
$this->connection->write($msg->toJson());
}
}
/**
* {@inheritDoc}
*/
public function execute(string $command, array $context): PromiseInterface
{
$deferred = new Deferred();
$message = new Message(Message::MSGTYPE_EXECUTE, [
'command' => $command,
'context' => $context
]);
$this->pending[$message->getUuid()] = $deferred;
// fprintf(STDERR, "write: %s\n", $message->toJson());
$this->connection->write($message->toJson()."\n");
return $deferred->promise();
}
/**
* {@inheritDoc}
*/
public function notify(string $event, array $data): void
{
}
}