Read config, handle lastEventId in topicmanager

This commit is contained in:
Chris 2024-03-11 22:12:01 +01:00
parent 8cbd12ee61
commit 99b5710c59
8 changed files with 71 additions and 13 deletions

View File

@ -12,9 +12,20 @@ if (isset($opts['c'])) {
$config = Configuration::fromFile($opts['c']); $config = Configuration::fromFile($opts['c']);
} else { } else {
$config = Configuration::createDefault() $config = Configuration::createDefault()
->addListener([
'address' => '127.0.0.1:8888',
'subscribe' => [
'anonymous' => true
]
])
->setAllowAnonymousSubscribe(true) ->setAllowAnonymousSubscribe(true)
->setJwtSecret("!ChangeThisMercureHubJWTSecretKey!"); ->setJwtSecret("!ChangeThisMercureHubJWTSecretKey!");
} }
if (count($config->getListeners()) == 0) {
fwrite(STDERR, "No listeners available\n");
exit(1);
}
$daemon = new Daemon($config); $daemon = new Daemon($config);
$daemon->start(); $daemon->start();

View File

@ -1,7 +1,7 @@
# Mercureact default configuration file # Mercureact default configuration file
# Please make a copy of me before editing # Please make a copy of me before editing
listen: listeners:
- address: 0.0.0.0:9000 - address: 0.0.0.0:9000
# Setup CORS headers # Setup CORS headers
@ -28,7 +28,7 @@ publish:
subscribe: subscribe:
# Allow anonymous subscription for public updates # Allow anonymous subscription for public updates
allow_anonymous: false allow_anonymous: true
security: security:
jwt_secret: "!ChangeThisMercureHubJWTSecretKey!" jwt_secret: "!ChangeThisMercureHubJWTSecretKey!"

View File

@ -9,6 +9,8 @@ class TopicManager
/** @var array<string,Topic> */ /** @var array<string,Topic> */
private array $topics = []; private array $topics = [];
private ?string $lastEventId = null;
private SplObjectStorage $subscribers; private SplObjectStorage $subscribers;
public function __construct() public function __construct()
@ -26,6 +28,7 @@ class TopicManager
public function publish(Message $message): void public function publish(Message $message): void
{ {
$this->lastEventId = $message->id;
foreach ($message->topic as $topic) { foreach ($message->topic as $topic) {
$this->getTopic($topic)->publish($message); $this->getTopic($topic)->publish($message);
} }
@ -66,7 +69,7 @@ class TopicManager
'topic' => $topic->getTopic(), 'topic' => $topic->getTopic(),
'subscriber' => $sub->getId(), 'subscriber' => $sub->getId(),
'active' => true, 'active' => true,
'payload' => null, 'payload' => null, // TODO populate from mercure.payload in JWT
]; ];
} }
} }
@ -74,6 +77,11 @@ class TopicManager
return $all; return $all;
} }
public function getLastEventId(): ?string
{
return ($this->lastEventId !== null)?("urn:uuid:".$this->lastEventId):null;
}
public function getTopicCount(): int public function getTopicCount(): int
{ {
return count($this->topics); return count($this->topics);

View File

@ -12,6 +12,8 @@ class Configuration
private bool $allowAnonymousSubscribe = false; private bool $allowAnonymousSubscribe = false;
private array $listeners = [];
public static function createDefault(): Configuration public static function createDefault(): Configuration
{ {
return new Configuration(); return new Configuration();
@ -35,6 +37,15 @@ class Configuration
$config->setAllowAnonymousSubscribe(boolval($subscribe['allow_anonymous'])); $config->setAllowAnonymousSubscribe(boolval($subscribe['allow_anonymous']));
} }
if (isset($yaml['listeners'])) {
foreach ($yaml['listeners'] as $listener) {
if (!is_array($listener)) {
throw new \Exception("Bad listener config");
}
$config->addListener($listener);
}
}
return $config; return $config;
} }
@ -70,5 +81,28 @@ class Configuration
$this->allowAnonymousSubscribe = $allowAnonymousSubscribe; $this->allowAnonymousSubscribe = $allowAnonymousSubscribe;
return $this; return $this;
} }
function addListener(array $config): self
{
$this->listeners[] = [
'address' => $config['address']??throw new \Exception("Address can't be empty"),
'cors' => isset($config['cors'])?[
'allow_origin' => $config['cors']['allow_origin']??'*',
'csp' => $config['cors']['csp']??'default-src * \'self\'',
]:[
'allow_origin' => '*',
'csp' => 'default-src * \'self\'',
],
'websocket' => isset($config['websocket'])?[
'enable' => $config['websocket']['enable']??false
]:null,
];
return $this;
}
public function getListeners(): array
{
return $this->listeners;
}
} }

View File

@ -25,9 +25,12 @@ class Daemon
{ {
$this->server = new Server($this->config, $this->loop); $this->server = new Server($this->config, $this->loop);
$socket = new SocketServer("tcp://0.0.0.0:9000"); $listeners = $this->config->getListeners();
foreach ($listeners as $listener) {
$socket = new SocketServer("tcp://".$listener['address']);
$this->server->listen($socket); $this->server->listen($socket);
} }
}
public function stop(): void public function stop(): void
{ {

View File

@ -66,7 +66,7 @@ class ApiHandler
$resolve($this->apiGetSubscriptions($topic, $subscription)); $resolve($this->apiGetSubscriptions($topic, $subscription));
return; return;
case preg_match('<^/.well-known/mercureact/status$>', $path): case preg_match('<^/.well-known/mercure/status$>', $path):
$resolve([ $resolve([
'server' => 'Mercureact/1.0', 'server' => 'Mercureact/1.0',
'topics' => $this->topicManager->getTopicCount(), 'topics' => $this->topicManager->getTopicCount(),
@ -75,10 +75,6 @@ class ApiHandler
'memoryUsage' => memory_get_usage(true) 'memoryUsage' => memory_get_usage(true)
]); ]);
return; return;
case preg_match('<^/.well-known/mercureact/status$>', $path):
$resolve([ 'version' => '1.0' ]);
return;
} }
$resolve($next($request)); $resolve($next($request));
@ -96,7 +92,7 @@ class ApiHandler
// TODO implement once we can enumerate topics and subscriptions // TODO implement once we can enumerate topics and subscriptions
$subscriptions = $this->topicManager->getSubscriptions(); $subscriptions = $this->topicManager->getSubscriptions();
$lastEventId = "urn:uuid:5e94c686-2c0b-4f9b-958c-92ccc3bbb4eb"; $lastEventId = $this->topicManager->getLastEventId();
$data = [ $data = [
"@context" => "https://mercure.rocks/", "@context" => "https://mercure.rocks/",
"id" => "/.well-known/mercure/subscriptions", "id" => "/.well-known/mercure/subscriptions",

View File

@ -17,6 +17,7 @@ use React\Http\Message\Response;
use React\Promise\Promise; use React\Promise\Promise;
use React\Promise\PromiseInterface; use React\Promise\PromiseInterface;
use React\Stream\ThroughStream; use React\Stream\ThroughStream;
use Rize\UriTemplate\UriTemplate;
use Symfony\Component\Uid\Uuid; use Symfony\Component\Uid\Uuid;
class MercureHandler class MercureHandler
@ -194,11 +195,15 @@ class MercureHandler
$matched = 0; $matched = 0;
foreach ((array)$topic as $match) { foreach ((array)$topic as $match) {
foreach ($claims as $claim) { foreach ($claims as $claim) {
// TODO implement matching of URI Templates
if (($claim === "*") || ($claim === $match)) { if (($claim === "*") || ($claim === $match)) {
$matched++; $matched++;
break; break;
} }
// TODO make sure that UriTemplate parsing works
if ((new UriTemplate())->extract($claim, $match, true)) {
$matched++;
break;
}
} }
} }
return ($matched == count($topic)); return ($matched == count($topic));

View File

@ -70,6 +70,7 @@ class Server
*/ */
public function listen(ServerInterface $socket): void public function listen(ServerInterface $socket): void
{ {
$this->logger->info("Listening on ".$socket->getAddress()."\n");
$this->server->listen($socket); $this->server->listen($socket);
} }