loop = $loop ?? Loop::get();
        $this->webSocket = new WebSocketMiddleware();
        $this->webSocket->on(WebSocketMiddleware::EVENT_CONNECTION, $this->onWebSocketConnection(...));
    }

    /**
     * Middleware entry point: hands requests for "/.well-known/mercure" to the
     * WebSocket middleware and passes every other request to the next handler.
     *
     * The dispatch runs inside a Promise resolver, so whatever the chosen
     * handler returns is used to resolve the returned promise.
     *
     * @param ServerRequestInterface $request Incoming HTTP request.
     * @param callable               $next    Next handler in the chain; called with $request.
     * @return PromiseInterface Promise resolved with the result of the selected handler.
     */
    public function __invoke(ServerRequestInterface $request, callable $next): PromiseInterface
    {
        return new Promise(
            function (callable $resolve) use ($next, $request): void {
                // Strict comparison: the path must match exactly, with no type juggling.
                if ($request->getUri()->getPath() === "/.well-known/mercure") {
                    $resolve(($this->webSocket)($request, $next));

                    return;
                }

                $resolve($next($request));
            }
        );
    }

    /**
     * Registers a newly opened WebSocket connection and assigns it to a group
     * named after the "topic" query parameter of its originating request.
     *
     * The connection is added to $this->webSocketClients and removed again
     * when it emits "close".
     *
     * @param WebSocketConnection $connection The connection that was just established.
     * @return void
     */
    private function onWebSocketConnection(WebSocketConnection $connection): void
    {
        $this->webSocketClients->attach($connection);
        $connection->on('close', function () use ($connection): void {
            $this->webSocketClients->detach($connection);
        });

        $topicParam = $connection->getServerRequest()->getQueryParams()['topic'] ?? '';

        // "?topic[]=foo" yields an array while "?topic=foo" yields a plain string.
        // Indexing the string form with [0] would return only its first character,
        // so the scalar case is handled explicitly.
        $topic = is_array($topicParam) ? ($topicParam[0] ?? '') : $topicParam;

        // Anything that is not a string (e.g. a nested array) falls back to the empty group.
        $connection->setGroup(is_string($topic) ? $topic : '');
    }
}