*/
    private array $resolvers = [];

    /**
     * Client streams currently connected; notify() writes to each of them.
     */
    private SplObjectStorage $connections;

    /**
     * Attached servers, each mapped to the 'connection' listener registered on it.
     */
    private SplObjectStorage $servers;

    /**
     * @param CommandResolverInterface|null $commands Optional first resolver to register
     */
    public function __construct(?CommandResolverInterface $commands = null)
    {
        if ($commands !== null) {
            $this->resolvers[] = $commands;
        }

        $this->connections = new SplObjectStorage();
        $this->servers = new SplObjectStorage();
    }

    /**
     * Get the command registry
     *
     * Always returns null; the method only remains so existing callers keep working.
     *
     * @deprecated Use getCommandNames() instead
     * @return null
     */
    public function getRegistry(): ?CommandRegistry
    {
        return null;
    }

    /**
     * Add a CommandResolver
     *
     * Resolvers are consulted in the order they were added.
     *
     * @param CommandResolverInterface $resolver
     * @return void
     */
    public function addResolver(CommandResolverInterface $resolver): void
    {
        $this->resolvers[] = $resolver;
    }

    /**
     * Remove a CommandResolver
     *
     * Every registration of the given instance is dropped; removing a resolver
     * that was never added is a no-op.
     *
     * @param CommandResolverInterface $resolver
     * @return void
     */
    public function removeResolver(CommandResolverInterface $resolver): void
    {
        // array_values() keeps the resolver list densely indexed after filtering.
        $this->resolvers = array_values(array_filter(
            $this->resolvers,
            static fn (CommandResolverInterface $candidate): bool => $candidate !== $resolver,
        ));
    }

    /**
     * Get the names of defined commands
     *
     * @return array Sorted, de-duplicated list of command names from all resolvers
     */
    public function getCommandNames(): array
    {
        $commands = [];
        foreach ($this->resolvers as $resolver) {
            $commands = array_merge($commands, $resolver->getCommandNames());
        }

        // De-duplicate first and sort last: array_unique() preserves keys, so
        // sorting afterwards is what re-indexes the result into a proper list.
        $commands = array_unique($commands);
        sort($commands);

        return $commands;
    }

    /**
     * Find a command by searching through the resolvers
     *
     * The first resolver that yields a command wins.
     *
     * @param string $command
     * @return Command|null
     */
    public function findCommand(string $command): ?Command
    {
        foreach ($this->resolvers as $resolver) {
            $found = $resolver->findCommand($command);
            if ($found) {
                return $found;
            }
        }

        return null;
    }

    /**
     * Add a server listener
     *
     * Adding a server that is already attached is a no-op, so the connection
     * handler is never registered twice on the same server.
     *
     * @param ServerInterface $server
     * @return void
     */
    public function addServer(ServerInterface $server): void
    {
        if ($this->servers->contains($server)) {
            return;
        }

        // Every evaluation of `$this->onServerConnection(...)` creates a new
        // Closure object, so the instance handed to on() is stored alongside
        // the server to be able to pass the very same one to removeListener().
        $listener = $this->onServerConnection(...);
        $this->servers->attach($server, $listener);
        $server->on('connection', $listener);
    }

    /**
     * Remove a server listener
     *
     * The server is closed in all cases; the connection listener is only
     * removed if the server was attached through addServer().
     *
     * @param ServerInterface $server
     * @return void
     */
    public function removeServer(ServerInterface $server): void
    {
        $server->close();

        if (!$this->servers->contains($server)) {
            return;
        }

        $server->removeListener('connection', $this->servers[$server]);
        $this->servers->detach($server);
    }

    /**
     * Close and shutdown the servers
     *
     * @return void
     */
    public function close(): void
    {
        // Iterate over a snapshot: removeServer() detaches from the storage,
        // and detaching while iterating it can cause entries to be skipped.
        foreach (iterator_to_array($this->servers, false) as $server) {
            $this->removeServer($server);
        }
    }

    /**
     * Handle a new client connection from one of the attached servers
     *
     * Tracks the connection, dispatches every message parsed from incoming
     * data to onClientMessage(), and forgets the connection once it closes.
     * Data that fails to parse ends the connection with an error payload.
     *
     * @param DuplexStreamInterface $client
     * @return void
     */
    private function onServerConnection(DuplexStreamInterface $client): void
    {
        $this->connections->attach($client);

        // NOTE(review): each chunk is parsed on its own; this presumably relies
        // on chunks holding only whole messages - confirm against Message::fromStringMulti().
        $client->on('data', function ($data) use ($client) {
            try {
                foreach (Message::fromStringMulti($data) as $message) {
                    $this->onClientMessage($client, $message);
                }
            } catch (MessageException $e) {
                $client->end('{"msg":"error","data":{"error":"Bad message format"}}');
            }
        });

        $client->on('close', function () use ($client) {
            $this->connections->detach($client);
        });
    }

    /**
     * Dispatch a single message received from a client
     *
     * Execute requests are run and answered with a result or error message;
     * result and registry messages are accepted but not handled yet; any
     * other type ends the connection with an error payload.
     *
     * @param DuplexStreamInterface $client
     * @param Message $message
     * @return void
     */
    private function onClientMessage(DuplexStreamInterface $client, Message $message): void
    {
        switch ($message->getType()) {
            case Message::MSGTYPE_EXECUTE:
                // Client call to execute command
                $data = $message->getData();

                // The payload comes straight off the wire: reject anything that
                // would not fit Context's (string, array) arguments instead of
                // letting a TypeError escape from the stream's data handler.
                if (
                    !is_array($data)
                    || !is_string($data['command'] ?? null)
                    || !is_array($data['context'] ?? [])
                ) {
                    $client->write($message->asError('Malformed execute request')->toJson() . "\n");
                    break;
                }

                $context = new Context($data['command'], $data['context'] ?? []);
                $this->executeContext($context)->then(
                    function ($result) use ($message, $client) {
                        $client->write($message->asResult($result)->toJson() . "\n");
                    },
                    function (\Throwable $error) use ($message, $client) {
                        $client->write($message->asError($error->getMessage())->toJson() . "\n");
                    }
                );
                break;

            case Message::MSGTYPE_RESULT:
                // Result from execution on client
                // TODO implement me
                break;

            case Message::MSGTYPE_REGISTRY:
                // command registry actions
                // TODO implement me
                break;

            default:
                $client->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
        }
    }

    /**
     * {@inheritDoc}
     */
    public function execute(string $command, array $context): PromiseInterface
    {
        return $this->executeContext(new Context($command, $context));
    }

    /**
     * Resolve the command named by the context and invoke it
     *
     * @param Context $context
     * @return PromiseInterface Whatever the command's call() returns, or a promise
     *                          whose resolver throws a RuntimeException when no
     *                          resolver knows the command
     */
    private function executeContext(Context $context): PromiseInterface
    {
        $command = $this->findCommand($context->getCommandName());

        if (!$command) {
            // The failure is raised inside the promise's resolver rather than
            // thrown synchronously from this method.
            return new Promise(function (callable $resolve) use ($context) {
                throw new \RuntimeException("Unable to resolve command: " . $context->getCommandName());
            });
        }

        return $command->call($context);
    }

    /**
     * {@inheritDoc}
     */
    public function notify(string $event, array $data): void
    {
        // Local listeners first ...
        $this->emit(self::EVENT_NOTIFY, [$event, $data]);

        // ... then every connected client; the message is encoded only once.
        $json = (new Message(Message::MSGTYPE_NOTIFY, [
            'event' => $event,
            'data' => $data,
        ]))->toJson();

        foreach ($this->connections as $client) {
            $client->write($json . "\n");
        }
    }
}