commandRegistry = $commandRegistry; $this->connections = new SplObjectStorage(); $this->servers = new SplObjectStorage(); }

    /**
     * Starts handling incoming client connections of the given server.
     *
     * The listener closure is stored as the data attached to the server in
     * $this->servers so that removeServer() can unregister that exact instance.
     *
     * @param ServerInterface $server Server whose 'connection' events are handled.
     */
    public function addServer(ServerInterface $server): void
    {
        // Registering the same server twice would add a second listener and
        // make every new connection be handled twice.
        if ($this->servers->contains($server)) {
            return;
        }

        $listener = $this->onServerConnection(...);
        $this->servers->attach($server, $listener);
        $server->on('connection', $listener);
    }

    /**
     * Closes the given server and stops handling its connections.
     *
     * @param ServerInterface $server Server to close and forget.
     */
    public function removeServer(ServerInterface $server): void
    {
        $server->close();

        if (!$this->servers->contains($server)) {
            return;
        }

        // Each `$this->onServerConnection(...)` expression creates a new Closure
        // object, so an emitter that matches listeners by identity can only
        // remove the instance that was registered in addServer().
        $listener = $this->servers[$server];
        if ($listener !== null) {
            $server->removeListener('connection', $listener);
        }

        $this->servers->detach($server);
    }

    /**
     * Closes and removes every registered server.
     */
    public function close(): void
    {
        // Iterate over a snapshot: removeServer() detaches from $this->servers,
        // and detaching while iterating an SplObjectStorage skips entries.
        foreach (iterator_to_array($this->servers, false) as $server) {
            $this->removeServer($server);
        }
    }

    /**
     * Tracks a newly connected client and wires up its 'data' and 'close' events.
     *
     * @param DuplexStreamInterface $client Stream of the connected client.
     */
    private function onServerConnection(DuplexStreamInterface $client): void
    {
        $this->connections->attach($client);

        $client->on('data', function ($data) use ($client) {
            // NOTE(review): every chunk is parsed on its own; confirm that
            // Message::fromStringMulti() copes with messages split across chunks.
            try {
                $messages = Message::fromStringMulti($data);
                foreach ($messages as $message) {
                    $this->onClientMessage($client, $message);
                }
            } catch (MessageException $e) {
                // Unparseable input: report the error and end the connection.
                $client->end('{"msg":"error","data":{"error":"Bad message format"}}');
            }
        });

        $client->on('close', function () use ($client) {
            $this->connections->detach($client);
        });
    }

    /**
     * Dispatches a single message received from a client by its type.
     *
     * Execute requests are answered with a result or error message once the
     * command's promise settles; unknown message types end the connection.
     *
     * @param DuplexStreamInterface $client  Stream the message was received on.
     * @param Message               $message Message to handle.
     */
    private function onClientMessage(DuplexStreamInterface $client, Message $message): void
    {
        switch ($message->getType()) {
            case Message::MSGTYPE_EXECUTE:
                // Client call to execute command
                // NOTE(review): 'command' and 'context' are read from client data
                // without validation here — confirm Message guarantees both keys.
                $data = $message->getData();
                $context = new Context($data['command'], $data['context']);
                $this->executeContext($context)->then(
                    function ($result) use ($message, $client) {
                        $client->write($message->asResult($result)->toJson()."\n");
                    },
                    function (\Throwable $error) use ($message, $client) {
                        $client->write($message->asError($error->getMessage())->toJson()."\n");
                    }
                );
                break;

            case Message::MSGTYPE_RESULT:
                // Result from execution on client
                // TODO implement me
                break;

            case Message::MSGTYPE_REGISTRY:
                // command registry actions
                // TODO implement me
                break;

            default:
                $client->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
        }
    }

    /**
     * {@inheritDoc}
     */
    public function execute(string $command, array $context): PromiseInterface
    {
        return $this->executeContext(new Context($command, $context));
    }

    /**
     * Looks up the command named by the context in the registry and calls it.
     *
     * @param Context $context Execution context carrying the command name.
     *
     * @return PromiseInterface The result of the command's call(); when no command
     *                          is found, a promise whose resolver throws a
     *                          RuntimeException.
     */
    private function executeContext(Context $context): PromiseInterface
    {
        $command = $this->commandRegistry->find($context->getCommandName());

        if (!$command) {
            // The exception is thrown inside the promise resolver rather than
            // synchronously from this method.
            return new Promise(function (callable $resolve) use ($context) {
                throw new \RuntimeException("Unable to resolve command: ".$context->getCommandName());
            });
        }

        return $command->call($context);
    }

    /**
     * {@inheritDoc}
     */
    public function notify(string $event, array $data): void
    {
        $this->emit(self::EVENT_NOTIFY, [ $event, $data ]);

        // Serialize once; the same newline-terminated line goes to every client.
        $json = (new Message(Message::MSGTYPE_NOTIFY, [ 'event' => $event, 'data' => $data ]))->toJson();
        $line = $json."\n";

        foreach ($this->connections as $client) {
            $client->write($line);
        }
    }
}