Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
b2494c3163 | |||
c4f3e8ae50 | |||
9bd53062b0 | |||
1255db21ab | |||
9d3f6d9ddd | |||
324fa6e519 | |||
4cfb66756f | |||
3ea23d6533 | |||
e670d725e9 |
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
* Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or in a hybrid setup. The CommandBus functions identical with or without clients.
|
* Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or in a hybrid setup. The CommandBus functions identical with or without listeners and clients.
|
||||||
* All commands called asynchronously using promises and deferreds.
|
* All commands called asynchronously using promises and deferreds.
|
||||||
* Push notifications from the bus to subscribers and listeners, such as progress or log/error messages.
|
* Push notifications from the bus to subscribers and listeners, such as progress or log/error messages.
|
||||||
|
|
||||||
|
@ -51,8 +51,10 @@ class CommandBus implements CommandBusInterface
|
|||||||
$this->connections->attach($client);
|
$this->connections->attach($client);
|
||||||
$client->on('data', function ($data) use ($client) {
|
$client->on('data', function ($data) use ($client) {
|
||||||
try {
|
try {
|
||||||
$message = Message::fromString($data);
|
$messages = Message::fromStringMulti($data);
|
||||||
|
foreach ($messages as $message) {
|
||||||
$this->onClientMessage($client, $message);
|
$this->onClientMessage($client, $message);
|
||||||
|
}
|
||||||
} catch (MessageException $e) {
|
} catch (MessageException $e) {
|
||||||
$client->end('{"msg":"error","data":{"error":"Bad message format"}}');
|
$client->end('{"msg":"error","data":{"error":"Bad message format"}}');
|
||||||
}
|
}
|
||||||
@ -71,6 +73,9 @@ class CommandBus implements CommandBusInterface
|
|||||||
$this->executeContext($context)->then(
|
$this->executeContext($context)->then(
|
||||||
function ($result) use ($message, $client) {
|
function ($result) use ($message, $client) {
|
||||||
$client->write($message->asResult($result)->toJson()."\n");
|
$client->write($message->asResult($result)->toJson()."\n");
|
||||||
|
},
|
||||||
|
function (\Throwable $error) use ($message, $client) {
|
||||||
|
$client->write($message->asError($error->getMessage())->toJson()."\n");
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
break;
|
break;
|
||||||
|
@ -30,6 +30,10 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
|
|
||||||
private string $address;
|
private string $address;
|
||||||
|
|
||||||
|
private bool $autoReconnect = true;
|
||||||
|
|
||||||
|
private bool $isReconnecting = false;
|
||||||
|
|
||||||
public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null)
|
public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null)
|
||||||
{
|
{
|
||||||
$this->commandRegistry = $commandRegistry;
|
$this->commandRegistry = $commandRegistry;
|
||||||
@ -48,28 +52,48 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
|
|
||||||
public function close(): void
|
public function close(): void
|
||||||
{
|
{
|
||||||
$this->connection->close();
|
$this->autoReconnect = false;
|
||||||
|
$this->connection->end();
|
||||||
$this->connection->removeAllListeners();
|
$this->connection->removeAllListeners();
|
||||||
$this->connection = null;
|
$this->connection = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function reconnect(): void
|
private function reconnect(): void
|
||||||
{
|
{
|
||||||
|
if ($this->isReconnecting) return;
|
||||||
|
|
||||||
if ($this->connection) {
|
if ($this->connection) {
|
||||||
$this->close();
|
$this->connection->end();
|
||||||
|
$this->connection->removeAllListeners();
|
||||||
|
$this->connection = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$this->isReconnecting = true;
|
||||||
$this->connector->connect($this->address)->then(
|
$this->connector->connect($this->address)->then(
|
||||||
function (DuplexStreamInterface $connection) {
|
function (DuplexStreamInterface $connection) {
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
$this->emit(self::EVENT_CONNECTED);
|
$this->emit(self::EVENT_CONNECTED);
|
||||||
|
$connection->on('error', function ($error) {
|
||||||
|
$this->emit(self::EVENT_ERROR, [ $error ]);
|
||||||
|
$this->emit(self::EVENT_DISCONNECTED);
|
||||||
|
if ($this->autoReconnect) {
|
||||||
|
Loop::addTimer(1, $this->reconnect(...));
|
||||||
|
}
|
||||||
|
$this->isReconnecting = false;
|
||||||
|
});
|
||||||
$connection->on('close', function () {
|
$connection->on('close', function () {
|
||||||
$this->emit(self::EVENT_DISCONNECTED);
|
$this->emit(self::EVENT_DISCONNECTED);
|
||||||
|
if ($this->autoReconnect) {
|
||||||
|
Loop::addTimer(1, $this->reconnect(...));
|
||||||
|
}
|
||||||
|
$this->isReconnecting = false;
|
||||||
});
|
});
|
||||||
$connection->on('data', function ($data) use ($connection) {
|
$connection->on('data', function ($data) use ($connection) {
|
||||||
try {
|
try {
|
||||||
$message = Message::fromString($data);
|
$messages = Message::fromStringMulti($data);
|
||||||
|
foreach ($messages as $message) {
|
||||||
$this->onServerMessage($message);
|
$this->onServerMessage($message);
|
||||||
|
}
|
||||||
} catch (MessageException $e) {
|
} catch (MessageException $e) {
|
||||||
$connection->end('{"msg":"error","data":{"error":"Bad message format"}}');
|
$connection->end('{"msg":"error","data":{"error":"Bad message format"}}');
|
||||||
}
|
}
|
||||||
@ -102,6 +126,17 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
$data = (array)($message->getData()['data']??[]);
|
$data = (array)($message->getData()['data']??[]);
|
||||||
$this->emit(self::EVENT_NOTIFY, [ $event, $data ]);
|
$this->emit(self::EVENT_NOTIFY, [ $event, $data ]);
|
||||||
break;
|
break;
|
||||||
|
case Message::MSGTYPE_ERROR: // error
|
||||||
|
var_dump($message);
|
||||||
|
$uuid = $message->getUuid();
|
||||||
|
$error = $message->getData()['error'];
|
||||||
|
if ($uuid && array_key_exists($uuid, $this->pending)) {
|
||||||
|
$this->pending[$uuid]->reject(new \Exception($error));
|
||||||
|
unset($this->pending[$uuid]);
|
||||||
|
} else {
|
||||||
|
$this->emit('error', [ $error ]);
|
||||||
|
}
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
$this->connection->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
|
$this->connection->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
|
||||||
}
|
}
|
||||||
@ -128,6 +163,13 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
*/
|
*/
|
||||||
public function execute(string $command, array $context): PromiseInterface
|
public function execute(string $command, array $context): PromiseInterface
|
||||||
{
|
{
|
||||||
|
if (!$this->connection || !$this->connection->isWritable()) {
|
||||||
|
$this->reconnect();
|
||||||
|
return new Promise(function (callable $resolve, callable $canceller) {
|
||||||
|
$canceller(new ConnectionException("Not connected to command bus."));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
$deferred = new Deferred();
|
$deferred = new Deferred();
|
||||||
|
|
||||||
$message = new Message(Message::MSGTYPE_EXECUTE, [
|
$message = new Message(Message::MSGTYPE_EXECUTE, [
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
namespace NoccyLabs\React\CommandBus;
|
namespace NoccyLabs\React\CommandBus;
|
||||||
|
|
||||||
interface CommandBusException
|
class CommandBusException extends \RuntimeException
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
8
src/ConnectionException.php
Normal file
8
src/ConnectionException.php
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace NoccyLabs\React\CommandBus;
|
||||||
|
|
||||||
|
class ConnectionException extends CommandBusException
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
@ -15,6 +15,8 @@ class Message implements JsonSerializable
|
|||||||
const MSGTYPE_EXECUTE = 'execute';
|
const MSGTYPE_EXECUTE = 'execute';
|
||||||
/** @var string Execute result */
|
/** @var string Execute result */
|
||||||
const MSGTYPE_RESULT = 'result';
|
const MSGTYPE_RESULT = 'result';
|
||||||
|
/** @var string Error message */
|
||||||
|
const MSGTYPE_ERROR = 'error';
|
||||||
/** @var string Notify event */
|
/** @var string Notify event */
|
||||||
const MSGTYPE_NOTIFY = 'notify';
|
const MSGTYPE_NOTIFY = 'notify';
|
||||||
/** @var string Registry update (command list set and update) */
|
/** @var string Registry update (command list set and update) */
|
||||||
@ -27,16 +29,16 @@ class Message implements JsonSerializable
|
|||||||
|
|
||||||
private array $messageData;
|
private array $messageData;
|
||||||
|
|
||||||
public function __construct(string $messageType, array $messageData = [], ?string $uuid = null)
|
public function __construct(string $messageType, array $messageData = [], null|string|false $uuid = null)
|
||||||
{
|
{
|
||||||
$this->uuid = ($uuid && Uuid::isValid($uuid)) ? $uuid : (string)Uuid::v7();
|
$this->uuid = ($uuid===null) ? (string)Uuid::v7() : $uuid;
|
||||||
$this->messageType = $messageType;
|
$this->messageType = $messageType;
|
||||||
$this->messageData = $messageData;
|
$this->messageData = $messageData;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getUuid(): string
|
public function getUuid(): string
|
||||||
{
|
{
|
||||||
return $this->uuid;
|
return $this->uuid ? $this->uuid : "";
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getType(): string
|
public function getType(): string
|
||||||
@ -61,7 +63,12 @@ class Message implements JsonSerializable
|
|||||||
if (!$json || empty($json['msg'])) {
|
if (!$json || empty($json['msg'])) {
|
||||||
throw new MessageException("Invalid data");
|
throw new MessageException("Invalid data");
|
||||||
}
|
}
|
||||||
return new Message($json['msg'], $json['data'], $json['uuid']);
|
return new Message($json['msg'], $json['data'], $json['uuid']??false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function fromStringMulti(string $data): array
|
||||||
|
{
|
||||||
|
return array_map(fn($v) => Message::fromString($v), array_filter(explode("\n", $data)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public function asResult($result): Message
|
public function asResult($result): Message
|
||||||
@ -71,6 +78,13 @@ class Message implements JsonSerializable
|
|||||||
], $this->uuid);
|
], $this->uuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function asError($error): Message
|
||||||
|
{
|
||||||
|
return new Message(self::MSGTYPE_ERROR, [
|
||||||
|
'error' => $error
|
||||||
|
], $this->uuid);
|
||||||
|
}
|
||||||
|
|
||||||
public function toJson(): string
|
public function toJson(): string
|
||||||
{
|
{
|
||||||
return json_encode($this, JSON_UNESCAPED_SLASHES);
|
return json_encode($this, JSON_UNESCAPED_SLASHES);
|
||||||
|
@ -1,9 +1,8 @@
|
|||||||
<?php
|
<?php
|
||||||
|
|
||||||
namespace NoccyLabs\React\CommandBus;
|
namespace NoccyLabs\React\CommandBus;
|
||||||
use RuntimeException;
|
|
||||||
|
|
||||||
class MessageException extends RuntimeException implements CommandBusException
|
class MessageException extends CommandBusException
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user