10 Commits

7 changed files with 82 additions and 14 deletions

View File

@ -2,7 +2,7 @@
## Features ## Features
* Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or in a hybrid setup. The CommandBus functions identical with or without clients. * Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or in a hybrid setup. The CommandBus functions identical with or without listeners and clients.
* All commands called asynchronously using promises and deferreds. * All commands called asynchronously using promises and deferreds.
* Push notifications from the bus to subscribers and listeners, such as progress or log/error messages. * Push notifications from the bus to subscribers and listeners, such as progress or log/error messages.

View File

@ -51,8 +51,10 @@ class CommandBus implements CommandBusInterface
$this->connections->attach($client); $this->connections->attach($client);
$client->on('data', function ($data) use ($client) { $client->on('data', function ($data) use ($client) {
try { try {
$message = Message::fromString($data); $messages = Message::fromStringMulti($data);
$this->onClientMessage($client, $message); foreach ($messages as $message) {
$this->onClientMessage($client, $message);
}
} catch (MessageException $e) { } catch (MessageException $e) {
$client->end('{"msg":"error","data":{"error":"Bad message format"}}'); $client->end('{"msg":"error","data":{"error":"Bad message format"}}');
} }
@ -71,6 +73,9 @@ class CommandBus implements CommandBusInterface
$this->executeContext($context)->then( $this->executeContext($context)->then(
function ($result) use ($message, $client) { function ($result) use ($message, $client) {
$client->write($message->asResult($result)->toJson()."\n"); $client->write($message->asResult($result)->toJson()."\n");
},
function (\Throwable $error) use ($message, $client) {
$client->write($message->asError($error->getMessage())->toJson()."\n");
} }
); );
break; break;

View File

@ -30,6 +30,10 @@ class CommandBusClient implements CommandBusInterface
private string $address; private string $address;
private bool $autoReconnect = true;
private bool $isReconnecting = false;
public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null) public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null)
{ {
$this->commandRegistry = $commandRegistry; $this->commandRegistry = $commandRegistry;
@ -48,34 +52,54 @@ class CommandBusClient implements CommandBusInterface
public function close(): void public function close(): void
{ {
$this->connection->close(); $this->autoReconnect = false;
$this->connection->end();
$this->connection->removeAllListeners(); $this->connection->removeAllListeners();
$this->connection = null; $this->connection = null;
} }
private function reconnect(): void private function reconnect(): void
{ {
if ($this->isReconnecting) return;
if ($this->connection) { if ($this->connection) {
$this->close(); $this->connection->end();
$this->connection->removeAllListeners();
$this->connection = null;
} }
$this->isReconnecting = true;
$this->connector->connect($this->address)->then( $this->connector->connect($this->address)->then(
function (DuplexStreamInterface $connection) { function (DuplexStreamInterface $connection) {
$this->isReconnecting = false;
$this->connection = $connection; $this->connection = $connection;
$this->emit(self::EVENT_CONNECTED); $this->emit(self::EVENT_CONNECTED);
$connection->on('error', function ($error) {
$this->emit(self::EVENT_ERROR, [ $error ]);
$this->emit(self::EVENT_DISCONNECTED);
if ($this->autoReconnect) {
Loop::addTimer(1, $this->reconnect(...));
}
});
$connection->on('close', function () { $connection->on('close', function () {
$this->emit(self::EVENT_DISCONNECTED); $this->emit(self::EVENT_DISCONNECTED);
if ($this->autoReconnect) {
Loop::addTimer(1, $this->reconnect(...));
}
}); });
$connection->on('data', function ($data) use ($connection) { $connection->on('data', function ($data) use ($connection) {
try { try {
$message = Message::fromString($data); $messages = Message::fromStringMulti($data);
$this->onServerMessage($message); foreach ($messages as $message) {
$this->onServerMessage($message);
}
} catch (MessageException $e) { } catch (MessageException $e) {
$connection->end('{"msg":"error","data":{"error":"Bad message format"}}'); $connection->end('{"msg":"error","data":{"error":"Bad message format"}}');
} }
}); });
}, },
function (Throwable $e) { function (Throwable $e) {
$this->isReconnecting = false;
$this->emit(self::EVENT_ERROR, [ $e->getMessage(), $e ]); $this->emit(self::EVENT_ERROR, [ $e->getMessage(), $e ]);
Loop::addTimer(5, $this->reconnect(...)); Loop::addTimer(5, $this->reconnect(...));
} }
@ -102,6 +126,17 @@ class CommandBusClient implements CommandBusInterface
$data = (array)($message->getData()['data']??[]); $data = (array)($message->getData()['data']??[]);
$this->emit(self::EVENT_NOTIFY, [ $event, $data ]); $this->emit(self::EVENT_NOTIFY, [ $event, $data ]);
break; break;
case Message::MSGTYPE_ERROR: // error
var_dump($message);
$uuid = $message->getUuid();
$error = $message->getData()['error'];
if ($uuid && array_key_exists($uuid, $this->pending)) {
$this->pending[$uuid]->reject(new \Exception($error));
unset($this->pending[$uuid]);
} else {
$this->emit('error', [ $error ]);
}
break;
default: default:
$this->connection->end('{"msg":"error","data":{"error":"Unexpected message type"}}'); $this->connection->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
} }
@ -128,6 +163,13 @@ class CommandBusClient implements CommandBusInterface
*/ */
public function execute(string $command, array $context): PromiseInterface public function execute(string $command, array $context): PromiseInterface
{ {
if (!$this->connection || !$this->connection->isWritable()) {
$this->reconnect();
return new Promise(function (callable $resolve, callable $canceller) {
$canceller(new ConnectionException("Not connected to command bus."));
});
}
$deferred = new Deferred(); $deferred = new Deferred();
$message = new Message(Message::MSGTYPE_EXECUTE, [ $message = new Message(Message::MSGTYPE_EXECUTE, [

View File

@ -2,7 +2,7 @@
namespace NoccyLabs\React\CommandBus; namespace NoccyLabs\React\CommandBus;
interface CommandBusException class CommandBusException extends \RuntimeException
{ {
} }

View File

@ -0,0 +1,8 @@
<?php
namespace NoccyLabs\React\CommandBus;
class ConnectionException extends CommandBusException
{
}

View File

@ -15,6 +15,8 @@ class Message implements JsonSerializable
const MSGTYPE_EXECUTE = 'execute'; const MSGTYPE_EXECUTE = 'execute';
/** @var string Execute result */ /** @var string Execute result */
const MSGTYPE_RESULT = 'result'; const MSGTYPE_RESULT = 'result';
/** @var string Error message */
const MSGTYPE_ERROR = 'error';
/** @var string Notify event */ /** @var string Notify event */
const MSGTYPE_NOTIFY = 'notify'; const MSGTYPE_NOTIFY = 'notify';
/** @var string Registry update (command list set and update) */ /** @var string Registry update (command list set and update) */
@ -27,16 +29,16 @@ class Message implements JsonSerializable
private array $messageData; private array $messageData;
public function __construct(string $messageType, array $messageData = [], ?string $uuid = null) public function __construct(string $messageType, array $messageData = [], null|string|false $uuid = null)
{ {
$this->uuid = ($uuid && Uuid::isValid($uuid)) ? $uuid : (string)Uuid::v7(); $this->uuid = ($uuid===null) ? (string)Uuid::v7() : $uuid;
$this->messageType = $messageType; $this->messageType = $messageType;
$this->messageData = $messageData; $this->messageData = $messageData;
} }
public function getUuid(): string public function getUuid(): string
{ {
return $this->uuid; return $this->uuid ? $this->uuid : "";
} }
public function getType(): string public function getType(): string
@ -61,7 +63,12 @@ class Message implements JsonSerializable
if (!$json || empty($json['msg'])) { if (!$json || empty($json['msg'])) {
throw new MessageException("Invalid data"); throw new MessageException("Invalid data");
} }
return new Message($json['msg'], $json['data'], $json['uuid']); return new Message($json['msg'], $json['data'], $json['uuid']??false);
}
public static function fromStringMulti(string $data): array
{
return array_map(fn($v) => Message::fromString($v), array_filter(explode("\n", $data)));
} }
public function asResult($result): Message public function asResult($result): Message
@ -71,6 +78,13 @@ class Message implements JsonSerializable
], $this->uuid); ], $this->uuid);
} }
public function asError($error): Message
{
return new Message(self::MSGTYPE_ERROR, [
'error' => $error
], $this->uuid);
}
public function toJson(): string public function toJson(): string
{ {
return json_encode($this, JSON_UNESCAPED_SLASHES); return json_encode($this, JSON_UNESCAPED_SLASHES);

View File

@ -1,9 +1,8 @@
<?php <?php
namespace NoccyLabs\React\CommandBus; namespace NoccyLabs\React\CommandBus;
use RuntimeException;
class MessageException extends RuntimeException implements CommandBusException class MessageException extends CommandBusException
{ {
} }