mercureact/src/Broker/Subscriber/WsSubscriber.php

85 lines
2.3 KiB
PHP

<?php
namespace NoccyLabs\Mercureact\Broker\Subscriber;
use Evenement\EventEmitterInterface;
use Evenement\EventEmitterTrait;
use NoccyLabs\Mercureact\Broker\Message;
use NoccyLabs\Mercureact\Broker\TopicManager;
use NoccyLabs\React\WebSocket\WebSocketConnection;
use NoccyLabs\SimpleJWT\JWTToken;
use Psr\Http\Message\ServerRequestInterface;
use React\Stream\WritableStreamInterface;
use Symfony\Component\Uid\Uuid;
/**
 * Subscriber backed by a WebSocket connection.
 *
 * Text frames received on the connection are parsed as space-separated
 * commands (`auth`, `subscribe <topic>...`, `unsubscribe <topic>...`) and each
 * command is answered with a small JSON acknowledgement. Messages handed to
 * deliver() are forwarded to the connection as JSON objects.
 */
class WsSubscriber implements SubscriberInterface, EventEmitterInterface
{
    use EventEmitterTrait;

    // Event names exposed to listeners. Within this class only EVENT_ERROR is
    // emitted (when an outgoing message cannot be JSON-encoded).
    public const EVENT_SUBSCRIBE = 'subscribe';
    public const EVENT_UNSUBSCRIBE = 'unsubscribe';
    public const EVENT_ERROR = 'error';

    /** Acknowledgement sent for a recognised command. */
    private const REPLY_ACCEPTED = '{"ok":true}';

    /** Acknowledgement sent for an unknown or empty command. */
    private const REPLY_REJECTED = '{"ok":false}';

    /** Bare UUID (v7) generated at construction; see getId() for the URN form. */
    private string $id;

    /**
     * @param WebSocketConnection    $stream       Connection that commands are read from and messages written to.
     * @param ServerRequestInterface $request      Request whose `mercure.*` attributes are exposed by the getters.
     * @param TopicManager           $topicManager Receives the subscribe/unsubscribe calls made by the client.
     * @param JWTToken|null          $token        Token consulted by isAuthenticated(); null means unauthenticated.
     */
    public function __construct(
        private WebSocketConnection $stream,
        private ServerRequestInterface $request,
        private TopicManager $topicManager,
        private ?JWTToken $token = null,
    ) {
        $this->id = (string) Uuid::v7();
        $this->stream->on('text', $this->onWebSocketData(...));
    }

    /**
     * Handles one text frame from the client.
     *
     * The frame is split on spaces (double quotes may enclose a token that
     * itself contains spaces); the first token is the command and the
     * remaining tokens are its arguments.
     */
    private function onWebSocketData(string $data): void
    {
        // Enclosure and escape are passed explicitly (same values as the old
        // defaults) because relying on the default $escape is deprecated as of
        // PHP 8.4.
        $arguments = str_getcsv($data, ' ', '"', '\\');
        $command = array_shift($arguments);

        switch ($command) {
            case 'auth':
                // Accepted as-is: no credentials are read or verified here.
                break;

            case 'subscribe':
                $this->topicManager->subscribe($this, $arguments);
                break;

            case 'unsubscribe':
                $this->topicManager->unsubscribe($this, $arguments);
                break;

            default:
                $this->stream->write(self::REPLY_REJECTED);
                return;
        }

        $this->stream->write(self::REPLY_ACCEPTED);
    }

    /**
     * Sends a broker message to the client as a JSON object with the keys
     * `event`, `topic` and `data`.
     *
     * If the message cannot be encoded (for example because the data holds
     * invalid UTF-8) nothing is written to the connection; the JsonException
     * is passed to EVENT_ERROR listeners instead.
     */
    public function deliver(Message $message): void
    {
        try {
            $frame = json_encode(
                [
                    'event' => $message->type,
                    'topic' => $message->topic,
                    'data' => $message->data,
                ],
                JSON_UNESCAPED_SLASHES | JSON_THROW_ON_ERROR,
            );
        } catch (\JsonException $encodingError) {
            // Without this guard json_encode() would return false and that
            // value would be pushed to the socket as if it were a frame.
            $this->emit(self::EVENT_ERROR, [$encodingError]);
            return;
        }

        $this->stream->write($frame);
    }

    /**
     * Tells whether a token was supplied and reports itself as valid.
     */
    public function isAuthenticated(): bool
    {
        return $this->token !== null && $this->token->isValid();
    }

    /**
     * Returns the `mercure.claims` attribute of the originating request.
     *
     * NOTE(review): presumably populated by upstream middleware; that code is
     * not visible from this class.
     */
    public function getMercureClaims(): ?array
    {
        return $this->request->getAttribute('mercure.claims');
    }

    /**
     * Returns the `mercure.payload` attribute of the originating request, or
     * an empty array when the attribute is null or absent.
     */
    public function getPayload(): array
    {
        return $this->request->getAttribute('mercure.payload') ?? [];
    }

    /**
     * Returns this subscriber's identifier in `urn:uuid:<uuid>` form.
     */
    public function getId(): string
    {
        return 'urn:uuid:' . $this->id;
    }
}