loop = $loop ?? Loop::get(); $this->seenIdHistorySize = $this->config->getDuplicateIdHistorySize(); } /** * Mecure handler middleware * * @param ServerRequestInterface $request * @param callable $next * @return PromiseInterface */ public function __invoke(ServerRequestInterface $request, callable $next): PromiseInterface { return new Promise( function (callable $resolve, callable $reject) use ($next, $request) { if ($request->getUri()->getPath() == "/.well-known/mercure") { if ($request->getMethod() == 'POST') { $resolve($this->handleMercurePublish($request)); return; } $resolve($this->handleMercureClient($request)); } else { $resolve($next($request)); } } ); } /** * * * @param ServerRequestInterface $request * @return ResponseInterface */ private function handleMercureClient(ServerRequestInterface $request): ResponseInterface { $responseStream = new ThroughStream(); $response = new Response( body: $responseStream ); $subscriber = new SseSubscriber($responseStream, $request); $query = $request->getUri()->getQuery(); $query = explode("&", $query); $topics = []; foreach ($query as $param) { if (!str_contains($param, "=")) throw new RequestException( message: "Invalid request data", code: RequestException::ERR_INVALID_REQUEST_DATA ); [ $name, $value ] = array_map('urldecode', explode("=", $param, 2)); if ($name === 'topic' || $name === 'topic[]') $topics[] = $value; } // Grab the JWT token from the requests authorization attribute if ($request->getAttribute('authorized')) { $claims = $request->getAttribute('mercure.subscribe'); if (!$this->checkTopicClaims($topics, $claims)) { throw new SecurityException( message: "Insufficient permissions for subscribe", code: SecurityException::ERR_NO_PERMISSION ); } } else { // Disallow if we don't allow anonymous subscribers. Note that anonymous // subscribers will not receive updates marked as private. if (!$this->config->getAllowAnonymousSubscribe()) { throw new SecurityException( message: "Anonymous access disallowed", code: SecurityException::ERR_ACCESS_DENIED ); } } $this->topicManager->subscribe($subscriber, $topics); $responseStream->on('close', function () use ($subscriber, $topics) { $this->topicManager->unsubscribe($subscriber, $topics); }); return $response ->withHeader("Cache-Control", "no-store") ->withHeader("Content-Type", "text/event-stream"); } /** * * * @param ServerRequestInterface $request * @return ResponseInterface */ private function handleMercurePublish(ServerRequestInterface $request): ResponseInterface { if ($request->getHeaderLine('content-type') !== 'application/x-www-form-urlencoded') { throw new \Exception("Invalid request"); } // Parse out the urlencoded body. Pretty sure there is a better way to do this? $body = explode("&", (string)$request->getBody()); $data = [ 'topic' => [] ]; foreach ($body as $param) { if (!str_contains($param, "=")) throw new RequestException( message: "Invalid request data", code: RequestException::ERR_INVALID_REQUEST_DATA ); [ $name, $value ] = array_map('urldecode', explode("=", $param, 2)); if ($name === 'topic' || $name === 'topic[]') { $data['topic'][] = $value; } else { $data[$name] = $value; } } // Grab the JWT token from the requests authorization attribute if ($request->getAttribute('authorized')) { $claims = $request->getAttribute('mercure.publish'); // check topic against publishClaims if (!$this->checkTopicClaims($data['topic']??[], $claims)) { throw new SecurityException( message: "Insufficient permissions for publish", code: SecurityException::ERR_NO_PERMISSION ); } } else { // reject if access denied throw new SecurityException( message: "Access denied", code: SecurityException::ERR_ACCESS_DENIED ); } // Explicitly check for duplicate message IDs. if ($this->config->getRejectDuplicateMessages() && !empty($data['id'])) { if (in_array($data['id'], $this->seenMessageIds)) { return Response::plaintext("Duplicate message id")->withStatus(Response::STATUS_BAD_REQUEST); } array_push($this->seenMessageIds, $data['id']); while (count($this->seenMessageIds) > $this->seenIdHistorySize) array_shift($this->seenMessageIds); } // Put an id in there if none already if (empty($data['id']) || $this->config->getOverwriteMessageIds()) { $data['id'] = "urn:uuid:".(string)Uuid::v7(); } // Attempt to create the message $message = Message::fromData($data); $this->loop->futureTick(function () use ($message) { $this->topicManager->publish($message); }); return Response::plaintext($message->id."\n"); } private function checkTopicClaims(string|array $topic, array $claims): bool { $matched = 0; foreach ((array)$topic as $match) { foreach ($claims as $claim) { if (($claim === "*") || ($claim === $match)) { $matched++; break; } // TODO make sure that UriTemplate parsing works if ((new UriTemplate())->extract($claim, $match, true)) { $matched++; break; } } } return ($matched == count($topic)); } }