From 99b5710c59f03b679a55ecf790dcf140d21ce046 Mon Sep 17 00:00:00 2001 From: Christopher Vagnetoft Date: Mon, 11 Mar 2024 22:12:01 +0100 Subject: [PATCH] Read config, handle lastEventId in topicmanager --- bin/mercureactd | 11 +++++++++ mercureactd.conf.dist | 6 ++--- src/Broker/TopicManager.php | 10 +++++++- src/Configuration.php | 34 ++++++++++++++++++++++++++ src/Daemon.php | 7 ++++-- src/Http/Middleware/ApiHandler.php | 8 ++---- src/Http/Middleware/MercureHandler.php | 7 +++++- src/Http/Server.php | 1 + 8 files changed, 71 insertions(+), 13 deletions(-) diff --git a/bin/mercureactd b/bin/mercureactd index a256e02..fa5bfb9 100755 --- a/bin/mercureactd +++ b/bin/mercureactd @@ -12,9 +12,20 @@ if (isset($opts['c'])) { $config = Configuration::fromFile($opts['c']); } else { $config = Configuration::createDefault() + ->addListener([ + 'address' => '127.0.0.1:8888', + 'subscribe' => [ + 'anonymous' => true + ] + ]) ->setAllowAnonymousSubscribe(true) ->setJwtSecret("!ChangeThisMercureHubJWTSecretKey!"); } +if (count($config->getListeners()) == 0) { + fwrite(STDERR, "No listeners available\n"); + exit(1); +} + $daemon = new Daemon($config); $daemon->start(); \ No newline at end of file diff --git a/mercureactd.conf.dist b/mercureactd.conf.dist index 4272ec9..7b43037 100644 --- a/mercureactd.conf.dist +++ b/mercureactd.conf.dist @@ -1,7 +1,7 @@ # Mercureact default configuration file # Please make a copy of me before editing -listen: +listeners: - address: 0.0.0.0:9000 # Setup CORS headers @@ -28,7 +28,7 @@ publish: subscribe: # Allow anonymous subscription for public updates - allow_anonymous: false + allow_anonymous: true security: - jwt_secret: "!ChangeThisMercureHubJWTSecretKey!" \ No newline at end of file + jwt_secret: "!ChangeThisMercureHubJWTSecretKey!" diff --git a/src/Broker/TopicManager.php b/src/Broker/TopicManager.php index 924ea3e..ffc6ead 100644 --- a/src/Broker/TopicManager.php +++ b/src/Broker/TopicManager.php @@ -9,6 +9,8 @@ class TopicManager /** @var array */ private array $topics = []; + private ?string $lastEventId = null; + private SplObjectStorage $subscribers; public function __construct() @@ -26,6 +28,7 @@ class TopicManager public function publish(Message $message): void { + $this->lastEventId = $message->id; foreach ($message->topic as $topic) { $this->getTopic($topic)->publish($message); } @@ -66,7 +69,7 @@ class TopicManager 'topic' => $topic->getTopic(), 'subscriber' => $sub->getId(), 'active' => true, - 'payload' => null, + 'payload' => null, // TODO populate from mercure.payload in JWT ]; } } @@ -74,6 +77,11 @@ class TopicManager return $all; } + public function getLastEventId(): ?string + { + return ($this->lastEventId !== null)?("urn:uuid:".$this->lastEventId):null; + } + public function getTopicCount(): int { return count($this->topics); diff --git a/src/Configuration.php b/src/Configuration.php index 783fa23..7efb15d 100644 --- a/src/Configuration.php +++ b/src/Configuration.php @@ -12,6 +12,8 @@ class Configuration private bool $allowAnonymousSubscribe = false; + private array $listeners = []; + public static function createDefault(): Configuration { return new Configuration(); @@ -35,6 +37,15 @@ class Configuration $config->setAllowAnonymousSubscribe(boolval($subscribe['allow_anonymous'])); } + if (isset($yaml['listeners'])) { + foreach ($yaml['listeners'] as $listener) { + if (!is_array($listener)) { + throw new \Exception("Bad listener config"); + } + $config->addListener($listener); + } + } + return $config; } @@ -70,5 +81,28 @@ class Configuration $this->allowAnonymousSubscribe = $allowAnonymousSubscribe; return $this; } + + function addListener(array $config): self + { + $this->listeners[] = [ + 'address' => $config['address']??throw new \Exception("Address can't be empty"), + 'cors' => isset($config['cors'])?[ + 'allow_origin' => $config['cors']['allow_origin']??'*', + 'csp' => $config['cors']['csp']??'default-src * \'self\'', + ]:[ + 'allow_origin' => '*', + 'csp' => 'default-src * \'self\'', + ], + 'websocket' => isset($config['websocket'])?[ + 'enable' => $config['websocket']['enable']??false + ]:null, + ]; + return $this; + } + + public function getListeners(): array + { + return $this->listeners; + } } diff --git a/src/Daemon.php b/src/Daemon.php index c6602a0..261acba 100644 --- a/src/Daemon.php +++ b/src/Daemon.php @@ -25,8 +25,11 @@ class Daemon { $this->server = new Server($this->config, $this->loop); - $socket = new SocketServer("tcp://0.0.0.0:9000"); - $this->server->listen($socket); + $listeners = $this->config->getListeners(); + foreach ($listeners as $listener) { + $socket = new SocketServer("tcp://".$listener['address']); + $this->server->listen($socket); + } } public function stop(): void diff --git a/src/Http/Middleware/ApiHandler.php b/src/Http/Middleware/ApiHandler.php index 7bbc1fc..33b1772 100644 --- a/src/Http/Middleware/ApiHandler.php +++ b/src/Http/Middleware/ApiHandler.php @@ -66,7 +66,7 @@ class ApiHandler $resolve($this->apiGetSubscriptions($topic, $subscription)); return; - case preg_match('<^/.well-known/mercureact/status$>', $path): + case preg_match('<^/.well-known/mercure/status$>', $path): $resolve([ 'server' => 'Mercureact/1.0', 'topics' => $this->topicManager->getTopicCount(), @@ -75,10 +75,6 @@ class ApiHandler 'memoryUsage' => memory_get_usage(true) ]); return; - - case preg_match('<^/.well-known/mercureact/status$>', $path): - $resolve([ 'version' => '1.0' ]); - return; } $resolve($next($request)); @@ -96,7 +92,7 @@ class ApiHandler // TODO implement once we can enumerate topics and subscriptions $subscriptions = $this->topicManager->getSubscriptions(); - $lastEventId = "urn:uuid:5e94c686-2c0b-4f9b-958c-92ccc3bbb4eb"; + $lastEventId = $this->topicManager->getLastEventId(); $data = [ "@context" => "https://mercure.rocks/", "id" => "/.well-known/mercure/subscriptions", diff --git a/src/Http/Middleware/MercureHandler.php b/src/Http/Middleware/MercureHandler.php index 0e8d9ce..d68e3b1 100644 --- a/src/Http/Middleware/MercureHandler.php +++ b/src/Http/Middleware/MercureHandler.php @@ -17,6 +17,7 @@ use React\Http\Message\Response; use React\Promise\Promise; use React\Promise\PromiseInterface; use React\Stream\ThroughStream; +use Rize\UriTemplate\UriTemplate; use Symfony\Component\Uid\Uuid; class MercureHandler @@ -194,11 +195,15 @@ class MercureHandler $matched = 0; foreach ((array)$topic as $match) { foreach ($claims as $claim) { - // TODO implement matching of URI Templates if (($claim === "*") || ($claim === $match)) { $matched++; break; } + // TODO make sure that UriTemplate parsing works + if ((new UriTemplate())->extract($claim, $match, true)) { + $matched++; + break; + } } } return ($matched == count($topic)); diff --git a/src/Http/Server.php b/src/Http/Server.php index 464e6ff..b5a46d8 100644 --- a/src/Http/Server.php +++ b/src/Http/Server.php @@ -70,6 +70,7 @@ class Server */ public function listen(ServerInterface $socket): void { + $this->logger->info("Listening on ".$socket->getAddress()."\n"); $this->server->listen($socket); }