loop = $loop??Loop::get(); $this->config = $config; $this->logger = $logger ?? new NullLogger(); if ($logger instanceof Logger) { $topicLogger = $logger->withName("broker"); } else { $topicLogger = $this->logger; } $this->topicManager = new TopicManager($topicLogger); $this->loop->addPeriodicTimer(30, function () { $this->topicManager->garbageCollect(); }); $this->webSocketClients = new SplObjectStorage(); $this->server = $this->createHttpServer(); }

    /**
     * Attaches the HTTP server to the given socket and logs the address it
     * is listening on, rewritten with an http:// or https:// scheme.
     *
     * @param ServerInterface $socket Socket the HTTP server should accept connections from.
     *
     * @return void
     */
    public function listen(ServerInterface $socket): void
    {
        $this->server->listen($socket);

        // A SecureServer socket is reported as https://, anything else as http://.
        $scheme = $socket instanceof SecureServer ? "https://" : "http://";

        // NOTE(review): react/socket documents getAddress() as nullable (e.g. for a
        // closed socket) -- confirm; an empty string keeps str_replace() from
        // receiving null.
        $address = $socket->getAddress() ?? "";

        // NOTE(review): React's SecureServer reports its address with a tls://
        // prefix rather than tcp:// -- confirm against the installed react/socket
        // version. Both prefixes are rewritten so secure sockets are logged as
        // https://.
        $this->logger->info(sprintf(
            "Listening on %s",
            str_replace(["tcp://", "tls://"], $scheme, $address)
        ));
    }

    /**
     * Builds the HTTP server from the ordered list of request handlers.
     *
     * Order of the stack: response middleware, security middleware, the
     * WebSocket handler (only when the configuration enables WebSockets),
     * the Mercure handler, the API handler and finally the not-found handler.
     * Every handler is also stored on the corresponding property of this object.
     *
     * @return HttpServer
     */
    private function createHttpServer(): HttpServer
    {
        // The constructor falls back to a NullLogger when no logger is supplied,
        // and withName() is only called on Logger instances there. Apply the same
        // guard here so a non-Logger logger does not cause an undefined-method
        // error while the server is being built.
        $httpLogger = $this->logger instanceof Logger
            ? $this->logger->withName("http")
            : $this->logger;

        $stack = [
            $this->responseMiddleware = new ResponseMiddleware(
                config: $this->config,
                logger: $httpLogger,
            ),
            $this->securityMiddleware = new SecurityMiddleware(
                config: $this->config
            ),
        ];

        if ($this->config->getEnableWebSockets()) {
            $stack[] = $this->webSocketHandler = new WebSocketHandler(
                config: $this->config,
                webSocketClients: $this->webSocketClients,
                topicManager: $this->topicManager
            );

            $this->logger->warning("The WebSocket support is incomplete and insecure, but enabling it as requested.");
        }

        $stack = [
            ...$stack,
            $this->mercureHandler = new MercureHandler(
                config: $this->config,
                topicManager: $this->topicManager
            ),
            $this->apiRequestHandler = new ApiHandler(
                config: $this->config,
                topicManager: $this->topicManager
            ),
            $this->notFoundHandler = new NotFoundHandler(),
        ];

        return new HttpServer(...$stack);
    }
}