Added a ConnectionException, misc fixes

This commit is contained in:
Chris 2024-03-01 19:10:47 +01:00
parent 324fa6e519
commit 9d3f6d9ddd
4 changed files with 19 additions and 6 deletions

View File

@ -32,6 +32,8 @@ class CommandBusClient implements CommandBusInterface
private bool $autoReconnect = true; private bool $autoReconnect = true;
private bool $isReconnecting = false;
public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null) public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null)
{ {
$this->commandRegistry = $commandRegistry; $this->commandRegistry = $commandRegistry;
@ -64,6 +66,7 @@ class CommandBusClient implements CommandBusInterface
$this->connection = null; $this->connection = null;
} }
$this->isReconnecting = true;
$this->connector->connect($this->address)->then( $this->connector->connect($this->address)->then(
function (DuplexStreamInterface $connection) { function (DuplexStreamInterface $connection) {
$this->connection = $connection; $this->connection = $connection;
@ -73,12 +76,14 @@ class CommandBusClient implements CommandBusInterface
if ($this->autoReconnect) { if ($this->autoReconnect) {
Loop::addTimer(1, $this->reconnect(...)); Loop::addTimer(1, $this->reconnect(...));
} }
$this->isReconnecting = false;
}); });
$connection->on('close', function () { $connection->on('close', function () {
$this->emit(self::EVENT_DISCONNECTED); $this->emit(self::EVENT_DISCONNECTED);
if ($this->autoReconnect) { if ($this->autoReconnect) {
Loop::addTimer(1, $this->reconnect(...)); Loop::addTimer(1, $this->reconnect(...));
} }
$this->isReconnecting = false;
}); });
$connection->on('data', function ($data) use ($connection) { $connection->on('data', function ($data) use ($connection) {
try { try {
@ -142,9 +147,10 @@ class CommandBusClient implements CommandBusInterface
*/ */
public function execute(string $command, array $context): PromiseInterface public function execute(string $command, array $context): PromiseInterface
{ {
if (!$this->connection) { if (!$this->connection || !$this->connection->isWritable()) {
$this->reconnect();
return new Promise(function (callable $resolve, callable $canceller) { return new Promise(function (callable $resolve, callable $canceller) {
$canceller(new \RuntimeException("Not connected to command bus.")); $canceller(new ConnectionException("Not connected to command bus."));
}); });
} }

View File

@ -2,7 +2,7 @@
namespace NoccyLabs\React\CommandBus; namespace NoccyLabs\React\CommandBus;
interface CommandBusException class CommandBusException extends \RuntimeException
{ {
} }

View File

@ -0,0 +1,8 @@
<?php
namespace NoccyLabs\React\CommandBus;
class ConnectionException extends CommandBusException
{
}

View File

@ -1,9 +1,8 @@
<?php <?php
namespace NoccyLabs\React\CommandBus; namespace NoccyLabs\React\CommandBus;
use RuntimeException;
class MessageException extends RuntimeException implements CommandBusException class MessageException extends CommandBusException
{ {
} }