loop = $loop ?? Loop::get();
    }

    /**
     * Mercure handler middleware.
     *
     * Requests whose path is "/.well-known/mercure" are handled here: POST requests
     * are treated as publish requests, every other method as a subscription.
     * Requests for any other path are handed to $next unchanged.
     *
     * @param ServerRequestInterface $request
     * @param callable $next
     * @return PromiseInterface
     */
    public function __invoke(ServerRequestInterface $request, callable $next): PromiseInterface
    {
        return new Promise(
            function (callable $resolve, callable $reject) use ($next, $request) {
                // Not a Mercure request: let the next middleware deal with it.
                if ($request->getUri()->getPath() !== "/.well-known/mercure") {
                    $resolve($next($request));
                    return;
                }

                if ($request->getMethod() === 'POST') {
                    $resolve($this->handleMercurePublish($request));
                    return;
                }

                $resolve($this->handleMercureClient($request));
            }
        );
    }

    /**
     * Handle a subscription request and return a streaming SSE response.
     *
     * The topics to subscribe to are read from the "topic" query parameters
     * (the parameter may be repeated). When the request carries a JWT with a
     * "mercure.subscribe" claim, every requested topic must be covered by that
     * claim. Requests without a JWT are only accepted when anonymous
     * subscriptions are enabled in the configuration.
     *
     * @param ServerRequestInterface $request
     * @return ResponseInterface
     * @throws RequestException if the query string is not well-formed
     * @throws SecurityException if the subscriber lacks the required permissions
     */
    private function handleMercureClient(ServerRequestInterface $request): ResponseInterface
    {
        // Collect every "topic" parameter; other parameters are ignored here.
        $topics = [];
        foreach ($this->parseUrlEncodedPairs($request->getUri()->getQuery()) as [$name, $value]) {
            if ($name === 'topic') {
                $topics[] = $value;
            }
        }

        // Grab the JWT token from the requests authorization attribute
        $tok = $request->getAttribute('authorization');
        if ($tok instanceof JWTToken) {
            $claims = $tok->claims->getAll();
            if (isset($claims['mercure']['subscribe'])) {
                if (!$this->checkTopicClaims($topics, $claims['mercure']['subscribe'])) {
                    throw new SecurityException(
                        message: "Insufficient permissions for subscribe",
                        code: SecurityException::ERR_NO_PERMISSION
                    );
                }
            }
        } elseif (!$this->config->getAllowAnonymousSubscribe()) {
            // Disallow if we don't allow anonymous subscribers. Note that anonymous
            // subscribers will not receive updates marked as private.
            throw new SecurityException(
                message: "Anonymous access disallowed",
                code: SecurityException::ERR_ACCESS_DENIED
            );
        }

        // Only allocate the stream and subscriber once the request has been
        // validated and authorized, so rejected requests leave nothing behind.
        $responseStream = new ThroughStream();
        $response = new Response(
            body: $responseStream
        );
        $subscriber = new SseSubscriber($responseStream, $request);

        $this->topicManager->subscribe($subscriber, $topics);

        // Drop the subscription again as soon as the response stream closes.
        $responseStream->on('close', function () use ($subscriber, $topics) {
            $this->topicManager->unsubscribe($subscriber, $topics);
        });

        return $response
            ->withHeader("Cache-Control", "no-store")
            ->withHeader("Content-Type", "text/event-stream");
    }

    /**
     * Handle a publish request.
     *
     * The body must be "application/x-www-form-urlencoded". The "topic" field
     * may be repeated and is collected into a list; for every other field the
     * last occurrence wins. A JWT is mandatory; when it carries a
     * "mercure.publish" claim, every topic must be covered by that claim. The
     * message is handed to the topic manager on a future tick of the event
     * loop, and the response body is the message id prefixed with "urn:uuid:".
     *
     * @param ServerRequestInterface $request
     * @return ResponseInterface
     * @throws \Exception if the content type is not form-urlencoded
     * @throws RequestException if the body is not well-formed
     * @throws SecurityException if the publisher is not authorized
     */
    private function handleMercurePublish(ServerRequestInterface $request): ResponseInterface
    {
        // Compare the media type only: the header may legitimately carry
        // parameters (e.g. "; charset=UTF-8") and media types are case-insensitive.
        $mediaType = strtolower(trim(explode(';', $request->getHeaderLine('content-type'), 2)[0]));
        if ($mediaType !== 'application/x-www-form-urlencoded') {
            throw new \Exception("Invalid request");
        }

        // Fields that may appear more than once and are collected into a list.
        $multiValued = [ 'topic' ];

        // parse_str() is not an option here because it would collapse repeated
        // "topic" fields into a single value.
        $data = [];
        foreach ($this->parseUrlEncodedPairs((string)$request->getBody()) as [$name, $value]) {
            if (in_array($name, $multiValued, true)) {
                $data[$name][] = $value;
            } else {
                $data[$name] = $value;
            }
        }

        // Grab the JWT token from the requests authorization attribute
        $tok = $request->getAttribute('authorization');
        if (!($tok instanceof JWTToken)) {
            // reject if access denied
            throw new SecurityException(
                message: "Access denied",
                code: SecurityException::ERR_ACCESS_DENIED
            );
        }

        // NOTE(review): a token that has no "mercure.publish" claim at all is
        // currently allowed to publish to any topic - confirm this is intended.
        $claims = $tok->claims->getAll();
        if (isset($claims['mercure']['publish'])) {
            // check topic against publishClaims
            if (!$this->checkTopicClaims($data['topic'] ?? [], $claims['mercure']['publish'])) {
                throw new SecurityException(
                    message: "Insufficient permissions for publish",
                    code: SecurityException::ERR_NO_PERMISSION
                );
            }
        }

        // Put an id in there if none already
        // TODO add a configurable for this
        if (!isset($data['id'])) {
            $data['id'] = (string)Uuid::v7();
        }

        // Attempt to create the message
        $message = Message::fromData($data);

        // Defer the actual publish so the response is not held up by delivery.
        $this->loop->futureTick(function () use ($message) {
            $this->publishMercureMessage($message);
        });

        return Response::plaintext("urn:uuid:".$message->id."\n");
    }

    /**
     * Split a urlencoded string ("a=1&b=2") into decoded [name, value] pairs.
     *
     * Pairs are returned in input order and repeated names are preserved.
     * Every "&"-separated segment must contain a "="; this also means that an
     * empty input string is rejected.
     *
     * @param string $encoded
     * @return array<int, array{0: string, 1: string}>
     * @throws RequestException if a segment has no "="
     */
    private function parseUrlEncodedPairs(string $encoded): array
    {
        $pairs = [];
        foreach (explode("&", $encoded) as $param) {
            if (!str_contains($param, "=")) {
                throw new RequestException(
                    message: "Invalid request data",
                    code: RequestException::ERR_INVALID_REQUEST_DATA
                );
            }
            // Limit of 2 keeps any further "=" as part of the value.
            $pairs[] = array_map('urldecode', explode("=", $param, 2));
        }
        return $pairs;
    }

    /**
     * Check that every given topic is covered by the list of claims.
     *
     * A topic is covered when the claims contain either the wildcard "*" or
     * the exact topic string. An empty topic list is considered covered.
     *
     * @param string|array $topic A single topic or a list of topics
     * @param array $claims Topic selectors granted by the token
     * @return bool True if all topics are covered
     */
    private function checkTopicClaims(string|array $topic, array $claims): bool
    {
        // Normalise once: count() on a bare string would throw a TypeError.
        $topics = (array)$topic;

        // TODO implement matching of URI Templates
        if (in_array("*", $claims, true)) {
            return true;
        }

        foreach ($topics as $candidate) {
            if (!in_array($candidate, $claims, true)) {
                return false;
            }
        }

        return true;
    }

    /**
     * Hand a message over to the topic manager for publishing.
     *
     * @param Message $message
     * @return void
     */
    private function publishMercureMessage(Message $message): void
    {
        $this->topicManager->publish($message);
    }
}