From 8cbd12ee61de65628c85fed3565f48dd4cd99834 Mon Sep 17 00:00:00 2001 From: Christopher Vagnetoft Date: Mon, 11 Mar 2024 14:39:58 +0100 Subject: [PATCH] Subscription enumeration, tweaks --- README.md | 2 ++ src/Broker/SseSubscriber.php | 9 ++++++ src/Broker/Topic.php | 17 ++++++++++++ src/Broker/TopicManager.php | 21 ++++++++++++++ src/Http/Middleware/ApiHandler.php | 44 ++++++++++++++++-------------- src/Http/Server.php | 6 ++-- 6 files changed, 76 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 381690f..67b7dba 100644 --- a/README.md +++ b/README.md @@ -53,3 +53,5 @@ $ ./mercureact.phar -c mercureact.conf * [x] Break out HTTP middleware into classes * [ ] HTTP middleware unittests * [ ] Replay missed events based on event id +* [ ] Figure out how to determine last event IDs +* [ ] Metrics endpoint diff --git a/src/Broker/SseSubscriber.php b/src/Broker/SseSubscriber.php index d778e78..68745df 100644 --- a/src/Broker/SseSubscriber.php +++ b/src/Broker/SseSubscriber.php @@ -5,14 +5,18 @@ namespace NoccyLabs\Mercureact\Broker; use NoccyLabs\SimpleJWT\JWTToken; use Psr\Http\Message\ServerRequestInterface; use React\Stream\WritableStreamInterface; +use Symfony\Component\Uid\Uuid; class SseSubscriber implements SubscriberInterface { + private string $id; + public function __construct( private WritableStreamInterface $stream, private ServerRequestInterface $request, ) { + $this->id = (string)Uuid::v7(); } public function deliver(Message $message): void @@ -24,4 +28,9 @@ class SseSubscriber implements SubscriberInterface { return $this->request->getAttribute('authorization') instanceof JWTToken; } + + public function getId(): string + { + return "urn:uuid:".$this->id; + } } \ No newline at end of file diff --git a/src/Broker/Topic.php b/src/Broker/Topic.php index 20564da..4d57e06 100644 --- a/src/Broker/Topic.php +++ b/src/Broker/Topic.php @@ -14,6 +14,8 @@ class Topic /** @var array */ private array $messages = []; + private ?string $lastEventId = null; + /** @var int Creation unixtime */ private int $created; @@ -69,6 +71,21 @@ class Topic $this->subscribers->detach($subscriber); } + public function getSubscribers(): array + { + return iterator_to_array($this->subscribers); + } + + public function getTopic(): string + { + return $this->topic; + } + + public function getLastEventId(): ?string + { + return $this->lastEventId; + } + /** * Garbage collect histry * diff --git a/src/Broker/TopicManager.php b/src/Broker/TopicManager.php index 88b2cfd..924ea3e 100644 --- a/src/Broker/TopicManager.php +++ b/src/Broker/TopicManager.php @@ -53,6 +53,27 @@ class TopicManager } } + public function getSubscriptions(): array + { + $all = []; + + foreach ($this->topics as $topic) { + $subs = $topic->getSubscribers(); + foreach ($subs as $sub) { + $all[] = [ + 'id' => './well-known/mercure/subsciptions/'.urlencode($topic->getTopic())."/".urlencode($sub->getId()), + 'type' => "Subscription", + 'topic' => $topic->getTopic(), + 'subscriber' => $sub->getId(), + 'active' => true, + 'payload' => null, + ]; + } + } + + return $all; + } + public function getTopicCount(): int { return count($this->topics); diff --git a/src/Http/Middleware/ApiHandler.php b/src/Http/Middleware/ApiHandler.php index 587da4d..7bbc1fc 100644 --- a/src/Http/Middleware/ApiHandler.php +++ b/src/Http/Middleware/ApiHandler.php @@ -13,8 +13,6 @@ use React\Promise\PromiseInterface; class ApiHandler { - public static string $indexPage; - public function __construct( private Configuration $config, private TopicManager $topicManager @@ -36,12 +34,32 @@ class ApiHandler $path = $request->getUri()->getPath(); + // FIXME remove this when done debugging if ($path === "/index.html") { - $resolve(Response::html(self::$indexPage)); + $resolve(Response::html( + << + + + + +
+ + + ENDHTML + )); } switch (true) { - case preg_match('<^/.well-known/mercure/subscriptions(/.+?)$>', $path, $m): + case preg_match('<^/.well-known/mercure/subscriptions(/.+?){0,1}$>', $path, $m): $query = explode("/", trim($m[1]??null, "/")); $topic = array_shift($query); $subscription = array_shift($query); @@ -77,14 +95,14 @@ class ApiHandler { // TODO implement once we can enumerate topics and subscriptions - // mock data + $subscriptions = $this->topicManager->getSubscriptions(); $lastEventId = "urn:uuid:5e94c686-2c0b-4f9b-958c-92ccc3bbb4eb"; $data = [ "@context" => "https://mercure.rocks/", "id" => "/.well-known/mercure/subscriptions", "type" => "Subscriptions", "lastEventID" => $lastEventId, - "subscriptions" => [] + "subscriptions" => $subscriptions ]; return Response::json($data) @@ -94,17 +112,3 @@ class ApiHandler } - -ApiHandler::$indexPage = << - - - - - - - -ENDHTML; \ No newline at end of file diff --git a/src/Http/Server.php b/src/Http/Server.php index 8f0a15c..464e6ff 100644 --- a/src/Http/Server.php +++ b/src/Http/Server.php @@ -30,8 +30,8 @@ class Server private SplObjectStorage $webSocketClients; private TopicManager $topicManager; - - private Logger|LoggerInterface $logger; + + private Logger $logger; private ResponseMiddleware $responseMiddleware; private SecurityMiddleware $securityMiddleware; @@ -73,7 +73,7 @@ class Server $this->server->listen($socket); } - private function createLogger(): LoggerInterface + private function createLogger(): Logger { $handlers = [ new StreamHandler(STDOUT)