*/
    private array $pending = [];

    /** Address passed to the most recent connect() call; uninitialised until then. */
    private string $address;

    /** Whether a lost or failed connection is retried automatically; cleared by close(). */
    private bool $autoReconnect = true;

    /** True while a connection attempt is in flight, so that attempts never overlap. */
    private bool $isReconnecting = false;

    /**
     * @param CommandRegistry|null    $commandRegistry Optional registry; when given, its registered/unregistered
     *                                                 events are forwarded over the connection.
     * @param ConnectorInterface|null $connector       Connector used to open the stream; a TcpConnector is
     *                                                 created when none is supplied.
     */
    public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null)
    {
        $this->commandRegistry = $commandRegistry;
        $commandRegistry?->on(CommandRegistry::EVENT_REGISTERED, $this->onCommandRegistered(...));
        $commandRegistry?->on(CommandRegistry::EVENT_UNREGISTERED, $this->onCommandUnregistered(...));
        $this->connector = $connector ?? new TcpConnector();
    }

    /**
     * Remember the address and start connecting to it.
     *
     * Automatic reconnection is re-armed here, so connect() works again after a previous close().
     */
    public function connect(string $address): void
    {
        $this->address = $address;
        $this->autoReconnect = true;
        $this->reconnect();
    }

    /**
     * Close the connection and stop all automatic reconnect attempts.
     *
     * Safe to call when no connection exists (never connected, or already closed).
     */
    public function close(): void
    {
        $this->autoReconnect = false;
        $this->dropConnection('Connection to command bus closed.');
    }

    /**
     * Drop any existing connection and open a new one to the stored address.
     *
     * Does nothing while another attempt is in flight, after close(), or before connect() has
     * provided an address. A failed attempt is retried after 5 seconds.
     */
    private function reconnect(): void
    {
        if ($this->isReconnecting || !$this->autoReconnect || !isset($this->address)) {
            return;
        }

        $this->dropConnection('Reconnecting to command bus.');

        $this->isReconnecting = true;
        $this->connector->connect($this->address)->then(
            function (DuplexStreamInterface $connection): void {
                $this->isReconnecting = false;

                if (!$this->autoReconnect) {
                    // close() was called while this attempt was in flight: do not keep the stream.
                    $connection->close();

                    return;
                }

                $this->connection = $connection;
                // Listeners are attached before EVENT_CONNECTED is emitted so that a listener
                // calling close() or execute() right away sees a fully wired connection.
                $this->attachConnectionListeners($connection);
                $this->emit(self::EVENT_CONNECTED);
            },
            function (Throwable $e): void {
                $this->isReconnecting = false;
                $this->emit(self::EVENT_ERROR, [$e->getMessage(), $e]);

                if ($this->autoReconnect) {
                    Loop::addTimer(5, $this->reconnect(...));
                }
            }
        );
    }

    /**
     * Wire the error, close and data handlers of a freshly opened connection.
     */
    private function attachConnectionListeners(DuplexStreamInterface $connection): void
    {
        $connection->on('error', function ($error) use ($connection): void {
            $this->emit(self::EVENT_ERROR, [$error]);
            $this->onConnectionLost($connection);
        });

        $connection->on('close', function () use ($connection): void {
            $this->onConnectionLost($connection);
        });

        // NOTE(review): assumes every data chunk contains only complete messages — confirm that
        // Message::fromStringMulti() copes with a message split across two chunks.
        $connection->on('data', function ($data) use ($connection): void {
            try {
                foreach (Message::fromStringMulti($data) as $message) {
                    $this->onServerMessage($message);
                }
            } catch (MessageException $e) {
                $connection->end('{"msg":"error","data":{"error":"Bad message format"}}');
            }
        });
    }

    /**
     * React to the given connection failing or closing.
     *
     * Runs at most once per connection: the first call clears $this->connection, so an "error"
     * followed by a "close" does not emit EVENT_DISCONNECTED twice or arm two reconnect timers.
     */
    private function onConnectionLost(DuplexStreamInterface $connection): void
    {
        if ($this->connection !== $connection) {
            return;
        }

        $this->dropConnection('Connection to command bus lost.');

        if ($this->autoReconnect) {
            Loop::addTimer(1, $this->reconnect(...));
        }
    }

    /**
     * Detach from the current connection, if any, end it, reject every pending execute() call and
     * emit EVENT_DISCONNECTED.
     *
     * Listeners are removed before end() is called so the stream's own "close" event cannot
     * re-enter the disconnect handling and schedule another reconnect.
     *
     * @param string $reason Message of the ConnectionException used to reject pending calls.
     */
    private function dropConnection(string $reason): void
    {
        $connection = $this->connection ?? null;
        if ($connection === null) {
            return;
        }

        $this->connection = null;
        $connection->removeAllListeners();
        $connection->end();

        $this->rejectPending($reason);
        $this->emit(self::EVENT_DISCONNECTED);
    }

    /**
     * Reject and forget every outstanding execute() call.
     */
    private function rejectPending(string $reason): void
    {
        // Clear the map first so that rejection handlers calling back into this object
        // never see an already-settled deferred.
        $pending = $this->pending;
        $this->pending = [];

        foreach ($pending as $deferred) {
            $deferred->reject(new ConnectionException($reason));
        }
    }

    /**
     * Dispatch a single message received over the connection.
     */
    private function onServerMessage(Message $message): void
    {
        switch ($message->getType()) {
            case Message::MSGTYPE_EXECUTE:
                // server call to execute command
                // TODO implement me
                break;

            case Message::MSGTYPE_RESULT:
                // result from server: settle the matching execute() call, if it is still pending
                $uuid = $message->getUuid();
                $result = $message->getData()['result'] ?? null;
                if (array_key_exists($uuid, $this->pending)) {
                    $deferred = $this->pending[$uuid];
                    unset($this->pending[$uuid]);
                    $deferred->resolve($result);
                }
                break;

            case Message::MSGTYPE_NOTIFY:
                $event = $message->getData()['event'] ?? null;
                $data = (array) ($message->getData()['data'] ?? []);
                $this->emit(self::EVENT_NOTIFY, [$event, $data]);
                break;

            case Message::MSGTYPE_ERROR:
                $uuid = $message->getUuid();
                $error = $message->getData()['error'] ?? 'Unknown error';
                if ($uuid && array_key_exists($uuid, $this->pending)) {
                    $deferred = $this->pending[$uuid];
                    unset($this->pending[$uuid]);
                    // \Exception only accepts a string message; encode anything else.
                    $text = is_string($error) ? $error : (string) json_encode($error);
                    $deferred->reject(new \Exception($text));
                } else {
                    // NOTE(review): emitted under the literal 'error' name rather than
                    // self::EVENT_ERROR — confirm both refer to the same event.
                    $this->emit('error', [$error]);
                }
                break;

            default:
                $this->connection?->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
        }
    }

    /**
     * Announce a newly registered command over the connection, when it is writable.
     */
    private function onCommandRegistered(string $command): void
    {
        $this->sendRegistryUpdate('register', $command);
    }

    /**
     * Announce a removed command over the connection, when it is writable.
     */
    private function onCommandUnregistered(string $command): void
    {
        $this->sendRegistryUpdate('unregister', $command);
    }

    /**
     * Write a registry message of the form [$action => [$command]] to the connection.
     *
     * Silently does nothing when there is no writable connection.
     *
     * @param string $action Either 'register' or 'unregister'.
     */
    private function sendRegistryUpdate(string $action, string $command): void
    {
        if ($this->connection && $this->connection->isWritable()) {
            $msg = new Message(Message::MSGTYPE_REGISTRY, [$action => [$command]]);
            // Terminated with "\n" like the messages written by execute(), so consecutive
            // writes are framed the same way.
            $this->connection->write($msg->toJson() . "\n");
        }
    }

    /**
     * {@inheritDoc}
     *
     * The returned promise is rejected with a ConnectionException when there is no writable
     * connection (a reconnect is triggered in that case) or when the connection is dropped
     * before a result or error message for this call arrives.
     */
    public function execute(string $command, array $context): PromiseInterface
    {
        if (!$this->connection || !$this->connection->isWritable()) {
            $this->reconnect();

            return new Promise(static function (callable $resolve, callable $reject): void {
                $reject(new ConnectionException("Not connected to command bus."));
            });
        }

        $deferred = new Deferred();
        $message = new Message(Message::MSGTYPE_EXECUTE, ['command' => $command, 'context' => $context]);
        // Keyed by message UUID so the matching result/error message can settle it.
        $this->pending[$message->getUuid()] = $deferred;
        $this->connection->write($message->toJson() . "\n");

        return $deferred->promise();
    }

    /**
     * {@inheritDoc}
     *
     * Not implemented yet: the call currently does nothing.
     */
    public function notify(string $event, array $data): void
    {
    }
}