mercureact/src/Broker/TopicManager.php

114 lines
3.3 KiB
PHP

<?php
namespace NoccyLabs\Mercureact\Broker;
use NoccyLabs\Mercureact\Broker\Subscriber\SubscriberInterface;
use Psr\Log\LoggerInterface;
use SplObjectStorage;
/**
 * Registry of topics and their subscribers.
 *
 * Topics are created lazily the first time they are published or subscribed
 * to, and are dropped again by garbageCollect() once they are no longer in use.
 */
class TopicManager
{
    /** @var array<string,Topic> Known topics, keyed by topic name. */
    private array $topics = [];

    /** Id of the most recently published message; null until the first publish. */
    private ?string $lastEventId = null;

    /** @var SplObjectStorage<SubscriberInterface,mixed> Every subscriber attached through subscribe(). */
    private SplObjectStorage $subscribers;

    /**
     * @param LoggerInterface $logger Receives debug messages about topic and subscription changes.
     */
    public function __construct(private LoggerInterface $logger)
    {
        $this->subscribers = new SplObjectStorage();
    }

    /**
     * Return the Topic registered under the given name, creating it on first use.
     *
     * @param string $topic Topic name used as the registry key.
     */
    public function getTopic(string $topic): Topic
    {
        if (!isset($this->topics[$topic])) {
            $this->logger->debug("Created topic: {$topic}");
            $this->topics[$topic] = new Topic($topic);
        }

        return $this->topics[$topic];
    }

    /**
     * Publish a message to every topic it lists and remember its id as the last event id.
     *
     * @param Message $message Message to deliver; its `topic` property is iterated as a list of topic names.
     */
    public function publish(Message $message): void
    {
        $this->lastEventId = $message->id;

        // Log once per message (not once per topic), with a separator between id and topic list.
        $this->logger->debug(
            "Publish: {$message->id} " . json_encode($message->topic, JSON_UNESCAPED_SLASHES)
        );

        foreach ($message->topic as $topic) {
            $this->getTopic($topic)->publish($message);
        }
    }

    /**
     * Attach a subscriber and add it to each of the given topics, creating topics as needed.
     *
     * @param SubscriberInterface $subscriber Subscriber to register.
     * @param array<int,string>   $topics     Topic names to subscribe to.
     */
    public function subscribe(SubscriberInterface $subscriber, array $topics): void
    {
        $this->logger->debug(
            "Subscribed: " . $subscriber->getId() . " + " . json_encode($topics, JSON_UNESCAPED_SLASHES)
        );

        $this->subscribers->attach($subscriber);

        foreach ($topics as $topic) {
            $this->getTopic($topic)->addSubscriber($subscriber);
        }
    }

    /**
     * Remove a subscriber from the given topics, or from everything.
     *
     * When $topics is null or empty the subscriber is detached from the manager
     * and removed from every known topic. Otherwise it is only removed from the
     * listed topics and stays attached to the manager.
     *
     * @param SubscriberInterface    $subscriber Subscriber to remove.
     * @param array<int,string>|null $topics     Topic names to leave, or null/empty for all.
     */
    public function unsubscribe(SubscriberInterface $subscriber, ?array $topics = null): void
    {
        $this->logger->debug(
            "Unsubscribed: " . $subscriber->getId() . " - " . json_encode($topics, JSON_UNESCAPED_SLASHES)
        );

        if (!$topics) {
            $this->subscribers->detach($subscriber);
            foreach ($this->topics as $topic) {
                $topic->removeSubscriber($subscriber);
            }

            return;
        }

        foreach ($topics as $topic) {
            // Look the topic up directly instead of via getTopic(): unsubscribing
            // from an unknown topic must not create (and log) a new empty topic.
            if (isset($this->topics[$topic])) {
                $this->topics[$topic]->removeSubscriber($subscriber);
            }
        }
    }

    /**
     * List one subscription record per (topic, subscriber) pair.
     *
     * @return array<int,array{id:string,type:string,topic:mixed,subscriber:mixed,active:bool,payload:mixed}>
     */
    public function getSubscriptions(): array
    {
        $all = [];

        foreach ($this->topics as $topic) {
            $topicName = $topic->getTopic();

            foreach ($topic->getSubscribers() as $sub) {
                $subscriberId = $sub->getId();

                $all[] = [
                    // Path of this subscription in the Mercure subscription API.
                    'id' => '/.well-known/mercure/subscriptions/'
                        . urlencode($topicName) . '/' . urlencode($subscriberId),
                    'type' => "Subscription",
                    'topic' => $topicName,
                    'subscriber' => $subscriberId,
                    'active' => true,
                    'payload' => $sub->getPayload(),
                ];
            }
        }

        return $all;
    }

    /**
     * Return the id of the last published message prefixed with "urn:uuid:",
     * or null when nothing has been published yet.
     */
    public function getLastEventId(): ?string
    {
        if ($this->lastEventId === null) {
            return null;
        }

        return "urn:uuid:" . $this->lastEventId;
    }

    /**
     * Number of topics currently held by the manager.
     */
    public function getTopicCount(): int
    {
        return count($this->topics);
    }

    /**
     * Number of subscribers currently attached to the manager.
     */
    public function getSubscriberCount(): int
    {
        return count($this->subscribers);
    }

    /**
     * Run garbage collection on every topic and drop the ones no longer needed.
     *
     * A topic is kept while it still has history or subscribers, or while its
     * age is below 60 (presumably seconds; the unit is defined by Topic::getAge()).
     */
    public function garbageCollect(): void
    {
        $this->topics = array_filter(
            $this->topics,
            static function (Topic $topic): bool {
                // Let the topic prune itself first so the checks below see current state.
                $topic->garbageCollect();

                return ($topic->getHistorySize() > 0 || $topic->getSubscriberCount() > 0)
                    || ($topic->getAge() < 60);
            }
        );
    }
}