Initial commit
This commit is contained in:
		
							
								
								
									
										59
									
								
								src/Broker/Message.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										59
									
								
								src/Broker/Message.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,59 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Mercureact\Broker;
 | 
			
		||||
 | 
			
		||||
use SplObjectStorage;
 | 
			
		||||
 | 
			
		||||
class Message
 | 
			
		||||
{
 | 
			
		||||
    /** @var array<int,string> The topic, if more than one the first is canonical, but all receive delivery */
 | 
			
		||||
    public readonly array $topic;
 | 
			
		||||
    /** @var string|null The SSE event type */
 | 
			
		||||
    public readonly ?string $type;
 | 
			
		||||
    /** @var string|null Message data */
 | 
			
		||||
    public readonly ?string $data;
 | 
			
		||||
    /** @var bool|null Private update*/
 | 
			
		||||
    public readonly ?bool $private;
 | 
			
		||||
    /** @var string|null Message ID */
 | 
			
		||||
    public readonly ?string $id;
 | 
			
		||||
    /** @var int SSE retry interval */
 | 
			
		||||
    public readonly ?int $retry;
 | 
			
		||||
 | 
			
		||||
    private readonly int $created;
 | 
			
		||||
 | 
			
		||||
    public function __construct(
 | 
			
		||||
        array $topic,
 | 
			
		||||
        ?string $type,
 | 
			
		||||
        ?string $data,
 | 
			
		||||
        ?bool $private,
 | 
			
		||||
        ?string $id,
 | 
			
		||||
        ?int $retry
 | 
			
		||||
    )
 | 
			
		||||
    {
 | 
			
		||||
        $this->topic = $topic;
 | 
			
		||||
        $this->type = $type;
 | 
			
		||||
        $this->data = $data;
 | 
			
		||||
        $this->private = $private;
 | 
			
		||||
        $this->id = $id;
 | 
			
		||||
        $this->retry = $retry;
 | 
			
		||||
        $this->created = time();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function getAge(): int
 | 
			
		||||
    {
 | 
			
		||||
        return time() - $this->created;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static function fromData(array $data): Message
 | 
			
		||||
    {
 | 
			
		||||
        return new Message(
 | 
			
		||||
            topic: (array)$data['topic'],
 | 
			
		||||
            type: $data['type']??null,
 | 
			
		||||
            data: $data['data']??null,
 | 
			
		||||
            private: match ($data['private']??null) { "on" => true, null => null, default => false },
 | 
			
		||||
            id: $data['id']??null,
 | 
			
		||||
            retry: $data['retry']??null,
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										20
									
								
								src/Broker/SubscriptionList.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								src/Broker/SubscriptionList.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,20 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Mercureact\Broker;
 | 
			
		||||
 | 
			
		||||
use Countable;
 | 
			
		||||
use SplObjectStorage;
 | 
			
		||||
 | 
			
		||||
class SubscriptionList implements Countable
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
    private array $subscriptions = [];
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    public function count(): int
 | 
			
		||||
    {
 | 
			
		||||
        return count($this->subscriptions);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										67
									
								
								src/Broker/Topic.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										67
									
								
								src/Broker/Topic.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,67 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Mercureact\Broker;
 | 
			
		||||
 | 
			
		||||
use SplObjectStorage;
 | 
			
		||||
 | 
			
		||||
class Topic
 | 
			
		||||
{
 | 
			
		||||
    const MAX_HISTORY_AGE = 180;
 | 
			
		||||
 | 
			
		||||
    /** @var string Topic name */
 | 
			
		||||
    private string $topic;
 | 
			
		||||
    /** @var array<string,Message> */
 | 
			
		||||
    private array $messages = [];
 | 
			
		||||
    /** @var int Creation unixtime */
 | 
			
		||||
    private int $created;
 | 
			
		||||
 | 
			
		||||
    private SubscriptionList $subscribers;
 | 
			
		||||
 | 
			
		||||
    public function __construct(string $topic)
 | 
			
		||||
    {
 | 
			
		||||
        $this->topic = $topic;
 | 
			
		||||
        $this->subscribers = new SubscriptionList();
 | 
			
		||||
        $this->created = time();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function publish(Message $message)
 | 
			
		||||
    {
 | 
			
		||||
        // TODO check if message id has already been published
 | 
			
		||||
 | 
			
		||||
        foreach ($this->subscribers as $subscriber) {
 | 
			
		||||
            if ($message->private === true) {
 | 
			
		||||
                // TODO check subscriber access
 | 
			
		||||
            } else {
 | 
			
		||||
                // TODO deliver to subscriber
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function getAge(): int
 | 
			
		||||
    {
 | 
			
		||||
        return time() - $this->created;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function getHistorySize(): int
 | 
			
		||||
    {
 | 
			
		||||
        return count($this->messages);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function getSubscriberCount(): int
 | 
			
		||||
    {
 | 
			
		||||
        return count($this->subscribers);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Garbage collect histry
 | 
			
		||||
     * 
 | 
			
		||||
     */
 | 
			
		||||
    public function garbageCollect(): void
 | 
			
		||||
    {
 | 
			
		||||
        $this->messages = array_filter(
 | 
			
		||||
            $this->messages,
 | 
			
		||||
            fn($message) => $message->getAge() < self::MAX_HISTORY_AGE
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										50
									
								
								src/Broker/TopicManager.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										50
									
								
								src/Broker/TopicManager.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,50 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Mercureact\Broker;
 | 
			
		||||
 | 
			
		||||
use SplObjectStorage;
 | 
			
		||||
 | 
			
		||||
class TopicManager
 | 
			
		||||
{
 | 
			
		||||
    /** @var array<string,Topic> */
 | 
			
		||||
    private array $topics = [];
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    public function getTopic(string $topic): Topic
 | 
			
		||||
    {
 | 
			
		||||
        if (!isset($this->topics[$topic])) {
 | 
			
		||||
            $this->topics[$topic] = new Topic($topic);
 | 
			
		||||
        }
 | 
			
		||||
        return $this->topics[$topic];
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function publish(Message $message): void
 | 
			
		||||
    {
 | 
			
		||||
        foreach ($message->topic as $topic) {
 | 
			
		||||
            $this->getTopic($topic)->publish($message);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function getTopicCount(): int
 | 
			
		||||
    {
 | 
			
		||||
        return count($this->topics);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function getSubscriberCount(): int
 | 
			
		||||
    {
 | 
			
		||||
        return array_sum(array_map(fn($t) => $t->getSubscriberCount(), $this->topics));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function garbageCollect(): void
 | 
			
		||||
    {
 | 
			
		||||
        $this->topics = array_filter(
 | 
			
		||||
            $this->topics,
 | 
			
		||||
            function (Topic $topic) {
 | 
			
		||||
                $topic->garbageCollect();
 | 
			
		||||
                return ($topic->getHistorySize() > 0 && $topic->getSubscriberCount() > 0) || ($topic->getAge() < 60);
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										38
									
								
								src/Configuration.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								src/Configuration.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,38 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Mercureact;
 | 
			
		||||
 | 
			
		||||
class Configuration
 | 
			
		||||
{
 | 
			
		||||
    private ?string $publicUrl = null;
 | 
			
		||||
 | 
			
		||||
    private ?string $jwtSecret = null;
 | 
			
		||||
 | 
			
		||||
    public static function createDefault(): Configuration
 | 
			
		||||
    {
 | 
			
		||||
        return new Configuration();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function setPublicUrl(string $publicUrl): self
 | 
			
		||||
    {
 | 
			
		||||
        $this->publicUrl = $publicUrl;
 | 
			
		||||
        return $this;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function getPublicUrl(): ?string
 | 
			
		||||
    {
 | 
			
		||||
        return $this->publicUrl;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function setJwtSecret(string $secret): self
 | 
			
		||||
    {
 | 
			
		||||
        $this->jwtSecret = $secret;
 | 
			
		||||
        return $this;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function getJwtSecret(): ?string
 | 
			
		||||
    {
 | 
			
		||||
        return $this->jwtSecret;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										48
									
								
								src/Daemon.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								src/Daemon.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,48 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Mercureact;
 | 
			
		||||
 | 
			
		||||
use NoccyLabs\Mercureact\Http\Server;
 | 
			
		||||
use React\EventLoop\Loop;
 | 
			
		||||
use React\EventLoop\LoopInterface;
 | 
			
		||||
use React\Socket\SocketServer;
 | 
			
		||||
 | 
			
		||||
class Daemon
 | 
			
		||||
{
 | 
			
		||||
    private Configuration $config;
 | 
			
		||||
 | 
			
		||||
    private LoopInterface $loop;
 | 
			
		||||
 | 
			
		||||
    private Server $server;
 | 
			
		||||
 | 
			
		||||
    public function __construct(Configuration $config, ?LoopInterface $loop=null)
 | 
			
		||||
    {
 | 
			
		||||
        $this->config = $config;
 | 
			
		||||
        $this->loop = $loop??Loop::get();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function start(): void
 | 
			
		||||
    {
 | 
			
		||||
        $this->server = new Server($this->config, []);
 | 
			
		||||
 | 
			
		||||
        $socket = new SocketServer("tcp://0.0.0.0:9000");
 | 
			
		||||
        $this->server->listen($socket);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function stop(): void
 | 
			
		||||
    {
 | 
			
		||||
        $this->loop->stop();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function abort(int $exitcode, ?string $message=null): void
 | 
			
		||||
    {
 | 
			
		||||
        $bt = debug_backtrace(limit:2);
 | 
			
		||||
        $bt = end($bt);
 | 
			
		||||
        fprintf(STDERR, "Abort: %s\n  at %s:%d\n  in %s\n", $message??"Unknown reason", $bt['file'], $bt['line'], ($bt['class']??null).($bt['type']??null).$bt['function']);
 | 
			
		||||
        $this->loop->stop();
 | 
			
		||||
        register_shutdown_function(function () use ($exitcode) {
 | 
			
		||||
            exit($exitcode);
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										12
									
								
								src/Http/Exception/RequestException.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								src/Http/Exception/RequestException.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,12 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Mercureact\Http\Exeption;
 | 
			
		||||
 | 
			
		||||
use Exception;
 | 
			
		||||
 | 
			
		||||
class RequestException extends Exception
 | 
			
		||||
{
 | 
			
		||||
    const ERR_BAD_REQUEST = 51001;
 | 
			
		||||
    const ERR_BAD_REQUEST_DATA = 51002;
 | 
			
		||||
    const ERR_INVALID_REQUEST_DATA = 51003;
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										10
									
								
								src/Http/Exception/SecurityException.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								src/Http/Exception/SecurityException.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,10 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Mercureact\Http\Exeption;
 | 
			
		||||
 | 
			
		||||
use Exception;
 | 
			
		||||
 | 
			
		||||
class SecurityException extends Exception
 | 
			
		||||
{
 | 
			
		||||
    const ERR_ACCESS_DENIED = 50001;
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										463
									
								
								src/Http/Server.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										463
									
								
								src/Http/Server.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,463 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Mercureact\Http;
 | 
			
		||||
 | 
			
		||||
use NoccyLabs\Mercureact\Broker\Message;
 | 
			
		||||
use NoccyLabs\Mercureact\Broker\TopicManager;
 | 
			
		||||
use NoccyLabs\Mercureact\Configuration;
 | 
			
		||||
use NoccyLabs\Mercureact\Http\Exeption\RequestException;
 | 
			
		||||
use NoccyLabs\Mercureact\Http\Exeption\SecurityException;
 | 
			
		||||
use NoccyLabs\React\WebSocket\WebSocketConnection;
 | 
			
		||||
use NoccyLabs\React\WebSocket\WebSocketMiddleware;
 | 
			
		||||
use NoccyLabs\SimpleJWT\JWTToken;
 | 
			
		||||
use NoccyLabs\SimpleJWT\Key\JWTPlaintextKey;
 | 
			
		||||
use Psr\Http\Message\ResponseInterface;
 | 
			
		||||
use Psr\Http\Message\ServerRequestInterface;
 | 
			
		||||
use React\EventLoop\Loop;
 | 
			
		||||
use React\EventLoop\LoopInterface;
 | 
			
		||||
use React\Http\HttpServer;
 | 
			
		||||
use React\Http\Message\Response;
 | 
			
		||||
use React\Promise\Promise;
 | 
			
		||||
use React\Promise\PromiseInterface;
 | 
			
		||||
use React\Socket\ServerInterface;
 | 
			
		||||
use React\Stream\ThroughStream;
 | 
			
		||||
use SplObjectStorage;
 | 
			
		||||
use Symfony\Component\Uid\Uuid;
 | 
			
		||||
use Throwable;
 | 
			
		||||
 | 
			
		||||
class Server
 | 
			
		||||
{
 | 
			
		||||
    private Configuration $config;
 | 
			
		||||
 | 
			
		||||
    private LoopInterface $loop;
 | 
			
		||||
 | 
			
		||||
    private HttpServer $server;
 | 
			
		||||
 | 
			
		||||
    private WebSocketMiddleware $webSocket;
 | 
			
		||||
 | 
			
		||||
    private SplObjectStorage $webSocketClients;
 | 
			
		||||
 | 
			
		||||
    private SplObjectStorage $eventClients;
 | 
			
		||||
 | 
			
		||||
    private TopicManager $topicManager;
 | 
			
		||||
 | 
			
		||||
    public static string $indexPage;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     */
 | 
			
		||||
    public function __construct(Configuration $config, array $options=[], ?LoopInterface $loop=null)
 | 
			
		||||
    {
 | 
			
		||||
        $this->loop = $loop??Loop::get();
 | 
			
		||||
 | 
			
		||||
        $this->config = $config;
 | 
			
		||||
 | 
			
		||||
        $this->topicManager = new TopicManager();
 | 
			
		||||
 | 
			
		||||
        $this->server = $this->createHttpServer($options);
 | 
			
		||||
        
 | 
			
		||||
        $this->webSocket = new WebSocketMiddleware();
 | 
			
		||||
 | 
			
		||||
        $this->eventClients = new SplObjectStorage();
 | 
			
		||||
        $this->webSocketClients = new SplObjectStorage();
 | 
			
		||||
        $this->webSocket->on(WebSocketMiddleware::EVENT_CONNECTION, $this->onWebSocketConnection(...));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @return void
 | 
			
		||||
     */
 | 
			
		||||
    public function listen(ServerInterface $socket): void
 | 
			
		||||
    {
 | 
			
		||||
        $this->server->listen($socket);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * @return HttpServer
 | 
			
		||||
     */
 | 
			
		||||
    private function createHttpServer(array $options): HttpServer
 | 
			
		||||
    {
 | 
			
		||||
        return new HttpServer(
 | 
			
		||||
            $this->rejectionWrappingMiddleware(...),
 | 
			
		||||
            $this->checkRequestSecurityMiddleware(...),
 | 
			
		||||
            $this->handleWebSocketRequest(...),
 | 
			
		||||
            $this->handleMercureRequest(...),
 | 
			
		||||
            $this->handleApiRequest(...),
 | 
			
		||||
            $this->handleNotFound(...)
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @param ServerRequestInterface $request
 | 
			
		||||
     * @return PromiseInterface
 | 
			
		||||
     */
 | 
			
		||||
    private function handleNotFound(ServerRequestInterface $request): PromiseInterface
 | 
			
		||||
    {
 | 
			
		||||
        return new Promise(
 | 
			
		||||
            function ($resolve) { 
 | 
			
		||||
                $resolve(Response::plaintext("Not found")->withStatus(404));
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @param ServerRequestInterface $request
 | 
			
		||||
     * @param callable $next
 | 
			
		||||
     * @return PromiseInterface
 | 
			
		||||
     */
 | 
			
		||||
    private function rejectionWrappingMiddleware(ServerRequestInterface $request, callable $next): PromiseInterface
 | 
			
		||||
    {
 | 
			
		||||
        $promise = new Promise(
 | 
			
		||||
            function (callable $resolve) use ($request, $next) {
 | 
			
		||||
                $resolve($next($request));
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
        return $promise->then(
 | 
			
		||||
            function ($response) {
 | 
			
		||||
                if ($response instanceof ResponseInterface) {
 | 
			
		||||
                    return $response;
 | 
			
		||||
                }
 | 
			
		||||
                if (is_array($response)) {
 | 
			
		||||
                    return Response::json($response);
 | 
			
		||||
                }
 | 
			
		||||
                if (is_string($response)) {
 | 
			
		||||
                    return Response::plaintext($response);
 | 
			
		||||
                }
 | 
			
		||||
                return Response::plaintext((string)$response);
 | 
			
		||||
            },
 | 
			
		||||
            function (Throwable $t) {
 | 
			
		||||
                if ($t instanceof SecurityException) {
 | 
			
		||||
                    return Response::plaintext("Access Denied")->withStatus(Response::STATUS_UNAUTHORIZED);
 | 
			
		||||
                }
 | 
			
		||||
                return Response::plaintext("500: Internal Server Error (".$t->getMessage().")\n")->withStatus(500);
 | 
			
		||||
            }
 | 
			
		||||
        )->then(
 | 
			
		||||
            function ($response) use ($request) {
 | 
			
		||||
                assert("\$response instanceof ResponseInterface");
 | 
			
		||||
                $host = ($request->getServerParams()['SERVER_ADDR']??"");
 | 
			
		||||
                        //. ":" . ($request->getServerParams()['SERVER_PORT']??"80");
 | 
			
		||||
                fprintf(STDOUT, "%s %3d %s %s %d\n", 
 | 
			
		||||
                    $request->getServerParams()['REMOTE_ADDR'], 
 | 
			
		||||
                    $response->getStatusCode(), 
 | 
			
		||||
                    $request->getMethod(), 
 | 
			
		||||
                    $request->getUri()->getPath(), 
 | 
			
		||||
                    strlen($response->getBody())
 | 
			
		||||
                );
 | 
			
		||||
                return $response
 | 
			
		||||
                    ->withAddedHeader('Link', '<https://'.$host.'/.well-known/mercure>; rel="mercure"')
 | 
			
		||||
                    ->withAddedHeader('Link', '<wss://'.$host.'/.well-known/mercure>; rel="mercure+ws"')
 | 
			
		||||
                    ->withHeader('Access-Control-Allow-Origin', '*')
 | 
			
		||||
                    ->withHeader('Content-Security-Policy', "default-src * 'self' http: 'unsafe-eval' 'unsafe-inline'; connect-src * 'self'")
 | 
			
		||||
                    ->withHeader('Cache-Control', 'must-revalidate')
 | 
			
		||||
                    ->withHeader('Server', 'Mercureact/0.1.0');
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @param ServerRequestInterface $request
 | 
			
		||||
     * @param callable $next
 | 
			
		||||
     * @return PromiseInterface
 | 
			
		||||
     */
 | 
			
		||||
    private function checkRequestSecurityMiddleware(ServerRequestInterface $request, callable $next): PromiseInterface
 | 
			
		||||
    {
 | 
			
		||||
        return new Promise(
 | 
			
		||||
            function (callable $resolve, callable $reject) use ($request, $next) {
 | 
			
		||||
                // Check JWT in authorization header or authorization query param
 | 
			
		||||
                $request = $this->checkAuthorization($request);
 | 
			
		||||
 | 
			
		||||
                $resolve($next($request));
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @param ServerRequestInterface $request
 | 
			
		||||
     * @return ServerRequestInterface
 | 
			
		||||
     */
 | 
			
		||||
    private function checkAuthorization(ServerRequestInterface $request): ServerRequestInterface
 | 
			
		||||
    {
 | 
			
		||||
        $authorization = $request->getHeaderLine('authorization');
 | 
			
		||||
        if (str_starts_with(strtolower($authorization), "bearer ")) {
 | 
			
		||||
            $jwt = substr($authorization, strpos($authorization, " ")+1);
 | 
			
		||||
            $key = new JWTPlaintextKey($this->config->getJwtSecret());
 | 
			
		||||
            $tok = new JWTToken($key, $jwt);
 | 
			
		||||
            if (!$tok->isValid()) {
 | 
			
		||||
                throw new SecurityException(message:"Invalid token", code:SecurityException::ERR_ACCESS_DENIED);
 | 
			
		||||
            }
 | 
			
		||||
            $mercureClaims = $tok->claims->get('mercure');
 | 
			
		||||
            return $request
 | 
			
		||||
                ->withAttribute('authorization', $tok);
 | 
			
		||||
        } else {
 | 
			
		||||
            return $request
 | 
			
		||||
                ->withAttribute('authorization', null);
 | 
			
		||||
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @param ServerRequestInterface $request
 | 
			
		||||
     * @param callable $next
 | 
			
		||||
     * @return PromiseInterface
 | 
			
		||||
     */
 | 
			
		||||
    private function handleMercureRequest(ServerRequestInterface $request, callable $next): PromiseInterface
 | 
			
		||||
    {
 | 
			
		||||
        return new Promise(
 | 
			
		||||
            function (callable $resolve, callable $reject) use ($next, $request) {
 | 
			
		||||
                if ($request->getUri()->getPath() == "/.well-known/mercure") {
 | 
			
		||||
                    if ($request->getMethod() == 'POST') {
 | 
			
		||||
                        $resolve($this->handleMercurePublish($request));
 | 
			
		||||
                        return;
 | 
			
		||||
                    }
 | 
			
		||||
                    $resolve($this->handleMercureClient($request));
 | 
			
		||||
                } else {
 | 
			
		||||
                    $resolve($next($request));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @param ServerRequestInterface $request
 | 
			
		||||
     * @return ResponseInterface
 | 
			
		||||
     */
 | 
			
		||||
    private function handleMercureClient(ServerRequestInterface $request): ResponseInterface
 | 
			
		||||
    {
 | 
			
		||||
        $tok = $request->getAttribute('authorization');
 | 
			
		||||
        if ($tok instanceof JWTToken) {
 | 
			
		||||
            $claims = $tok->claims->getAll();
 | 
			
		||||
            if (isset($claims['mercure']['subscribe'])) {
 | 
			
		||||
                $subscribeClaims = $claims['mercure']['subscribe'];
 | 
			
		||||
                // TODO check topic against subscribeClaims
 | 
			
		||||
            }            
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        $responseStream = new ThroughStream();
 | 
			
		||||
 | 
			
		||||
        $response = new Response(
 | 
			
		||||
            body: $responseStream
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        $this->eventClients->attach($responseStream, $request);
 | 
			
		||||
        $responseStream->on('close', function () use ($responseStream) {
 | 
			
		||||
            $this->eventClients->detach($responseStream);;
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        return $response
 | 
			
		||||
            ->withHeader("Cache-Control", "no-store")
 | 
			
		||||
            ->withHeader("Content-Type", "text/event-stream");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @param ServerRequestInterface $request
 | 
			
		||||
     * @return ResponseInterface
 | 
			
		||||
     */
 | 
			
		||||
    private function handleMercurePublish(ServerRequestInterface $request): ResponseInterface
 | 
			
		||||
    {
 | 
			
		||||
        if ($request->getHeaderLine('content-type') !== 'application/x-www-form-urlencoded') {
 | 
			
		||||
            throw new \Exception("Invalid request");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Grab the JWT token from the requests authorization attribute
 | 
			
		||||
        $tok = $request->getAttribute('authorization');
 | 
			
		||||
        if ($tok instanceof JWTToken) {
 | 
			
		||||
            $claims = $tok->claims->getAll();
 | 
			
		||||
            if (isset($claims['mercure']['publish'])) {
 | 
			
		||||
                $publishClaims = $claims['mercure']['publish'];
 | 
			
		||||
                // TODO check topic against publishClaims
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Parse out the urlencoded body. Pretty sure there is a better way to do this?
 | 
			
		||||
        $body = explode("&", (string)$request->getBody());
 | 
			
		||||
        $data = [];
 | 
			
		||||
        foreach ($body as $param) {
 | 
			
		||||
            if (!str_contains($param, "=")) throw new RequestException("Invalid request data", RequestException::ERR_INVALID_REQUEST_DATA);
 | 
			
		||||
            [ $name, $value ] = array_map('urldecode', explode("=", $param, 2));
 | 
			
		||||
                // FIXME support multiple topics?
 | 
			
		||||
            if (in_array($name, [ 'topic' ])) {
 | 
			
		||||
                if (!isset($data[$name]))
 | 
			
		||||
                    $data[$name] = [];
 | 
			
		||||
                $data[$name][] = $value;
 | 
			
		||||
            } else {
 | 
			
		||||
                $data[$name] = $value;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Put an id in there if none already
 | 
			
		||||
        // TODO add a configurable for this
 | 
			
		||||
        if (!isset($data['id'])) {
 | 
			
		||||
            $data['id'] = (string)Uuid::v7();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        $message = Message::fromData($data);
 | 
			
		||||
 | 
			
		||||
        $this->loop->futureTick(function () use ($message) {
 | 
			
		||||
            $this->publishMercureMessage($message);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        return Response::plaintext("urn:uuid:".$message->id."\n");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @param Message $message
 | 
			
		||||
     * @return void
 | 
			
		||||
     */
 | 
			
		||||
    private function publishMercureMessage(Message $message): void
 | 
			
		||||
    {
 | 
			
		||||
        foreach ($this->webSocketClients as $webSocket) {
 | 
			
		||||
            $webSocket->write(json_encode([
 | 
			
		||||
                'type' => $message->type,
 | 
			
		||||
                //'topic' => $data['topic'],
 | 
			
		||||
                'data' => (@json_decode($message->data))??$message->data
 | 
			
		||||
            ]));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        $sseMessage = "";
 | 
			
		||||
        if ($message->type) {
 | 
			
		||||
            $sseMessage .= "event: ".$message->type."\n";
 | 
			
		||||
        }
 | 
			
		||||
        $sseMessage .= "data: ".$message->data."\n\n";
 | 
			
		||||
 | 
			
		||||
        foreach ($this->eventClients as $client) {
 | 
			
		||||
            $client->write($sseMessage);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @param ServerRequestInterface $request
 | 
			
		||||
     * @param callable $next
 | 
			
		||||
     * @return PromiseInterface
 | 
			
		||||
     */
 | 
			
		||||
    private function handleWebSocketRequest(ServerRequestInterface $request, callable $next): PromiseInterface
 | 
			
		||||
    {
 | 
			
		||||
        return new Promise(
 | 
			
		||||
            function (callable $resolve, callable $reject) use ($next, $request) {
 | 
			
		||||
                if ($request->getUri()->getPath() == "/.well-known/mercure")
 | 
			
		||||
                    $resolve(call_user_func($this->webSocket, $request, $next));
 | 
			
		||||
                else 
 | 
			
		||||
                    $resolve($next($request));
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     */
 | 
			
		||||
    private function onWebSocketConnection(WebSocketConnection $connection)
 | 
			
		||||
    {
 | 
			
		||||
        $this->webSocketClients->attach($connection);
 | 
			
		||||
 | 
			
		||||
        $connection->on('close', function () use ($connection) {
 | 
			
		||||
            $this->webSocketClients->detach($connection);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        $request = $connection->getServerRequest();
 | 
			
		||||
        $topic = $request->getQueryParams()['topic'][0]??'';
 | 
			
		||||
        $connection->setGroup($topic);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @param ServerRequestInterface $request
 | 
			
		||||
     * @param callable $next
 | 
			
		||||
     * @return PromiseInterface
 | 
			
		||||
     */
 | 
			
		||||
    private function handleApiRequest(ServerRequestInterface $request, callable $next): PromiseInterface
 | 
			
		||||
    {
 | 
			
		||||
        return new Promise(
 | 
			
		||||
            function (callable $resolve, callable $reject) use ($next, $request) {
 | 
			
		||||
 | 
			
		||||
                $path = $request->getUri()->getPath();
 | 
			
		||||
 | 
			
		||||
                if ($path === "/index.html") {
 | 
			
		||||
                    $resolve(Response::html(self::$indexPage));
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                switch (true) {
 | 
			
		||||
                    case preg_match('<^/.well-known/mercure/subscriptions(/.+?)$>', $path, $m):
 | 
			
		||||
                        $query = explode("/", trim($m[1]??null, "/"));
 | 
			
		||||
                        $topic = array_shift($query);
 | 
			
		||||
                        $subscription = array_shift($query);
 | 
			
		||||
                        $resolve($this->apiGetSubscriptions($topic, $subscription));
 | 
			
		||||
                        return;
 | 
			
		||||
                
 | 
			
		||||
                    case preg_match('<^/.well-known/mercureact/status$>', $path):
 | 
			
		||||
                        $resolve([
 | 
			
		||||
                            'server' => 'Mercureact/1.0',
 | 
			
		||||
                            'topics' => $this->topicManager->getTopicCount(),
 | 
			
		||||
                            'subscriptions' => $this->topicManager->getSubscriberCount(),
 | 
			
		||||
                            'memoryPeak' => memory_get_peak_usage(true),
 | 
			
		||||
                            'memoryUsage' => memory_get_usage(true)
 | 
			
		||||
                        ]);
 | 
			
		||||
                        return;
 | 
			
		||||
    
 | 
			
		||||
                    case preg_match('<^/.well-known/mercureact/status$>', $path):
 | 
			
		||||
                        $resolve([ 'version' => '1.0' ]);
 | 
			
		||||
                        return;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                $resolve($next($request));
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 
 | 
			
		||||
     * 
 | 
			
		||||
     * @return ResponseInterface
 | 
			
		||||
     */
 | 
			
		||||
    private function apiGetSubscriptions(string|null $topic, string|null $subscription): ResponseInterface
 | 
			
		||||
    {
 | 
			
		||||
        $lastEventId = "urn:uuid:5e94c686-2c0b-4f9b-958c-92ccc3bbb4eb";
 | 
			
		||||
 | 
			
		||||
        $data = [
 | 
			
		||||
            "@context" => "https://mercure.rocks/",
 | 
			
		||||
            "id" => "/.well-known/mercure/subscriptions",
 | 
			
		||||
            "type" => "Subscriptions",
 | 
			
		||||
            "lastEventID" => $lastEventId,
 | 
			
		||||
            "subscriptions" => []
 | 
			
		||||
        ];
 | 
			
		||||
 | 
			
		||||
        return Response::json($data)
 | 
			
		||||
            ->withHeader('Content-Type', 'application/ld+json')
 | 
			
		||||
            ->withHeader('ETag', $lastEventId);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
Server::$indexPage = <<<ENDHTML
 | 
			
		||||
<html>
 | 
			
		||||
<head>
 | 
			
		||||
<meta http-equiv="Content-Security-Policy" content="default-src 'self' http: 'unsafe-eval' 'unsafe-inline'; style-src 'self';">
 | 
			
		||||
</head>
 | 
			
		||||
<body>
 | 
			
		||||
<script type="text/javascript">
 | 
			
		||||
  const events = new EventSource("http://127.0.0.1:9000/.well-known/mercure");
 | 
			
		||||
  events.onmessage = msg => console.log(msg);
 | 
			
		||||
</script>
 | 
			
		||||
</body>
 | 
			
		||||
</html>
 | 
			
		||||
ENDHTML;
 | 
			
		||||
		Reference in New Issue
	
	Block a user