Bugfixes, reconnection in client, readme
This commit is contained in:
parent
6f49b69a66
commit
e670d725e9
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
* Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or in a hybrid setup. The CommandBus functions identical with or without clients.
|
* Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or in a hybrid setup. The CommandBus functions identical with or without listeners and clients.
|
||||||
* All commands called asynchronously using promises and deferreds.
|
* All commands called asynchronously using promises and deferreds.
|
||||||
* Push notifications from the bus to subscribers and listeners, such as progress or log/error messages.
|
* Push notifications from the bus to subscribers and listeners, such as progress or log/error messages.
|
||||||
|
|
||||||
|
@ -30,6 +30,8 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
|
|
||||||
private string $address;
|
private string $address;
|
||||||
|
|
||||||
|
private bool $autoReconnect = true;
|
||||||
|
|
||||||
public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null)
|
public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null)
|
||||||
{
|
{
|
||||||
$this->commandRegistry = $commandRegistry;
|
$this->commandRegistry = $commandRegistry;
|
||||||
@ -48,6 +50,7 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
|
|
||||||
public function close(): void
|
public function close(): void
|
||||||
{
|
{
|
||||||
|
$this->autoReconnect = false;
|
||||||
$this->connection->close();
|
$this->connection->close();
|
||||||
$this->connection->removeAllListeners();
|
$this->connection->removeAllListeners();
|
||||||
$this->connection = null;
|
$this->connection = null;
|
||||||
@ -56,15 +59,26 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
private function reconnect(): void
|
private function reconnect(): void
|
||||||
{
|
{
|
||||||
if ($this->connection) {
|
if ($this->connection) {
|
||||||
$this->close();
|
$this->connection->close();
|
||||||
|
$this->connection->removeAllListeners();
|
||||||
|
$this->connection = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->connector->connect($this->address)->then(
|
$this->connector->connect($this->address)->then(
|
||||||
function (DuplexStreamInterface $connection) {
|
function (DuplexStreamInterface $connection) {
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
$this->emit(self::EVENT_CONNECTED);
|
$this->emit(self::EVENT_CONNECTED);
|
||||||
|
$connection->on('error', function () {
|
||||||
|
$this->emit(self::EVENT_DISCONNECTED);
|
||||||
|
if ($this->autoReconnect) {
|
||||||
|
Loop::addTimer(1, $this->reconnect(...));
|
||||||
|
}
|
||||||
|
});
|
||||||
$connection->on('close', function () {
|
$connection->on('close', function () {
|
||||||
$this->emit(self::EVENT_DISCONNECTED);
|
$this->emit(self::EVENT_DISCONNECTED);
|
||||||
|
if ($this->autoReconnect) {
|
||||||
|
Loop::addTimer(1, $this->reconnect(...));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
$connection->on('data', function ($data) use ($connection) {
|
$connection->on('data', function ($data) use ($connection) {
|
||||||
try {
|
try {
|
||||||
@ -128,6 +142,12 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
*/
|
*/
|
||||||
public function execute(string $command, array $context): PromiseInterface
|
public function execute(string $command, array $context): PromiseInterface
|
||||||
{
|
{
|
||||||
|
if (!$this->connection) {
|
||||||
|
return new Promise(function (callable $resolve) {
|
||||||
|
throw new \RuntimeException("Not connected to command bus.");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
$deferred = new Deferred();
|
$deferred = new Deferred();
|
||||||
|
|
||||||
$message = new Message(Message::MSGTYPE_EXECUTE, [
|
$message = new Message(Message::MSGTYPE_EXECUTE, [
|
||||||
|
@ -15,6 +15,8 @@ class Message implements JsonSerializable
|
|||||||
const MSGTYPE_EXECUTE = 'execute';
|
const MSGTYPE_EXECUTE = 'execute';
|
||||||
/** @var string Execute result */
|
/** @var string Execute result */
|
||||||
const MSGTYPE_RESULT = 'result';
|
const MSGTYPE_RESULT = 'result';
|
||||||
|
/** @var string Error message */
|
||||||
|
const MSGTYPE_ERROR = 'error';
|
||||||
/** @var string Notify event */
|
/** @var string Notify event */
|
||||||
const MSGTYPE_NOTIFY = 'notify';
|
const MSGTYPE_NOTIFY = 'notify';
|
||||||
/** @var string Registry update (command list set and update) */
|
/** @var string Registry update (command list set and update) */
|
||||||
@ -29,7 +31,7 @@ class Message implements JsonSerializable
|
|||||||
|
|
||||||
public function __construct(string $messageType, array $messageData = [], ?string $uuid = null)
|
public function __construct(string $messageType, array $messageData = [], ?string $uuid = null)
|
||||||
{
|
{
|
||||||
$this->uuid = ($uuid && Uuid::isValid($uuid)) ? $uuid : (string)Uuid::v7();
|
$this->uuid = $uuid ?? (string)Uuid::v7();
|
||||||
$this->messageType = $messageType;
|
$this->messageType = $messageType;
|
||||||
$this->messageData = $messageData;
|
$this->messageData = $messageData;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user