From 8be2e8105423b5e66b296732f459b1ddc6162501 Mon Sep 17 00:00:00 2001 From: Christopher Vagnetoft Date: Tue, 12 Mar 2024 01:45:21 +0100 Subject: [PATCH] First unit tests, misc fixes --- bin/mercureactd | 2 +- mercureactd.conf.dist | 2 +- src/Broker/Message.php | 12 ++--- src/Broker/SubscriberInterface.php | 2 + src/Configuration.php | 14 ++++++ src/Http/Middleware/MercureHandler.php | 30 +++++------ tests/Broker/MessageTest.php | 70 ++++++++++++++++++++++++++ tests/Broker/TopicTest.php | 63 +++++++++++++++++++++++ 8 files changed, 173 insertions(+), 22 deletions(-) create mode 100644 tests/Broker/MessageTest.php create mode 100644 tests/Broker/TopicTest.php diff --git a/bin/mercureactd b/bin/mercureactd index 8987e4e..caac44c 100755 --- a/bin/mercureactd +++ b/bin/mercureactd @@ -53,7 +53,7 @@ if (isset($opts['C'])) { key: foo.key publish: - overwrite_id: false + overwrite_ids: false reject_duplicates: true subscribe: diff --git a/mercureactd.conf.dist b/mercureactd.conf.dist index e621256..a93c7c7 100644 --- a/mercureactd.conf.dist +++ b/mercureactd.conf.dist @@ -21,7 +21,7 @@ server: publish: # Assign a UUID to published messages even if one is already set in the message - overwrite_id: false + overwrite_ids: false # Reject messages with previously seen IDs reject_duplicates: true diff --git a/src/Broker/Message.php b/src/Broker/Message.php index a77fee7..e9326c5 100644 --- a/src/Broker/Message.php +++ b/src/Broker/Message.php @@ -23,11 +23,11 @@ class Message public function __construct( array $topic, - ?string $type, - ?string $data, - ?bool $private, - ?string $id, - ?int $retry + ?string $data=null, + ?string $type=null, + ?bool $private=null, + ?string $id=null, + ?int $retry=null ) { $this->topic = $topic; @@ -60,7 +60,7 @@ class Message topic: (array)$data['topic'], type: $data['type']??null, data: $data['data']??null, - private: match ($data['private']??null) { "on" => true, null => null, default => false }, + private: match ($data['private']??null) { "on" => true, true => true, null => null, default => false }, id: $data['id']??null, retry: $data['retry']??null, ); diff --git a/src/Broker/SubscriberInterface.php b/src/Broker/SubscriberInterface.php index af1107c..5f24d59 100644 --- a/src/Broker/SubscriberInterface.php +++ b/src/Broker/SubscriberInterface.php @@ -7,4 +7,6 @@ interface SubscriberInterface public function deliver(Message $message): void; public function isAuthorized(): bool; + + public function getPayload(): ?array; } \ No newline at end of file diff --git a/src/Configuration.php b/src/Configuration.php index 92cc390..1c65b22 100644 --- a/src/Configuration.php +++ b/src/Configuration.php @@ -108,5 +108,19 @@ class Configuration return $this; } + public function getOverwriteMessageIds(): bool + { + return $this->config['publish.overwrite_ids']??true; + } + + public function getRejectDuplicateMessages(): bool + { + return $this->config['publish.reject_duplicates']??true; + } + + public function getDuplicateIdHistorySize(): int + { + return 50; + } } diff --git a/src/Http/Middleware/MercureHandler.php b/src/Http/Middleware/MercureHandler.php index 684d3bd..d8456fe 100644 --- a/src/Http/Middleware/MercureHandler.php +++ b/src/Http/Middleware/MercureHandler.php @@ -2,6 +2,7 @@ namespace NoccyLabs\Mercureact\Http\Middleware; +use LDAP\Result; use NoccyLabs\Mercureact\Broker\Message; use NoccyLabs\Mercureact\Broker\SseSubscriber; use NoccyLabs\Mercureact\Broker\TopicManager; @@ -24,6 +25,10 @@ class MercureHandler { private LoopInterface $loop; + private array $seenMessageIds = []; + + private int $seenIdHistorySize = 100; + public function __construct( private Configuration $config, private TopicManager $topicManager, @@ -31,6 +36,7 @@ class MercureHandler ) { $this->loop = $loop ?? Loop::get(); + $this->seenIdHistorySize = $this->config->getDuplicateIdHistorySize(); } /** @@ -166,9 +172,16 @@ class MercureHandler ); } + if ($this->config->getRejectDuplicateMessages() && !empty($data['id'])) { + if (in_array($data['id'], $this->seenMessageIds)) { + return Response::plaintext("Duplicate message id")->withStatus(Response::STATUS_BAD_REQUEST); + } + array_push($this->seenMessageIds, $data['id']); + $this->seenMessageIds = array_slice($this->seenMessageIds, -100, 100); + } + // Put an id in there if none already - // TODO add a configurable for this - if (!isset($data['id'])) { + if (empty($data['id']) || $this->config->getOverwriteMessageIds()) { $data['id'] = (string)Uuid::v7(); } @@ -176,7 +189,7 @@ class MercureHandler $message = Message::fromData($data); $this->loop->futureTick(function () use ($message) { - $this->publishMercureMessage($message); + $this->topicManager->publish($message); }); return Response::plaintext("urn:uuid:".$message->id."\n"); @@ -201,15 +214,4 @@ class MercureHandler return ($matched == count($topic)); } - /** - * - * - * @param Message $message - * @return void - */ - private function publishMercureMessage(Message $message): void - { - $this->topicManager->publish($message); - } - } \ No newline at end of file diff --git a/tests/Broker/MessageTest.php b/tests/Broker/MessageTest.php new file mode 100644 index 0000000..4ea20fe --- /dev/null +++ b/tests/Broker/MessageTest.php @@ -0,0 +1,70 @@ + '1a', + 'topic' => '2b', + 'data' => '3c', + 'type' => '4d', + 'retry' => 42, + 'private' => true, + ]); + + $this->assertEquals("1a", $message->id); + $this->assertEquals(["2b"], $message->topic); + $this->assertEquals("3c", $message->data); + $this->assertEquals("4d", $message->type); + $this->assertEquals(42, $message->retry); + $this->assertEquals(true, $message->private); + + $message = Message::fromData([ + 'id' => '1a', + 'topic' => ['2b'], + 'data' => '3c', + 'type' => '4d', + 'retry' => 42, + 'private' => true, + ]); + + $this->assertEquals("1a", $message->id); + $this->assertEquals(["2b"], $message->topic); + $this->assertEquals("3c", $message->data); + $this->assertEquals("4d", $message->type); + $this->assertEquals(42, $message->retry); + $this->assertEquals(true, $message->private); + + } + + public function testCreatingSseFromMessage() + { + $message = Message::fromData([ + 'id' => '1a', + 'topic' => '2b', + 'data' => '3c', + 'type' => '4d', + 'retry' => 42, + 'private' => true, + ]); + $sse = $message->toString(); + + $this->assertEquals(<<messages[] = $message; } + public function getPayload(): ?array { return null; } + }; + $unauthorizedSubscriber = new class implements SubscriberInterface { + public array $messages = []; + public function isAuthorized():bool { return false; } + public function deliver(Message $message):void { $this->messages[] = $message; } + public function getPayload(): ?array { return null; } + }; + + $topic = new Topic("foo"); + $topic->addSubscriber($authorizedSubscriber); + $topic->addSubscriber($unauthorizedSubscriber); + + $message = new Message(topic:["foo"], data:"test", private:false); + $topic->publish($message); + + $this->assertEquals(1, count($authorizedSubscriber->messages)); + $this->assertEquals(1, count($unauthorizedSubscriber->messages)); + } + + public function testPrivateMessagesAreNotDeliveredToUnauthorizedSubscribers() + { + $authorizedSubscriber = new class implements SubscriberInterface { + public array $messages = []; + public function isAuthorized():bool { return true; } + public function deliver(Message $message):void { $this->messages[] = $message; } + public function getPayload(): ?array { return null; } + }; + $unauthorizedSubscriber = new class implements SubscriberInterface { + public array $messages = []; + public function isAuthorized():bool { return false; } + public function deliver(Message $message):void { $this->messages[] = $message; } + public function getPayload(): ?array { return null; } + }; + + $topic = new Topic("foo"); + $topic->addSubscriber($authorizedSubscriber); + $topic->addSubscriber($unauthorizedSubscriber); + + $message = new Message(topic:["foo"], data:"test", private:true); + $topic->publish($message); + + $this->assertEquals(1, count($authorizedSubscriber->messages)); + $this->assertEquals(0, count($unauthorizedSubscriber->messages)); + } + +} \ No newline at end of file