Subscription enumeration, tweaks

This commit is contained in:
Chris 2024-03-11 14:39:58 +01:00
parent 2747b59abc
commit 8cbd12ee61
6 changed files with 76 additions and 23 deletions

View File

@ -53,3 +53,5 @@ $ ./mercureact.phar -c mercureact.conf
* [x] Break out HTTP middleware into classes * [x] Break out HTTP middleware into classes
* [ ] HTTP middleware unittests * [ ] HTTP middleware unittests
* [ ] Replay missed events based on event id * [ ] Replay missed events based on event id
* [ ] Figure out how to determine last event IDs
* [ ] Metrics endpoint

View File

@ -5,14 +5,18 @@ namespace NoccyLabs\Mercureact\Broker;
use NoccyLabs\SimpleJWT\JWTToken; use NoccyLabs\SimpleJWT\JWTToken;
use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Message\ServerRequestInterface;
use React\Stream\WritableStreamInterface; use React\Stream\WritableStreamInterface;
use Symfony\Component\Uid\Uuid;
class SseSubscriber implements SubscriberInterface class SseSubscriber implements SubscriberInterface
{ {
private string $id;
public function __construct( public function __construct(
private WritableStreamInterface $stream, private WritableStreamInterface $stream,
private ServerRequestInterface $request, private ServerRequestInterface $request,
) )
{ {
$this->id = (string)Uuid::v7();
} }
public function deliver(Message $message): void public function deliver(Message $message): void
@ -24,4 +28,9 @@ class SseSubscriber implements SubscriberInterface
{ {
return $this->request->getAttribute('authorization') instanceof JWTToken; return $this->request->getAttribute('authorization') instanceof JWTToken;
} }
public function getId(): string
{
return "urn:uuid:".$this->id;
}
} }

View File

@ -14,6 +14,8 @@ class Topic
/** @var array<string,Message> */ /** @var array<string,Message> */
private array $messages = []; private array $messages = [];
private ?string $lastEventId = null;
/** @var int Creation unixtime */ /** @var int Creation unixtime */
private int $created; private int $created;
@ -69,6 +71,21 @@ class Topic
$this->subscribers->detach($subscriber); $this->subscribers->detach($subscriber);
} }
public function getSubscribers(): array
{
return iterator_to_array($this->subscribers);
}
public function getTopic(): string
{
return $this->topic;
}
public function getLastEventId(): ?string
{
return $this->lastEventId;
}
/** /**
* Garbage collect histry * Garbage collect histry
* *

View File

@ -53,6 +53,27 @@ class TopicManager
} }
} }
public function getSubscriptions(): array
{
$all = [];
foreach ($this->topics as $topic) {
$subs = $topic->getSubscribers();
foreach ($subs as $sub) {
$all[] = [
'id' => './well-known/mercure/subsciptions/'.urlencode($topic->getTopic())."/".urlencode($sub->getId()),
'type' => "Subscription",
'topic' => $topic->getTopic(),
'subscriber' => $sub->getId(),
'active' => true,
'payload' => null,
];
}
}
return $all;
}
public function getTopicCount(): int public function getTopicCount(): int
{ {
return count($this->topics); return count($this->topics);

View File

@ -13,8 +13,6 @@ use React\Promise\PromiseInterface;
class ApiHandler class ApiHandler
{ {
public static string $indexPage;
public function __construct( public function __construct(
private Configuration $config, private Configuration $config,
private TopicManager $topicManager private TopicManager $topicManager
@ -36,12 +34,32 @@ class ApiHandler
$path = $request->getUri()->getPath(); $path = $request->getUri()->getPath();
// FIXME remove this when done debugging
if ($path === "/index.html") { if ($path === "/index.html") {
$resolve(Response::html(self::$indexPage)); $resolve(Response::html(
<<<ENDHTML
<html>
<head>
</head>
<body>
<script type="text/javascript">
const events = new EventSource("http://127.0.0.1:9000/.well-known/mercure?topic=https://example.com/books/1");
events.onmessage = function (msg) {
console.log(msg);
const message = document.createElement('div');
message.innerText = msg.data;
document.getElementById('messages').appendChild(message);
};
</script>
<div id="messages">
</body>
</html>
ENDHTML
));
} }
switch (true) { switch (true) {
case preg_match('<^/.well-known/mercure/subscriptions(/.+?)$>', $path, $m): case preg_match('<^/.well-known/mercure/subscriptions(/.+?){0,1}$>', $path, $m):
$query = explode("/", trim($m[1]??null, "/")); $query = explode("/", trim($m[1]??null, "/"));
$topic = array_shift($query); $topic = array_shift($query);
$subscription = array_shift($query); $subscription = array_shift($query);
@ -77,14 +95,14 @@ class ApiHandler
{ {
// TODO implement once we can enumerate topics and subscriptions // TODO implement once we can enumerate topics and subscriptions
// mock data $subscriptions = $this->topicManager->getSubscriptions();
$lastEventId = "urn:uuid:5e94c686-2c0b-4f9b-958c-92ccc3bbb4eb"; $lastEventId = "urn:uuid:5e94c686-2c0b-4f9b-958c-92ccc3bbb4eb";
$data = [ $data = [
"@context" => "https://mercure.rocks/", "@context" => "https://mercure.rocks/",
"id" => "/.well-known/mercure/subscriptions", "id" => "/.well-known/mercure/subscriptions",
"type" => "Subscriptions", "type" => "Subscriptions",
"lastEventID" => $lastEventId, "lastEventID" => $lastEventId,
"subscriptions" => [] "subscriptions" => $subscriptions
]; ];
return Response::json($data) return Response::json($data)
@ -94,17 +112,3 @@ class ApiHandler
} }
ApiHandler::$indexPage = <<<ENDHTML
<html>
<head>
<meta http-equiv="Content-Security-Policy" content="default-src 'self' http: 'unsafe-eval' 'unsafe-inline'; style-src 'self';">
</head>
<body>
<script type="text/javascript">
const events = new EventSource("http://127.0.0.1:9000/.well-known/mercure?topic=https://example.com/books/1");
events.onmessage = msg => console.log(msg);
</script>
</body>
</html>
ENDHTML;

View File

@ -30,8 +30,8 @@ class Server
private SplObjectStorage $webSocketClients; private SplObjectStorage $webSocketClients;
private TopicManager $topicManager; private TopicManager $topicManager;
private Logger|LoggerInterface $logger; private Logger $logger;
private ResponseMiddleware $responseMiddleware; private ResponseMiddleware $responseMiddleware;
private SecurityMiddleware $securityMiddleware; private SecurityMiddleware $securityMiddleware;
@ -73,7 +73,7 @@ class Server
$this->server->listen($socket); $this->server->listen($socket);
} }
private function createLogger(): LoggerInterface private function createLogger(): Logger
{ {
$handlers = [ $handlers = [
new StreamHandler(STDOUT) new StreamHandler(STDOUT)