diff --git a/README.md b/README.md index b0a6fac..fc0ba3b 100644 --- a/README.md +++ b/README.md @@ -40,8 +40,8 @@ Take a look at `src/Daemon.php` and `src/Http/Server.php` depending on how much * [ ] WebSocket distributor * [ ] WebSocket authentication - * [ ] Setup subscriptions - * [ ] Dynamic subscriptions + * [x] Setup subscriptions + * [x] Dynamic subscriptions * [x] Distribute events over WS * [ ] HTTP middleware unittests * [ ] Replay missed events based on event id diff --git a/src/Broker/WsSubscriber.php b/src/Broker/WsSubscriber.php new file mode 100644 index 0000000..7e55ccd --- /dev/null +++ b/src/Broker/WsSubscriber.php @@ -0,0 +1,83 @@ +id = (string)Uuid::v7(); + $this->stream->on('text', $this->onWebSocketData(...)); + } + + private function onWebSocketData(string $data): void + { + $toks = str_getcsv($data, " "); + switch (array_shift($toks)) { + case 'auth': + $this->stream->write('{"ok":true}'); + break; + case 'subscribe': + $this->topicManager->subscribe($this, $toks); + $this->stream->write('{"ok":true}'); + break; + case 'unsubscribe': + $this->topicManager->unsubscribe($this, $toks); + $this->stream->write('{"ok":true}'); + break; + default: + $this->stream->write('{"ok":false}'); + } + } + + public function deliver(Message $message): void + { + $this->stream->write(json_encode([ + 'event' => $message->type, + 'topic' => $message->topic, + 'data' => $message->data + ], JSON_UNESCAPED_SLASHES)); + } + + public function isAuthorized(): bool + { + return $this->token && $this->token->isValid(); + } + + public function getPayload(): array + { + return $this->request->getAttribute('mercure.payload')??[]; + } + + public function getId(): string + { + return "urn:uuid:".$this->id; + } +} \ No newline at end of file diff --git a/src/Configuration.php b/src/Configuration.php index 0523882..555a11a 100644 --- a/src/Configuration.php +++ b/src/Configuration.php @@ -19,6 +19,7 @@ class Configuration "publish.overwrite_ids" => false, "publish.reject_duplicates" => false, "server.address" => "127.0.0.1:9000", + "server.websockets" => false, "server.enable_api" => true, "server.limits.max_concurrent" => 100, "server.limits.max_request_body" => 102400, diff --git a/src/Http/Middleware/WebSocketHandler.php b/src/Http/Middleware/WebSocketHandler.php index e9d0053..13d5c48 100644 --- a/src/Http/Middleware/WebSocketHandler.php +++ b/src/Http/Middleware/WebSocketHandler.php @@ -3,6 +3,7 @@ namespace NoccyLabs\Mercureact\Http\Middleware; use NoccyLabs\Mercureact\Broker\TopicManager; +use NoccyLabs\Mercureact\Broker\WsSubscriber; use NoccyLabs\Mercureact\Configuration; use NoccyLabs\React\WebSocket\WebSocketConnection; use NoccyLabs\React\WebSocket\WebSocketMiddleware; @@ -59,13 +60,16 @@ class WebSocketHandler { $this->webSocketClients->attach($connection); - $connection->on('close', function () use ($connection) { + $request = $connection->getServerRequest(); + $subscriber = new WsSubscriber($connection, $request, $this->topicManager); + + $connection->on('close', function () use ($connection, $subscriber) { $this->webSocketClients->detach($connection); + $this->topicManager->unsubscribe($subscriber); }); - $request = $connection->getServerRequest(); - $topic = $request->getQueryParams()['topic'][0]??''; - $connection->setGroup($topic); + $this->topicManager->subscribe($subscriber, []); + } } \ No newline at end of file