WebSocket logic
This commit is contained in:
		@@ -40,8 +40,8 @@ Take a look at `src/Daemon.php` and `src/Http/Server.php` depending on how much
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
* [ ] WebSocket distributor
 | 
					* [ ] WebSocket distributor
 | 
				
			||||||
  * [ ] WebSocket authentication
 | 
					  * [ ] WebSocket authentication
 | 
				
			||||||
  * [ ] Setup subscriptions
 | 
					  * [x] Setup subscriptions
 | 
				
			||||||
  * [ ] Dynamic subscriptions
 | 
					  * [x] Dynamic subscriptions
 | 
				
			||||||
  * [x] Distribute events over WS
 | 
					  * [x] Distribute events over WS
 | 
				
			||||||
* [ ] HTTP middleware unittests
 | 
					* [ ] HTTP middleware unittests
 | 
				
			||||||
* [ ] Replay missed events based on event id
 | 
					* [ ] Replay missed events based on event id
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										83
									
								
								src/Broker/WsSubscriber.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										83
									
								
								src/Broker/WsSubscriber.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,83 @@
 | 
				
			|||||||
 | 
					<?php
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace NoccyLabs\Mercureact\Broker;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use Evenement\EventEmitterInterface;
 | 
				
			||||||
 | 
					use Evenement\EventEmitterTrait;
 | 
				
			||||||
 | 
					use NoccyLabs\React\WebSocket\WebSocketConnection;
 | 
				
			||||||
 | 
					use NoccyLabs\SimpleJWT\JWTToken;
 | 
				
			||||||
 | 
					use Psr\Http\Message\ServerRequestInterface;
 | 
				
			||||||
 | 
					use React\Stream\WritableStreamInterface;
 | 
				
			||||||
 | 
					use Symfony\Component\Uid\Uuid;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class WsSubscriber implements SubscriberInterface, EventEmitterInterface
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    use EventEmitterTrait;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    const EVENT_SUBSCRIBE = 'subscribe';
 | 
				
			||||||
 | 
					    const EVENT_UNSUBSCRIBE = 'unsubscribe';
 | 
				
			||||||
 | 
					    const EVENT_ERROR = 'error';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    const STATE_UNAUTHORIZED = 0;
 | 
				
			||||||
 | 
					    const STATE_AUTHORIZED = 1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private string $id;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private int $state = self::STATE_UNAUTHORIZED;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public function __construct(
 | 
				
			||||||
 | 
					        private WebSocketConnection $stream,
 | 
				
			||||||
 | 
					        private ServerRequestInterface $request,
 | 
				
			||||||
 | 
					        private TopicManager $topicManager,
 | 
				
			||||||
 | 
					        private ?JWTToken $token = null
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        $this->id = (string)Uuid::v7();
 | 
				
			||||||
 | 
					        $this->stream->on('text', $this->onWebSocketData(...));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private function onWebSocketData(string $data): void
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        $toks = str_getcsv($data, " ");
 | 
				
			||||||
 | 
					        switch (array_shift($toks)) {
 | 
				
			||||||
 | 
					            case 'auth':
 | 
				
			||||||
 | 
					                $this->stream->write('{"ok":true}');
 | 
				
			||||||
 | 
					                break;
 | 
				
			||||||
 | 
					            case 'subscribe':
 | 
				
			||||||
 | 
					                $this->topicManager->subscribe($this, $toks);
 | 
				
			||||||
 | 
					                $this->stream->write('{"ok":true}');
 | 
				
			||||||
 | 
					                break;
 | 
				
			||||||
 | 
					            case 'unsubscribe':
 | 
				
			||||||
 | 
					                $this->topicManager->unsubscribe($this, $toks);
 | 
				
			||||||
 | 
					                $this->stream->write('{"ok":true}');
 | 
				
			||||||
 | 
					                break;
 | 
				
			||||||
 | 
					            default:
 | 
				
			||||||
 | 
					                $this->stream->write('{"ok":false}');                
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public function deliver(Message $message): void
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        $this->stream->write(json_encode([
 | 
				
			||||||
 | 
					            'event' => $message->type,
 | 
				
			||||||
 | 
					            'topic' => $message->topic,
 | 
				
			||||||
 | 
					            'data' => $message->data
 | 
				
			||||||
 | 
					        ], JSON_UNESCAPED_SLASHES));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public function isAuthorized(): bool
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        return $this->token && $this->token->isValid();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public function getPayload(): array
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        return $this->request->getAttribute('mercure.payload')??[];
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public function getId(): string
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        return "urn:uuid:".$this->id;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -19,6 +19,7 @@ class Configuration
 | 
				
			|||||||
            "publish.overwrite_ids" => false,
 | 
					            "publish.overwrite_ids" => false,
 | 
				
			||||||
            "publish.reject_duplicates" => false,
 | 
					            "publish.reject_duplicates" => false,
 | 
				
			||||||
            "server.address" => "127.0.0.1:9000",
 | 
					            "server.address" => "127.0.0.1:9000",
 | 
				
			||||||
 | 
					            "server.websockets" => false,
 | 
				
			||||||
            "server.enable_api" => true,
 | 
					            "server.enable_api" => true,
 | 
				
			||||||
            "server.limits.max_concurrent" => 100,
 | 
					            "server.limits.max_concurrent" => 100,
 | 
				
			||||||
            "server.limits.max_request_body" => 102400,
 | 
					            "server.limits.max_request_body" => 102400,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -3,6 +3,7 @@
 | 
				
			|||||||
namespace NoccyLabs\Mercureact\Http\Middleware;
 | 
					namespace NoccyLabs\Mercureact\Http\Middleware;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use NoccyLabs\Mercureact\Broker\TopicManager;
 | 
					use NoccyLabs\Mercureact\Broker\TopicManager;
 | 
				
			||||||
 | 
					use NoccyLabs\Mercureact\Broker\WsSubscriber;
 | 
				
			||||||
use NoccyLabs\Mercureact\Configuration;
 | 
					use NoccyLabs\Mercureact\Configuration;
 | 
				
			||||||
use NoccyLabs\React\WebSocket\WebSocketConnection;
 | 
					use NoccyLabs\React\WebSocket\WebSocketConnection;
 | 
				
			||||||
use NoccyLabs\React\WebSocket\WebSocketMiddleware;
 | 
					use NoccyLabs\React\WebSocket\WebSocketMiddleware;
 | 
				
			||||||
@@ -59,13 +60,16 @@ class WebSocketHandler
 | 
				
			|||||||
    {
 | 
					    {
 | 
				
			||||||
        $this->webSocketClients->attach($connection);
 | 
					        $this->webSocketClients->attach($connection);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        $connection->on('close', function () use ($connection) {
 | 
					        $request = $connection->getServerRequest();
 | 
				
			||||||
 | 
					        $subscriber = new WsSubscriber($connection, $request, $this->topicManager);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        $connection->on('close', function () use ($connection, $subscriber) {
 | 
				
			||||||
            $this->webSocketClients->detach($connection);
 | 
					            $this->webSocketClients->detach($connection);
 | 
				
			||||||
 | 
					            $this->topicManager->unsubscribe($subscriber);
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        $request = $connection->getServerRequest();
 | 
					        $this->topicManager->subscribe($subscriber, []);
 | 
				
			||||||
        $topic = $request->getQueryParams()['topic'][0]??'';
 | 
					        
 | 
				
			||||||
        $connection->setGroup($topic);
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user