From 87d47f8ce80937a303a13e316d69986164f30d1f Mon Sep 17 00:00:00 2001 From: Christopher Vagnetoft Date: Sun, 10 Mar 2024 23:06:00 +0100 Subject: [PATCH] Implemented subscription logic --- src/Broker/Message.php | 10 +++++++ src/Broker/SseSubscriber.php | 24 ++++++++++++++++ src/Broker/SubscriberInterface.php | 10 +++++++ src/Broker/SubscriptionList.php | 28 ------------------ src/Broker/Topic.php | 18 ++++++++++-- src/Broker/TopicManager.php | 28 +++++++++++++++++- src/Http/Exception/RequestException.php | 2 +- src/Http/Exception/SecurityException.php | 2 +- src/Http/Middleware/ApiHandler.php | 2 +- src/Http/Middleware/MercureHandler.php | 33 ++++++++++++++++++---- src/Http/Middleware/SecurityMiddleware.php | 11 ++++++-- 11 files changed, 125 insertions(+), 43 deletions(-) create mode 100644 src/Broker/SseSubscriber.php create mode 100644 src/Broker/SubscriberInterface.php delete mode 100644 src/Broker/SubscriptionList.php diff --git a/src/Broker/Message.php b/src/Broker/Message.php index a8a7194..a77fee7 100644 --- a/src/Broker/Message.php +++ b/src/Broker/Message.php @@ -44,6 +44,16 @@ class Message return time() - $this->created; } + public function toString(): string + { + $msg = []; + if ($this->type) $msg[] = "event: ".$this->type; + if ($this->retry) $msg[] = "retry: ".$this->retry; + if ($this->id) $msg[] = "id: ".$this->id; + if ($this->data) $msg[] = "data: ".$this->data; + return join("\n", $msg)."\n\n"; + } + public static function fromData(array $data): Message { return new Message( diff --git a/src/Broker/SseSubscriber.php b/src/Broker/SseSubscriber.php new file mode 100644 index 0000000..51a8fca --- /dev/null +++ b/src/Broker/SseSubscriber.php @@ -0,0 +1,24 @@ +stream->write($message->toString()); + } + + public function isAuthorized(string $topics): bool + { + return true; + } +} \ No newline at end of file diff --git a/src/Broker/SubscriberInterface.php b/src/Broker/SubscriberInterface.php new file mode 100644 index 0000000..54c2322 --- /dev/null +++ b/src/Broker/SubscriberInterface.php @@ -0,0 +1,10 @@ +subscriptions); - } - - public function getIterator(): Traversable - { - return new ArrayIterator($this->subscriptions); - } -} - diff --git a/src/Broker/Topic.php b/src/Broker/Topic.php index 345a0da..2ac2148 100644 --- a/src/Broker/Topic.php +++ b/src/Broker/Topic.php @@ -10,17 +10,19 @@ class Topic /** @var string Topic name */ private string $topic; + /** @var array */ private array $messages = []; + /** @var int Creation unixtime */ private int $created; - private SubscriptionList $subscribers; + private SplObjectStorage $subscribers; public function __construct(string $topic) { $this->topic = $topic; - $this->subscribers = new SubscriptionList(); + $this->subscribers = new SplObjectStorage(); $this->created = time(); } @@ -31,8 +33,10 @@ class Topic foreach ($this->subscribers as $subscriber) { if ($message->private === true) { // TODO check subscriber access + $subscriber->deliver($message); } else { // TODO deliver to subscriber + $subscriber->deliver($message); } } } @@ -52,6 +56,16 @@ class Topic return count($this->subscribers); } + public function addSubscriber(SubscriberInterface $subscriber) + { + $this->subscribers->attach($subscriber); + } + + public function removeSubscriber(SubscriberInterface $subscriber) + { + $this->subscribers->detach($subscriber); + } + /** * Garbage collect histry * diff --git a/src/Broker/TopicManager.php b/src/Broker/TopicManager.php index b5e77ce..7cad74f 100644 --- a/src/Broker/TopicManager.php +++ b/src/Broker/TopicManager.php @@ -9,6 +9,12 @@ class TopicManager /** @var array */ private array $topics = []; + private SplObjectStorage $subscribers; + + public function __construct() + { + $this->subscribers = new SplObjectStorage(); + } public function getTopic(string $topic): Topic { @@ -25,6 +31,26 @@ class TopicManager } } + public function subscribe(SubscriberInterface $subscriber, array $topics): void + { + foreach ($topics as $topic) { + $this->getTopic($topic)->addSubscriber($subscriber); + } + } + + public function unsubscribe(SubscriberInterface $subscriber, ?array $topics=null): void + { + if (!$topics) { + foreach ($this->topics as $topic) { + $topic->removeSubscriber($subscriber); + } + return; + } + foreach ($topics as $topic) { + $this->getTopic($topic)->removeSubscriber($subscriber); + } + } + public function getTopicCount(): int { return count($this->topics); @@ -32,7 +58,7 @@ class TopicManager public function getSubscriberCount(): int { - return array_sum(array_map(fn($t) => $t->getSubscriberCount(), $this->topics)); + return count($this->subscribers); } public function garbageCollect(): void diff --git a/src/Http/Exception/RequestException.php b/src/Http/Exception/RequestException.php index 1174849..0037f06 100644 --- a/src/Http/Exception/RequestException.php +++ b/src/Http/Exception/RequestException.php @@ -1,6 +1,6 @@ diff --git a/src/Http/Middleware/MercureHandler.php b/src/Http/Middleware/MercureHandler.php index ad0e89b..6ad4fc6 100644 --- a/src/Http/Middleware/MercureHandler.php +++ b/src/Http/Middleware/MercureHandler.php @@ -3,10 +3,11 @@ namespace NoccyLabs\Mercureact\Http\Middleware; use NoccyLabs\Mercureact\Broker\Message; +use NoccyLabs\Mercureact\Broker\SseSubscriber; use NoccyLabs\Mercureact\Broker\TopicManager; use NoccyLabs\Mercureact\Configuration; -use NoccyLabs\Mercureact\Http\Exeption\RequestException; -use NoccyLabs\Mercureact\Http\Exeption\SecurityException; +use NoccyLabs\Mercureact\Http\Exception\RequestException; +use NoccyLabs\Mercureact\Http\Exception\SecurityException; use NoccyLabs\SimpleJWT\JWTToken; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestInterface; @@ -80,9 +81,26 @@ class MercureHandler body: $responseStream ); - $this->eventClients->attach($responseStream, $request); - $responseStream->on('close', function () use ($responseStream) { - $this->eventClients->detach($responseStream);; + $subscriber = new SseSubscriber($responseStream, $request); + + $query = $request->getUri()->getQuery(); + $query = explode("&", $query); + $topics = []; + foreach ($query as $param) { + if (!str_contains($param, "=")) + throw new RequestException( + message: "Invalid request data", + code: RequestException::ERR_INVALID_REQUEST_DATA + ); + [ $name, $value ] = array_map('urldecode', explode("=", $param, 2)); + if ($name === 'topic') $topics[] = $value; + // TODO check claims for access + } + + $this->topicManager->subscribe($subscriber, $topics); + //$this->eventClients->attach($responseStream, $request); + $responseStream->on('close', function () use ($subscriber, $topics) { + $this->topicManager->unsubscribe($subscriber, $topics); }); return $response @@ -149,8 +167,9 @@ class MercureHandler return Response::plaintext("urn:uuid:".$message->id."\n"); } - private function checkTopicClaims(string|array $topic, array $claims): bool + private function checkTopicClaims(string|array $topic, array $claims, bool $all=false): bool { + // TODO match all topics if $all, reject on mismatch foreach ((array)$topic as $match) { foreach ($claims as $claim) { if ($claim === "*") return true; @@ -169,6 +188,8 @@ class MercureHandler */ private function publishMercureMessage(Message $message): void { + $this->topicManager->publish($message); + // foreach ($this->webSocketClients as $webSocket) { // $webSocket->write(json_encode([ // 'type' => $message->type, diff --git a/src/Http/Middleware/SecurityMiddleware.php b/src/Http/Middleware/SecurityMiddleware.php index 88737d7..1fe1cb5 100644 --- a/src/Http/Middleware/SecurityMiddleware.php +++ b/src/Http/Middleware/SecurityMiddleware.php @@ -3,9 +3,10 @@ namespace NoccyLabs\Mercureact\Http\Middleware; use NoccyLabs\Mercureact\Configuration; -use NoccyLabs\Mercureact\Http\Exeption\SecurityException; +use NoccyLabs\Mercureact\Http\Exception\SecurityException; use NoccyLabs\SimpleJWT\JWTToken; use NoccyLabs\SimpleJWT\Key\JWTPlaintextKey; +use NoccyLabs\SimpleJWT\Validator\JWTValidator; use Psr\Http\Message\ServerRequestInterface; use React\Promise\Promise; use React\Promise\PromiseInterface; @@ -40,7 +41,9 @@ class SecurityMiddleware } /** + * Check authorization and return a new request with added attributes: * + * 'authorization' => JWTToken * * @param ServerRequestInterface $request * @return ServerRequestInterface @@ -53,9 +56,11 @@ class SecurityMiddleware $key = new JWTPlaintextKey($this->config->getJwtSecret()); $tok = new JWTToken($key, $jwt); if (!$tok->isValid()) { - throw new SecurityException(message:"Invalid token", code:SecurityException::ERR_ACCESS_DENIED); + throw new SecurityException( + message: "Invalid token", + code: SecurityException::ERR_ACCESS_DENIED + ); } - $mercureClaims = $tok->claims->get('mercure'); return $request ->withAttribute('authorization', $tok); } else {