loop = $loop??Loop::get();
        $this->config = $config;
        $this->logger = $logger ?? new Logger("main");
        if ($logger instanceof Logger) {
            $topicLogger = $logger->withName("broker");
        } else {
            $topicLogger = $this->logger;
        }
        $this->topicManager = new TopicManager($topicLogger);
        $this->loop->addPeriodicTimer(30, function () {
            $this->topicManager->garbageCollect();
        });
        $this->webSocketClients = new SplObjectStorage();
    }

    /**
     * Attach the HTTP server to the given socket and log the listening address.
     *
     * The HTTP server is created on the first call and reused on later calls.
     * In the logged address the "tcp://" and "tls://" schemes are rewritten to
     * "http://" and "https://" respectively.
     *
     * @param ServerInterface $socket Socket the HTTP server should listen on.
     *
     * @return void
     */
    public function listen(ServerInterface $socket): void
    {
        // `??=` also copes with a typed property that has not been initialised
        // yet, where a plain read (`!$this->server`) would throw.
        $this->server ??= $this->createHttpServer();
        $this->server->listen($socket);

        // NOTE(review): ReactPHP documents ServerInterface::getAddress() as
        // returning null when the address is unknown (e.g. closed socket).
        // Passing null to strtr() is deprecated and is a TypeError under
        // strict_types, so fall back to an empty string.
        $address = $socket->getAddress() ?? '';

        $this->logger->info(sprintf(
            "Listening on %s",
            strtr($address, [
                "tcp://" => "http://",
                "tls://" => "https://",
            ])
        ));
    }

    /**
     * Build the HTTP server with its middleware / handler stack.
     *
     * Order of the stack:
     *  1. LimitConcurrentRequestsMiddleware ("server.limits.max_concurrent", default 100)
     *  2. RequestBodyBufferMiddleware ("server.limits.max_request_body", default 102400)
     *  3. ResponseMiddleware
     *  4. SecurityMiddleware
     *  5. WebSocketHandler (only when the config enables WebSockets)
     *  6. MercureHandler
     *  7. ApiHandler (only when "server.enable_api" is truthy, default true)
     *  8. NotFoundHandler
     *
     * @return HttpServer
     */
    private function createHttpServer(): HttpServer
    {
        $maxConcurrent = $this->config->get("server.limits.max_concurrent", 100);
        $maxRequestBody = $this->config->get("server.limits.max_request_body", 102400);

        // Same guard as the constructor: withName() is only called when the
        // logger is a Logger instance; any other logger is passed through as-is.
        $httpLogger = $this->logger instanceof Logger
            ? $this->logger->withName("http")
            : $this->logger;

        $stack = [
            new LimitConcurrentRequestsMiddleware($maxConcurrent),
            new RequestBodyBufferMiddleware($maxRequestBody),
            new ResponseMiddleware(
                config: $this->config,
                logger: $httpLogger,
            ),
            new SecurityMiddleware(
                config: $this->config
            ),
        ];

        if ($this->config->getEnableWebSockets()) {
            $stack[] = new WebSocketHandler(
                config: $this->config,
                webSocketClients: $this->webSocketClients,
                topicManager: $this->topicManager
            );
            $this->logger->warning("The WebSocket support is incomplete and insecure, but enabling it as requested.");
        }

        $stack[] = new MercureHandler(
            config: $this->config,
            topicManager: $this->topicManager
        );

        if ($this->config->get('server.enable_api', true)) {
            $stack[] = new ApiHandler(
                config: $this->config,
                topicManager: $this->topicManager
            );
            $this->logger->info("Enabling the API middleware");
        }

        // Must stay last: it is the final entry handed to HttpServer.
        $stack[] = new NotFoundHandler();

        return new HttpServer(...$stack);
    }
}