Implemented subscription logic

This commit is contained in:
Chris 2024-03-10 23:06:00 +01:00
parent 39869d605c
commit 87d47f8ce8
11 changed files with 125 additions and 43 deletions

View File

@ -44,6 +44,16 @@ class Message
return time() - $this->created; return time() - $this->created;
} }
public function toString(): string
{
$msg = [];
if ($this->type) $msg[] = "event: ".$this->type;
if ($this->retry) $msg[] = "retry: ".$this->retry;
if ($this->id) $msg[] = "id: ".$this->id;
if ($this->data) $msg[] = "data: ".$this->data;
return join("\n", $msg)."\n\n";
}
public static function fromData(array $data): Message public static function fromData(array $data): Message
{ {
return new Message( return new Message(

View File

@ -0,0 +1,24 @@
<?php
namespace NoccyLabs\Mercureact\Broker;
use React\Stream\WritableStreamInterface;
class SseSubscriber implements SubscriberInterface
{
public function __construct(
private WritableStreamInterface $stream
)
{
}
public function deliver(Message $message): void
{
$this->stream->write($message->toString());
}
public function isAuthorized(string $topics): bool
{
return true;
}
}

View File

@ -0,0 +1,10 @@
<?php
namespace NoccyLabs\Mercureact\Broker;
interface SubscriberInterface
{
public function deliver(Message $message): void;
public function isAuthorized(string $topics): bool;
}

View File

@ -1,28 +0,0 @@
<?php
namespace NoccyLabs\Mercureact\Broker;
use ArrayIterator;
use Countable;
use IteratorAggregate;
use SplObjectStorage;
use Traversable;
class SubscriptionList implements Countable, IteratorAggregate
{
private array $subscriptions = [];
public function count(): int
{
return count($this->subscriptions);
}
public function getIterator(): Traversable
{
return new ArrayIterator($this->subscriptions);
}
}

View File

@ -10,17 +10,19 @@ class Topic
/** @var string Topic name */ /** @var string Topic name */
private string $topic; private string $topic;
/** @var array<string,Message> */ /** @var array<string,Message> */
private array $messages = []; private array $messages = [];
/** @var int Creation unixtime */ /** @var int Creation unixtime */
private int $created; private int $created;
private SubscriptionList $subscribers; private SplObjectStorage $subscribers;
public function __construct(string $topic) public function __construct(string $topic)
{ {
$this->topic = $topic; $this->topic = $topic;
$this->subscribers = new SubscriptionList(); $this->subscribers = new SplObjectStorage();
$this->created = time(); $this->created = time();
} }
@ -31,8 +33,10 @@ class Topic
foreach ($this->subscribers as $subscriber) { foreach ($this->subscribers as $subscriber) {
if ($message->private === true) { if ($message->private === true) {
// TODO check subscriber access // TODO check subscriber access
$subscriber->deliver($message);
} else { } else {
// TODO deliver to subscriber // TODO deliver to subscriber
$subscriber->deliver($message);
} }
} }
} }
@ -52,6 +56,16 @@ class Topic
return count($this->subscribers); return count($this->subscribers);
} }
public function addSubscriber(SubscriberInterface $subscriber)
{
$this->subscribers->attach($subscriber);
}
public function removeSubscriber(SubscriberInterface $subscriber)
{
$this->subscribers->detach($subscriber);
}
/** /**
* Garbage collect histry * Garbage collect histry
* *

View File

@ -9,6 +9,12 @@ class TopicManager
/** @var array<string,Topic> */ /** @var array<string,Topic> */
private array $topics = []; private array $topics = [];
private SplObjectStorage $subscribers;
public function __construct()
{
$this->subscribers = new SplObjectStorage();
}
public function getTopic(string $topic): Topic public function getTopic(string $topic): Topic
{ {
@ -25,6 +31,26 @@ class TopicManager
} }
} }
public function subscribe(SubscriberInterface $subscriber, array $topics): void
{
foreach ($topics as $topic) {
$this->getTopic($topic)->addSubscriber($subscriber);
}
}
public function unsubscribe(SubscriberInterface $subscriber, ?array $topics=null): void
{
if (!$topics) {
foreach ($this->topics as $topic) {
$topic->removeSubscriber($subscriber);
}
return;
}
foreach ($topics as $topic) {
$this->getTopic($topic)->removeSubscriber($subscriber);
}
}
public function getTopicCount(): int public function getTopicCount(): int
{ {
return count($this->topics); return count($this->topics);
@ -32,7 +58,7 @@ class TopicManager
public function getSubscriberCount(): int public function getSubscriberCount(): int
{ {
return array_sum(array_map(fn($t) => $t->getSubscriberCount(), $this->topics)); return count($this->subscribers);
} }
public function garbageCollect(): void public function garbageCollect(): void

View File

@ -1,6 +1,6 @@
<?php <?php
namespace NoccyLabs\Mercureact\Http\Exeption; namespace NoccyLabs\Mercureact\Http\Exception;
use Exception; use Exception;

View File

@ -1,6 +1,6 @@
<?php <?php
namespace NoccyLabs\Mercureact\Http\Exeption; namespace NoccyLabs\Mercureact\Http\Exception;
use Exception; use Exception;

View File

@ -102,7 +102,7 @@ ApiHandler::$indexPage = <<<ENDHTML
</head> </head>
<body> <body>
<script type="text/javascript"> <script type="text/javascript">
const events = new EventSource("http://127.0.0.1:9000/.well-known/mercure"); const events = new EventSource("http://127.0.0.1:9000/.well-known/mercure?topic=https://example.com/books/1");
events.onmessage = msg => console.log(msg); events.onmessage = msg => console.log(msg);
</script> </script>
</body> </body>

View File

@ -3,10 +3,11 @@
namespace NoccyLabs\Mercureact\Http\Middleware; namespace NoccyLabs\Mercureact\Http\Middleware;
use NoccyLabs\Mercureact\Broker\Message; use NoccyLabs\Mercureact\Broker\Message;
use NoccyLabs\Mercureact\Broker\SseSubscriber;
use NoccyLabs\Mercureact\Broker\TopicManager; use NoccyLabs\Mercureact\Broker\TopicManager;
use NoccyLabs\Mercureact\Configuration; use NoccyLabs\Mercureact\Configuration;
use NoccyLabs\Mercureact\Http\Exeption\RequestException; use NoccyLabs\Mercureact\Http\Exception\RequestException;
use NoccyLabs\Mercureact\Http\Exeption\SecurityException; use NoccyLabs\Mercureact\Http\Exception\SecurityException;
use NoccyLabs\SimpleJWT\JWTToken; use NoccyLabs\SimpleJWT\JWTToken;
use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Message\ServerRequestInterface;
@ -80,9 +81,26 @@ class MercureHandler
body: $responseStream body: $responseStream
); );
$this->eventClients->attach($responseStream, $request); $subscriber = new SseSubscriber($responseStream, $request);
$responseStream->on('close', function () use ($responseStream) {
$this->eventClients->detach($responseStream);; $query = $request->getUri()->getQuery();
$query = explode("&", $query);
$topics = [];
foreach ($query as $param) {
if (!str_contains($param, "="))
throw new RequestException(
message: "Invalid request data",
code: RequestException::ERR_INVALID_REQUEST_DATA
);
[ $name, $value ] = array_map('urldecode', explode("=", $param, 2));
if ($name === 'topic') $topics[] = $value;
// TODO check claims for access
}
$this->topicManager->subscribe($subscriber, $topics);
//$this->eventClients->attach($responseStream, $request);
$responseStream->on('close', function () use ($subscriber, $topics) {
$this->topicManager->unsubscribe($subscriber, $topics);
}); });
return $response return $response
@ -149,8 +167,9 @@ class MercureHandler
return Response::plaintext("urn:uuid:".$message->id."\n"); return Response::plaintext("urn:uuid:".$message->id."\n");
} }
private function checkTopicClaims(string|array $topic, array $claims): bool private function checkTopicClaims(string|array $topic, array $claims, bool $all=false): bool
{ {
// TODO match all topics if $all, reject on mismatch
foreach ((array)$topic as $match) { foreach ((array)$topic as $match) {
foreach ($claims as $claim) { foreach ($claims as $claim) {
if ($claim === "*") return true; if ($claim === "*") return true;
@ -169,6 +188,8 @@ class MercureHandler
*/ */
private function publishMercureMessage(Message $message): void private function publishMercureMessage(Message $message): void
{ {
$this->topicManager->publish($message);
// foreach ($this->webSocketClients as $webSocket) { // foreach ($this->webSocketClients as $webSocket) {
// $webSocket->write(json_encode([ // $webSocket->write(json_encode([
// 'type' => $message->type, // 'type' => $message->type,

View File

@ -3,9 +3,10 @@
namespace NoccyLabs\Mercureact\Http\Middleware; namespace NoccyLabs\Mercureact\Http\Middleware;
use NoccyLabs\Mercureact\Configuration; use NoccyLabs\Mercureact\Configuration;
use NoccyLabs\Mercureact\Http\Exeption\SecurityException; use NoccyLabs\Mercureact\Http\Exception\SecurityException;
use NoccyLabs\SimpleJWT\JWTToken; use NoccyLabs\SimpleJWT\JWTToken;
use NoccyLabs\SimpleJWT\Key\JWTPlaintextKey; use NoccyLabs\SimpleJWT\Key\JWTPlaintextKey;
use NoccyLabs\SimpleJWT\Validator\JWTValidator;
use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Message\ServerRequestInterface;
use React\Promise\Promise; use React\Promise\Promise;
use React\Promise\PromiseInterface; use React\Promise\PromiseInterface;
@ -40,7 +41,9 @@ class SecurityMiddleware
} }
/** /**
* Check authorization and return a new request with added attributes:
* *
* 'authorization' => JWTToken
* *
* @param ServerRequestInterface $request * @param ServerRequestInterface $request
* @return ServerRequestInterface * @return ServerRequestInterface
@ -53,9 +56,11 @@ class SecurityMiddleware
$key = new JWTPlaintextKey($this->config->getJwtSecret()); $key = new JWTPlaintextKey($this->config->getJwtSecret());
$tok = new JWTToken($key, $jwt); $tok = new JWTToken($key, $jwt);
if (!$tok->isValid()) { if (!$tok->isValid()) {
throw new SecurityException(message:"Invalid token", code:SecurityException::ERR_ACCESS_DENIED); throw new SecurityException(
message: "Invalid token",
code: SecurityException::ERR_ACCESS_DENIED
);
} }
$mercureClaims = $tok->claims->get('mercure');
return $request return $request
->withAttribute('authorization', $tok); ->withAttribute('authorization', $tok);
} else { } else {