loop = $loop??Loop::get(); $this->config = $config; $this->logger = $logger ?? new Logger("main"); if ($logger instanceof Logger) { $topicLogger = $logger->withName("broker"); } else { $topicLogger = $this->logger; } $this->topicManager = new TopicManager($topicLogger); $this->loop->addPeriodicTimer(30, function () { $this->topicManager->garbageCollect(); }); $this->webSocketClients = new SplObjectStorage(); $this->server = $this->createHttpServer(); } /** * * * @return void */ public function listen(ServerInterface $socket): void { $this->server->listen($socket); $this->logger->info(sprintf( "Listening on %s", strtr($socket->getAddress(), [ "tcp://"=>"http://", "tls://"=>"https://"]) )); } /** * * @return HttpServer */ private function createHttpServer(): HttpServer { $stack = []; $maxConcurrent = $this->config->get("server.limits.max_concurrent", 100); $maxRequestBody = $this->config->get("server.limits.max_request_body", 102400); $stack[] = new LimitConcurrentRequestsMiddleware($maxConcurrent); $stack[] = new RequestBodyBufferMiddleware($maxRequestBody); $stack = [ ...$stack, $this->responseMiddleware = new ResponseMiddleware( config: $this->config, logger: $this->logger->withName("http"), ), $this->securityMiddleware = new SecurityMiddleware( config: $this->config ), ]; if ($this->config->getEnableWebSockets()) { $stack = [ ...$stack, $this->webSocketHandler = new WebSocketHandler( config: $this->config, webSocketClients: $this->webSocketClients, topicManager: $this->topicManager ), ]; $this->logger->warning("The WebSocket support is incomplete and insecure, but enabling it as requested."); } $stack = [ ...$stack, $this->mercureHandler = new MercureHandler( config: $this->config, topicManager: $this->topicManager ), $this->apiRequestHandler = new ApiHandler( config: $this->config, topicManager: $this->topicManager ), $this->notFoundHandler = new NotFoundHandler() ]; return new HttpServer(...$stack); } }