From 39869d605c7262d14b12930f5431ea005b2b64bd Mon Sep 17 00:00:00 2001 From: Christopher Vagnetoft Date: Sun, 10 Mar 2024 20:22:28 +0100 Subject: [PATCH] Extracted middleware and handlers --- src/Broker/SubscriptionList.php | 10 +- src/Http/Middleware/ApiHandler.php | 110 +++++ src/Http/Middleware/MercureHandler.php | 191 +++++++++ src/Http/Middleware/NotFoundHandler.php | 27 ++ src/Http/Middleware/ResponseMiddleware.php | 80 ++++ src/Http/Middleware/SecurityMiddleware.php | 68 ++++ src/Http/Middleware/WebSocketHandler.php | 71 ++++ src/Http/Server.php | 451 ++------------------- 8 files changed, 592 insertions(+), 416 deletions(-) create mode 100644 src/Http/Middleware/ApiHandler.php create mode 100644 src/Http/Middleware/MercureHandler.php create mode 100644 src/Http/Middleware/NotFoundHandler.php create mode 100644 src/Http/Middleware/ResponseMiddleware.php create mode 100644 src/Http/Middleware/SecurityMiddleware.php create mode 100644 src/Http/Middleware/WebSocketHandler.php diff --git a/src/Broker/SubscriptionList.php b/src/Broker/SubscriptionList.php index c9409bb..a1ba88a 100644 --- a/src/Broker/SubscriptionList.php +++ b/src/Broker/SubscriptionList.php @@ -2,10 +2,13 @@ namespace NoccyLabs\Mercureact\Broker; +use ArrayIterator; use Countable; +use IteratorAggregate; use SplObjectStorage; +use Traversable; -class SubscriptionList implements Countable +class SubscriptionList implements Countable, IteratorAggregate { private array $subscriptions = []; @@ -16,5 +19,10 @@ class SubscriptionList implements Countable { return count($this->subscriptions); } + + public function getIterator(): Traversable + { + return new ArrayIterator($this->subscriptions); + } } diff --git a/src/Http/Middleware/ApiHandler.php b/src/Http/Middleware/ApiHandler.php new file mode 100644 index 0000000..adff4b3 --- /dev/null +++ b/src/Http/Middleware/ApiHandler.php @@ -0,0 +1,110 @@ +getUri()->getPath(); + + if ($path === "/index.html") { + $resolve(Response::html(self::$indexPage)); + } + + switch (true) { + case preg_match('<^/.well-known/mercure/subscriptions(/.+?)$>', $path, $m): + $query = explode("/", trim($m[1]??null, "/")); + $topic = array_shift($query); + $subscription = array_shift($query); + $resolve($this->apiGetSubscriptions($topic, $subscription)); + return; + + case preg_match('<^/.well-known/mercureact/status$>', $path): + $resolve([ + 'server' => 'Mercureact/1.0', + 'topics' => $this->topicManager->getTopicCount(), + 'subscriptions' => $this->topicManager->getSubscriberCount(), + 'memoryPeak' => memory_get_peak_usage(true), + 'memoryUsage' => memory_get_usage(true) + ]); + return; + + case preg_match('<^/.well-known/mercureact/status$>', $path): + $resolve([ 'version' => '1.0' ]); + return; + } + + $resolve($next($request)); + } + ); + } + + /** + * + * + * @return ResponseInterface + */ + private function apiGetSubscriptions(string|null $topic, string|null $subscription): ResponseInterface + { + // TODO implement once we can enumerate topics and subscriptions + + // mock data + $lastEventId = "urn:uuid:5e94c686-2c0b-4f9b-958c-92ccc3bbb4eb"; + $data = [ + "@context" => "https://mercure.rocks/", + "id" => "/.well-known/mercure/subscriptions", + "type" => "Subscriptions", + "lastEventID" => $lastEventId, + "subscriptions" => [] + ]; + + return Response::json($data) + ->withHeader('Content-Type', 'application/ld+json') + ->withHeader('ETag', $lastEventId); + } + +} + + +ApiHandler::$indexPage = << + + + + + + + +ENDHTML; \ No newline at end of file diff --git a/src/Http/Middleware/MercureHandler.php b/src/Http/Middleware/MercureHandler.php new file mode 100644 index 0000000..ad0e89b --- /dev/null +++ b/src/Http/Middleware/MercureHandler.php @@ -0,0 +1,191 @@ +loop = $loop ?? Loop::get(); + } + + /** + * Mecure handler middleware + * + * @param ServerRequestInterface $request + * @param callable $next + * @return PromiseInterface + */ + public function __invoke(ServerRequestInterface $request, callable $next): PromiseInterface + { + return new Promise( + function (callable $resolve, callable $reject) use ($next, $request) { + if ($request->getUri()->getPath() == "/.well-known/mercure") { + if ($request->getMethod() == 'POST') { + $resolve($this->handleMercurePublish($request)); + return; + } + $resolve($this->handleMercureClient($request)); + } else { + $resolve($next($request)); + } + } + ); + } + + /** + * + * + * @param ServerRequestInterface $request + * @return ResponseInterface + */ + private function handleMercureClient(ServerRequestInterface $request): ResponseInterface + { + $tok = $request->getAttribute('authorization'); + if ($tok instanceof JWTToken) { + $claims = $tok->claims->getAll(); + if (isset($claims['mercure']['subscribe'])) { + $subscribeClaims = $claims['mercure']['subscribe']; + // TODO check topic against subscribeClaims + } + } + + $responseStream = new ThroughStream(); + + $response = new Response( + body: $responseStream + ); + + $this->eventClients->attach($responseStream, $request); + $responseStream->on('close', function () use ($responseStream) { + $this->eventClients->detach($responseStream);; + }); + + return $response + ->withHeader("Cache-Control", "no-store") + ->withHeader("Content-Type", "text/event-stream"); + } + + /** + * + * + * @param ServerRequestInterface $request + * @return ResponseInterface + */ + private function handleMercurePublish(ServerRequestInterface $request): ResponseInterface + { + if ($request->getHeaderLine('content-type') !== 'application/x-www-form-urlencoded') { + throw new \Exception("Invalid request"); + } + + // Parse out the urlencoded body. Pretty sure there is a better way to do this? + $body = explode("&", (string)$request->getBody()); + $data = []; + foreach ($body as $param) { + if (!str_contains($param, "=")) + throw new RequestException("Invalid request data", RequestException::ERR_INVALID_REQUEST_DATA); + [ $name, $value ] = array_map('urldecode', explode("=", $param, 2)); + if (in_array($name, [ 'topic' ])) { + if (!isset($data[$name])) + $data[$name] = []; + $data[$name][] = $value; + } else { + $data[$name] = $value; + } + } + + // Grab the JWT token from the requests authorization attribute + $tok = $request->getAttribute('authorization'); + if ($tok instanceof JWTToken) { + $claims = $tok->claims->getAll(); + if (isset($claims['mercure']['publish'])) { + $publishClaims = $claims['mercure']['publish']; + // TODO check topic against publishClaims + if (!$this->checkTopicClaims($data['topic']??[], $publishClaims)) { + throw new SecurityException("Insufficient permissions for publish", SecurityException::ERR_NO_PERMISSION); + } + } + } else { + // FIXME reject if access denied + } + + // Put an id in there if none already + // TODO add a configurable for this + if (!isset($data['id'])) { + $data['id'] = (string)Uuid::v7(); + } + + // Attempt to create the message + $message = Message::fromData($data); + + $this->loop->futureTick(function () use ($message) { + $this->publishMercureMessage($message); + }); + + return Response::plaintext("urn:uuid:".$message->id."\n"); + } + + private function checkTopicClaims(string|array $topic, array $claims): bool + { + foreach ((array)$topic as $match) { + foreach ($claims as $claim) { + if ($claim === "*") return true; + if ($claim === $match) return true; + // TODO implement full matching + } + } + return false; + } + + /** + * + * + * @param Message $message + * @return void + */ + private function publishMercureMessage(Message $message): void + { + // foreach ($this->webSocketClients as $webSocket) { + // $webSocket->write(json_encode([ + // 'type' => $message->type, + // 'topic' => $message->topic, + // 'data' => (@json_decode($message->data))??$message->data + // ])); + // } + + $sseMessage = ""; + if ($message->type) { + $sseMessage .= "event: ".$message->type."\n"; + } + $sseMessage .= "data: ".$message->data."\n\n"; + + foreach ($this->eventClients as $client) { + $client->write($sseMessage); + } + } + +} \ No newline at end of file diff --git a/src/Http/Middleware/NotFoundHandler.php b/src/Http/Middleware/NotFoundHandler.php new file mode 100644 index 0000000..1a706fd --- /dev/null +++ b/src/Http/Middleware/NotFoundHandler.php @@ -0,0 +1,27 @@ +withStatus(404)); + } + ); + } + +} \ No newline at end of file diff --git a/src/Http/Middleware/ResponseMiddleware.php b/src/Http/Middleware/ResponseMiddleware.php new file mode 100644 index 0000000..d4d3b0c --- /dev/null +++ b/src/Http/Middleware/ResponseMiddleware.php @@ -0,0 +1,80 @@ +then( + function ($response) { + if ($response instanceof ResponseInterface) { + return $response; + } + if (is_array($response)) { + return Response::json($response); + } + if (is_string($response)) { + return Response::plaintext($response); + } + return Response::plaintext((string)$response); + }, + function (Throwable $t) { + if ($t instanceof SecurityException) { + return Response::plaintext("Access Denied")->withStatus(Response::STATUS_UNAUTHORIZED); + } + return Response::plaintext("500: Internal Server Error (".$t->getMessage().")\n")->withStatus(500); + } + )->then( + function ($response) use ($request) { + assert("\$response instanceof ResponseInterface"); + $host = ($request->getServerParams()['SERVER_ADDR']??""); + //. ":" . ($request->getServerParams()['SERVER_PORT']??"80"); + fprintf(STDOUT, "%s %3d %s %s %d\n", + $request->getServerParams()['REMOTE_ADDR'], + $response->getStatusCode(), + $request->getMethod(), + $request->getUri()->getPath(), + strlen($response->getBody()) + ); + return $response + ->withAddedHeader('Link', '; rel="mercure"') + ->withAddedHeader('Link', '; rel="mercure+ws"') + ->withHeader('Access-Control-Allow-Origin', '*') + ->withHeader('Content-Security-Policy', "default-src * 'self' http: 'unsafe-eval' 'unsafe-inline'; connect-src * 'self'") + ->withHeader('Cache-Control', 'must-revalidate') + ->withHeader('Server', 'Mercureact/0.1.0'); + } + ); + } +} \ No newline at end of file diff --git a/src/Http/Middleware/SecurityMiddleware.php b/src/Http/Middleware/SecurityMiddleware.php new file mode 100644 index 0000000..88737d7 --- /dev/null +++ b/src/Http/Middleware/SecurityMiddleware.php @@ -0,0 +1,68 @@ +checkAuthorization($request); + + $resolve($next($request)); + } + ); + } + + /** + * + * + * @param ServerRequestInterface $request + * @return ServerRequestInterface + */ + private function checkAuthorization(ServerRequestInterface $request): ServerRequestInterface + { + $authorization = $request->getHeaderLine('authorization'); + if (str_starts_with(strtolower($authorization), "bearer ")) { + $jwt = substr($authorization, strpos($authorization, " ")+1); + $key = new JWTPlaintextKey($this->config->getJwtSecret()); + $tok = new JWTToken($key, $jwt); + if (!$tok->isValid()) { + throw new SecurityException(message:"Invalid token", code:SecurityException::ERR_ACCESS_DENIED); + } + $mercureClaims = $tok->claims->get('mercure'); + return $request + ->withAttribute('authorization', $tok); + } else { + return $request + ->withAttribute('authorization', null); + + } + } + +} \ No newline at end of file diff --git a/src/Http/Middleware/WebSocketHandler.php b/src/Http/Middleware/WebSocketHandler.php new file mode 100644 index 0000000..e9d0053 --- /dev/null +++ b/src/Http/Middleware/WebSocketHandler.php @@ -0,0 +1,71 @@ +loop = $loop ?? Loop::get(); + + $this->webSocket = new WebSocketMiddleware(); + $this->webSocket->on(WebSocketMiddleware::EVENT_CONNECTION, $this->onWebSocketConnection(...)); + } + + /** + * + * + * @param ServerRequestInterface $request + * @param callable $next + * @return PromiseInterface + */ + public function __invoke(ServerRequestInterface $request, callable $next): PromiseInterface + { + return new Promise( + function (callable $resolve, callable $reject) use ($next, $request) { + if ($request->getUri()->getPath() == "/.well-known/mercure") + $resolve(call_user_func($this->webSocket, $request, $next)); + else + $resolve($next($request)); + } + ); + } + + /** + * + * + */ + private function onWebSocketConnection(WebSocketConnection $connection) + { + $this->webSocketClients->attach($connection); + + $connection->on('close', function () use ($connection) { + $this->webSocketClients->detach($connection); + }); + + $request = $connection->getServerRequest(); + $topic = $request->getQueryParams()['topic'][0]??''; + $connection->setGroup($topic); + } + +} \ No newline at end of file diff --git a/src/Http/Server.php b/src/Http/Server.php index b598a00..027a2f8 100644 --- a/src/Http/Server.php +++ b/src/Http/Server.php @@ -2,28 +2,19 @@ namespace NoccyLabs\Mercureact\Http; -use NoccyLabs\Mercureact\Broker\Message; use NoccyLabs\Mercureact\Broker\TopicManager; use NoccyLabs\Mercureact\Configuration; -use NoccyLabs\Mercureact\Http\Exeption\RequestException; -use NoccyLabs\Mercureact\Http\Exeption\SecurityException; -use NoccyLabs\React\WebSocket\WebSocketConnection; -use NoccyLabs\React\WebSocket\WebSocketMiddleware; -use NoccyLabs\SimpleJWT\JWTToken; -use NoccyLabs\SimpleJWT\Key\JWTPlaintextKey; -use Psr\Http\Message\ResponseInterface; -use Psr\Http\Message\ServerRequestInterface; +use NoccyLabs\Mercureact\Http\Middleware\ApiHandler; +use NoccyLabs\Mercureact\Http\Middleware\MercureHandler; +use NoccyLabs\Mercureact\Http\Middleware\NotFoundHandler; +use NoccyLabs\Mercureact\Http\Middleware\ResponseMiddleware; +use NoccyLabs\Mercureact\Http\Middleware\SecurityMiddleware; +use NoccyLabs\Mercureact\Http\Middleware\WebSocketHandler; use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use React\Http\HttpServer; -use React\Http\Message\Response; -use React\Promise\Promise; -use React\Promise\PromiseInterface; use React\Socket\ServerInterface; -use React\Stream\ThroughStream; use SplObjectStorage; -use Symfony\Component\Uid\Uuid; -use Throwable; class Server { @@ -33,15 +24,19 @@ class Server private HttpServer $server; - private WebSocketMiddleware $webSocket; - private SplObjectStorage $webSocketClients; private SplObjectStorage $eventClients; private TopicManager $topicManager; - public static string $indexPage; + private ResponseMiddleware $responseMiddleware; + private SecurityMiddleware $securityMiddleware; + private WebSocketHandler $webSocketHandler; + private MercureHandler $mercureHandler; + private ApiHandler $apiRequestHandler; + private NotFoundHandler $notFoundHandler; + /** * @@ -54,14 +49,11 @@ class Server $this->config = $config; $this->topicManager = new TopicManager(); - - $this->server = $this->createHttpServer($options); - $this->webSocket = new WebSocketMiddleware(); - $this->eventClients = new SplObjectStorage(); $this->webSocketClients = new SplObjectStorage(); - $this->webSocket->on(WebSocketMiddleware::EVENT_CONNECTION, $this->onWebSocketConnection(...)); + + $this->server = $this->createHttpServer($options); } /** @@ -80,401 +72,30 @@ class Server */ private function createHttpServer(array $options): HttpServer { - // TODO break out the middleware to facilitate testing return new HttpServer( - $this->rejectionWrappingMiddleware(...), - $this->checkRequestSecurityMiddleware(...), - $this->handleWebSocketRequest(...), - $this->handleMercureRequest(...), - $this->handleApiRequest(...), - $this->handleNotFound(...) + $this->responseMiddleware = new ResponseMiddleware( + config: $this->config + ), + $this->securityMiddleware = new SecurityMiddleware( + config: $this->config + ), + $this->webSocketHandler = new WebSocketHandler( + config: $this->config, + webSocketClients: $this->webSocketClients, + topicManager: $this->topicManager + ), + $this->mercureHandler = new MercureHandler( + config: $this->config, + eventClients: $this->eventClients, + topicManager: $this->topicManager + ), + $this->apiRequestHandler = new ApiHandler( + config: $this->config, + topicManager: $this->topicManager + ), + $this->notFoundHandler = new NotFoundHandler() ); } - /** - * Resolves unhandled requests with a 404 error - * - * @param ServerRequestInterface $request - * @return PromiseInterface - */ - private function handleNotFound(ServerRequestInterface $request): PromiseInterface - { - return new Promise( - function ($resolve) { - $resolve(Response::plaintext("Not found")->withStatus(404)); - } - ); - } - /** - * Wraps rejections into error messages, and also does some sanity checks on the returned - * data, making sure it is a response. - * - * @param ServerRequestInterface $request - * @param callable $next - * @return PromiseInterface - */ - private function rejectionWrappingMiddleware(ServerRequestInterface $request, callable $next): PromiseInterface - { - $promise = new Promise( - function (callable $resolve) use ($request, $next) { - $resolve($next($request)); - } - ); - return $promise->then( - function ($response) { - if ($response instanceof ResponseInterface) { - return $response; - } - if (is_array($response)) { - return Response::json($response); - } - if (is_string($response)) { - return Response::plaintext($response); - } - return Response::plaintext((string)$response); - }, - function (Throwable $t) { - if ($t instanceof SecurityException) { - return Response::plaintext("Access Denied")->withStatus(Response::STATUS_UNAUTHORIZED); - } - return Response::plaintext("500: Internal Server Error (".$t->getMessage().")\n")->withStatus(500); - } - )->then( - function ($response) use ($request) { - assert("\$response instanceof ResponseInterface"); - $host = ($request->getServerParams()['SERVER_ADDR']??""); - //. ":" . ($request->getServerParams()['SERVER_PORT']??"80"); - fprintf(STDOUT, "%s %3d %s %s %d\n", - $request->getServerParams()['REMOTE_ADDR'], - $response->getStatusCode(), - $request->getMethod(), - $request->getUri()->getPath(), - strlen($response->getBody()) - ); - return $response - ->withAddedHeader('Link', '; rel="mercure"') - ->withAddedHeader('Link', '; rel="mercure+ws"') - ->withHeader('Access-Control-Allow-Origin', '*') - ->withHeader('Content-Security-Policy', "default-src * 'self' http: 'unsafe-eval' 'unsafe-inline'; connect-src * 'self'") - ->withHeader('Cache-Control', 'must-revalidate') - ->withHeader('Server', 'Mercureact/0.1.0'); - } - ); - } - - /** - * - * - * @param ServerRequestInterface $request - * @param callable $next - * @return PromiseInterface - */ - private function checkRequestSecurityMiddleware(ServerRequestInterface $request, callable $next): PromiseInterface - { - return new Promise( - function (callable $resolve, callable $reject) use ($request, $next) { - // Check JWT in authorization header or authorization query param - $request = $this->checkAuthorization($request); - - $resolve($next($request)); - } - ); - } - - /** - * - * - * @param ServerRequestInterface $request - * @return ServerRequestInterface - */ - private function checkAuthorization(ServerRequestInterface $request): ServerRequestInterface - { - $authorization = $request->getHeaderLine('authorization'); - if (str_starts_with(strtolower($authorization), "bearer ")) { - $jwt = substr($authorization, strpos($authorization, " ")+1); - $key = new JWTPlaintextKey($this->config->getJwtSecret()); - $tok = new JWTToken($key, $jwt); - if (!$tok->isValid()) { - throw new SecurityException(message:"Invalid token", code:SecurityException::ERR_ACCESS_DENIED); - } - $mercureClaims = $tok->claims->get('mercure'); - return $request - ->withAttribute('authorization', $tok); - } else { - return $request - ->withAttribute('authorization', null); - - } - } - - /** - * - * - * @param ServerRequestInterface $request - * @param callable $next - * @return PromiseInterface - */ - private function handleMercureRequest(ServerRequestInterface $request, callable $next): PromiseInterface - { - return new Promise( - function (callable $resolve, callable $reject) use ($next, $request) { - if ($request->getUri()->getPath() == "/.well-known/mercure") { - if ($request->getMethod() == 'POST') { - $resolve($this->handleMercurePublish($request)); - return; - } - $resolve($this->handleMercureClient($request)); - } else { - $resolve($next($request)); - } - } - ); - } - - /** - * - * - * @param ServerRequestInterface $request - * @return ResponseInterface - */ - private function handleMercureClient(ServerRequestInterface $request): ResponseInterface - { - $tok = $request->getAttribute('authorization'); - if ($tok instanceof JWTToken) { - $claims = $tok->claims->getAll(); - if (isset($claims['mercure']['subscribe'])) { - $subscribeClaims = $claims['mercure']['subscribe']; - // TODO check topic against subscribeClaims - } - } - - $responseStream = new ThroughStream(); - - $response = new Response( - body: $responseStream - ); - - $this->eventClients->attach($responseStream, $request); - $responseStream->on('close', function () use ($responseStream) { - $this->eventClients->detach($responseStream);; - }); - - return $response - ->withHeader("Cache-Control", "no-store") - ->withHeader("Content-Type", "text/event-stream"); - } - - /** - * - * - * @param ServerRequestInterface $request - * @return ResponseInterface - */ - private function handleMercurePublish(ServerRequestInterface $request): ResponseInterface - { - if ($request->getHeaderLine('content-type') !== 'application/x-www-form-urlencoded') { - throw new \Exception("Invalid request"); - } - - // Parse out the urlencoded body. Pretty sure there is a better way to do this? - $body = explode("&", (string)$request->getBody()); - $data = []; - foreach ($body as $param) { - if (!str_contains($param, "=")) - throw new RequestException("Invalid request data", RequestException::ERR_INVALID_REQUEST_DATA); - [ $name, $value ] = array_map('urldecode', explode("=", $param, 2)); - if (in_array($name, [ 'topic' ])) { - if (!isset($data[$name])) - $data[$name] = []; - $data[$name][] = $value; - } else { - $data[$name] = $value; - } - } - - // Grab the JWT token from the requests authorization attribute - $tok = $request->getAttribute('authorization'); - if ($tok instanceof JWTToken) { - $claims = $tok->claims->getAll(); - if (isset($claims['mercure']['publish'])) { - $publishClaims = $claims['mercure']['publish']; - // TODO check topic against publishClaims - if (!$this->checkTopicClaims($data['topic']??[], $publishClaims)) { - throw new SecurityException("Insufficient permissions for publish", SecurityException::ERR_NO_PERMISSION); - } - } - } - - // Put an id in there if none already - // TODO add a configurable for this - if (!isset($data['id'])) { - $data['id'] = (string)Uuid::v7(); - } - - $message = Message::fromData($data); - - $this->loop->futureTick(function () use ($message) { - $this->publishMercureMessage($message); - }); - - return Response::plaintext("urn:uuid:".$message->id."\n"); - } - - private function checkTopicClaims(string|array $topic, array $claims): bool - { - foreach ((array)$topic as $match) { - foreach ($claims as $claim) { - if ($claim === "*") return true; - if ($claim === $match) return true; - // TODO implement full matching - } - } - return false; - } - - /** - * - * - * @param Message $message - * @return void - */ - private function publishMercureMessage(Message $message): void - { - foreach ($this->webSocketClients as $webSocket) { - $webSocket->write(json_encode([ - 'type' => $message->type, - 'topic' => $message->topic, - 'data' => (@json_decode($message->data))??$message->data - ])); - } - - $sseMessage = ""; - if ($message->type) { - $sseMessage .= "event: ".$message->type."\n"; - } - $sseMessage .= "data: ".$message->data."\n\n"; - - foreach ($this->eventClients as $client) { - $client->write($sseMessage); - } - } - - /** - * - * - * @param ServerRequestInterface $request - * @param callable $next - * @return PromiseInterface - */ - private function handleWebSocketRequest(ServerRequestInterface $request, callable $next): PromiseInterface - { - return new Promise( - function (callable $resolve, callable $reject) use ($next, $request) { - if ($request->getUri()->getPath() == "/.well-known/mercure") - $resolve(call_user_func($this->webSocket, $request, $next)); - else - $resolve($next($request)); - } - ); - } - - /** - * - * - */ - private function onWebSocketConnection(WebSocketConnection $connection) - { - $this->webSocketClients->attach($connection); - - $connection->on('close', function () use ($connection) { - $this->webSocketClients->detach($connection); - }); - - $request = $connection->getServerRequest(); - $topic = $request->getQueryParams()['topic'][0]??''; - $connection->setGroup($topic); - } - - /** - * - * - * @param ServerRequestInterface $request - * @param callable $next - * @return PromiseInterface - */ - private function handleApiRequest(ServerRequestInterface $request, callable $next): PromiseInterface - { - return new Promise( - function (callable $resolve, callable $reject) use ($next, $request) { - - $path = $request->getUri()->getPath(); - - if ($path === "/index.html") { - $resolve(Response::html(self::$indexPage)); - } - - switch (true) { - case preg_match('<^/.well-known/mercure/subscriptions(/.+?)$>', $path, $m): - $query = explode("/", trim($m[1]??null, "/")); - $topic = array_shift($query); - $subscription = array_shift($query); - $resolve($this->apiGetSubscriptions($topic, $subscription)); - return; - - case preg_match('<^/.well-known/mercureact/status$>', $path): - $resolve([ - 'server' => 'Mercureact/1.0', - 'topics' => $this->topicManager->getTopicCount(), - 'subscriptions' => $this->topicManager->getSubscriberCount(), - 'memoryPeak' => memory_get_peak_usage(true), - 'memoryUsage' => memory_get_usage(true) - ]); - return; - - case preg_match('<^/.well-known/mercureact/status$>', $path): - $resolve([ 'version' => '1.0' ]); - return; - } - - $resolve($next($request)); - } - ); - } - - /** - * - * - * @return ResponseInterface - */ - private function apiGetSubscriptions(string|null $topic, string|null $subscription): ResponseInterface - { - $lastEventId = "urn:uuid:5e94c686-2c0b-4f9b-958c-92ccc3bbb4eb"; - - $data = [ - "@context" => "https://mercure.rocks/", - "id" => "/.well-known/mercure/subscriptions", - "type" => "Subscriptions", - "lastEventID" => $lastEventId, - "subscriptions" => [] - ]; - - return Response::json($data) - ->withHeader('Content-Type', 'application/ld+json') - ->withHeader('ETag', $lastEventId); - } } - -Server::$indexPage = << - - - - - - - -ENDHTML; \ No newline at end of file