loop = $loop ?? Loop::get(); } /** * Mecure handler middleware * * @param ServerRequestInterface $request * @param callable $next * @return PromiseInterface */ public function __invoke(ServerRequestInterface $request, callable $next): PromiseInterface { return new Promise( function (callable $resolve, callable $reject) use ($next, $request) { if ($request->getUri()->getPath() == "/.well-known/mercure") { if ($request->getMethod() == 'POST') { $resolve($this->handleMercurePublish($request)); return; } $resolve($this->handleMercureClient($request)); } else { $resolve($next($request)); } } ); } /** * * * @param ServerRequestInterface $request * @return ResponseInterface */ private function handleMercureClient(ServerRequestInterface $request): ResponseInterface { $tok = $request->getAttribute('authorization'); if ($tok instanceof JWTToken) { $claims = $tok->claims->getAll(); if (isset($claims['mercure']['subscribe'])) { $subscribeClaims = $claims['mercure']['subscribe']; // TODO check topic against subscribeClaims } } $responseStream = new ThroughStream(); $response = new Response( body: $responseStream ); $this->eventClients->attach($responseStream, $request); $responseStream->on('close', function () use ($responseStream) { $this->eventClients->detach($responseStream);; }); return $response ->withHeader("Cache-Control", "no-store") ->withHeader("Content-Type", "text/event-stream"); } /** * * * @param ServerRequestInterface $request * @return ResponseInterface */ private function handleMercurePublish(ServerRequestInterface $request): ResponseInterface { if ($request->getHeaderLine('content-type') !== 'application/x-www-form-urlencoded') { throw new \Exception("Invalid request"); } // Parse out the urlencoded body. Pretty sure there is a better way to do this? $body = explode("&", (string)$request->getBody()); $data = []; foreach ($body as $param) { if (!str_contains($param, "=")) throw new RequestException("Invalid request data", RequestException::ERR_INVALID_REQUEST_DATA); [ $name, $value ] = array_map('urldecode', explode("=", $param, 2)); if (in_array($name, [ 'topic' ])) { if (!isset($data[$name])) $data[$name] = []; $data[$name][] = $value; } else { $data[$name] = $value; } } // Grab the JWT token from the requests authorization attribute $tok = $request->getAttribute('authorization'); if ($tok instanceof JWTToken) { $claims = $tok->claims->getAll(); if (isset($claims['mercure']['publish'])) { $publishClaims = $claims['mercure']['publish']; // TODO check topic against publishClaims if (!$this->checkTopicClaims($data['topic']??[], $publishClaims)) { throw new SecurityException("Insufficient permissions for publish", SecurityException::ERR_NO_PERMISSION); } } } else { // FIXME reject if access denied } // Put an id in there if none already // TODO add a configurable for this if (!isset($data['id'])) { $data['id'] = (string)Uuid::v7(); } // Attempt to create the message $message = Message::fromData($data); $this->loop->futureTick(function () use ($message) { $this->publishMercureMessage($message); }); return Response::plaintext("urn:uuid:".$message->id."\n"); } private function checkTopicClaims(string|array $topic, array $claims): bool { foreach ((array)$topic as $match) { foreach ($claims as $claim) { if ($claim === "*") return true; if ($claim === $match) return true; // TODO implement full matching } } return false; } /** * * * @param Message $message * @return void */ private function publishMercureMessage(Message $message): void { // foreach ($this->webSocketClients as $webSocket) { // $webSocket->write(json_encode([ // 'type' => $message->type, // 'topic' => $message->topic, // 'data' => (@json_decode($message->data))??$message->data // ])); // } $sseMessage = ""; if ($message->type) { $sseMessage .= "event: ".$message->type."\n"; } $sseMessage .= "data: ".$message->data."\n\n"; foreach ($this->eventClients as $client) { $client->write($sseMessage); } } }