From e670d725e945ff463e9f93f467eed79cb3489fdb Mon Sep 17 00:00:00 2001 From: Christopher Vagnetoft Date: Fri, 1 Mar 2024 18:19:25 +0100 Subject: [PATCH] Bugfixes, reconnection in client, readme --- README.md | 2 +- src/CommandBusClient.php | 22 +++++++++++++++++++++- src/Message.php | 4 +++- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 14e2f2d..cef0ecb 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ## Features -* Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or in a hybrid setup. The CommandBus functions identical with or without clients. +* Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or in a hybrid setup. The CommandBus functions identical with or without listeners and clients. * All commands called asynchronously using promises and deferreds. * Push notifications from the bus to subscribers and listeners, such as progress or log/error messages. diff --git a/src/CommandBusClient.php b/src/CommandBusClient.php index b099993..806e79e 100644 --- a/src/CommandBusClient.php +++ b/src/CommandBusClient.php @@ -30,6 +30,8 @@ class CommandBusClient implements CommandBusInterface private string $address; + private bool $autoReconnect = true; + public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null) { $this->commandRegistry = $commandRegistry; @@ -48,6 +50,7 @@ class CommandBusClient implements CommandBusInterface public function close(): void { + $this->autoReconnect = false; $this->connection->close(); $this->connection->removeAllListeners(); $this->connection = null; @@ -56,15 +59,26 @@ class CommandBusClient implements CommandBusInterface private function reconnect(): void { if ($this->connection) { - $this->close(); + $this->connection->close(); + $this->connection->removeAllListeners(); + $this->connection = null; } $this->connector->connect($this->address)->then( function (DuplexStreamInterface $connection) { $this->connection = $connection; $this->emit(self::EVENT_CONNECTED); + $connection->on('error', function () { + $this->emit(self::EVENT_DISCONNECTED); + if ($this->autoReconnect) { + Loop::addTimer(1, $this->reconnect(...)); + } + }); $connection->on('close', function () { $this->emit(self::EVENT_DISCONNECTED); + if ($this->autoReconnect) { + Loop::addTimer(1, $this->reconnect(...)); + } }); $connection->on('data', function ($data) use ($connection) { try { @@ -128,6 +142,12 @@ class CommandBusClient implements CommandBusInterface */ public function execute(string $command, array $context): PromiseInterface { + if (!$this->connection) { + return new Promise(function (callable $resolve) { + throw new \RuntimeException("Not connected to command bus."); + }); + } + $deferred = new Deferred(); $message = new Message(Message::MSGTYPE_EXECUTE, [ diff --git a/src/Message.php b/src/Message.php index 299e927..0309b9e 100644 --- a/src/Message.php +++ b/src/Message.php @@ -15,6 +15,8 @@ class Message implements JsonSerializable const MSGTYPE_EXECUTE = 'execute'; /** @var string Execute result */ const MSGTYPE_RESULT = 'result'; + /** @var string Error message */ + const MSGTYPE_ERROR = 'error'; /** @var string Notify event */ const MSGTYPE_NOTIFY = 'notify'; /** @var string Registry update (command list set and update) */ @@ -29,7 +31,7 @@ class Message implements JsonSerializable public function __construct(string $messageType, array $messageData = [], ?string $uuid = null) { - $this->uuid = ($uuid && Uuid::isValid($uuid)) ? $uuid : (string)Uuid::v7(); + $this->uuid = $uuid ?? (string)Uuid::v7(); $this->messageType = $messageType; $this->messageData = $messageData; }