mercureact/src/Broker/TopicManager.php
Christopher Vagnetoft d310060309 Bugfix topicmanager
* Unsubscribing wouldn't always remove the subscriber.
2024-03-11 01:50:34 +01:00

79 lines
1.9 KiB
PHP

<?php
namespace NoccyLabs\Mercureact\Broker;
use SplObjectStorage;
/**
 * Registry of topics and of the subscribers attached to them.
 *
 * Topics are created lazily the first time they are requested and are kept in
 * a map indexed by topic name. Every subscriber handed to subscribe() is also
 * tracked in a separate set so the number of subscribers can be reported.
 */
class TopicManager
{
    /** @var array<string,Topic> Known topics, indexed by topic name. */
    private array $topics = [];

    /** Set of subscribers added via subscribe() and not yet passed to unsubscribe(). */
    private SplObjectStorage $subscribers;

    public function __construct()
    {
        $this->subscribers = new SplObjectStorage();
    }

    /**
     * Return the topic with the given name, creating and registering it if it
     * does not exist yet.
     *
     * @param string $topic Topic name.
     */
    public function getTopic(string $topic): Topic
    {
        if (!isset($this->topics[$topic])) {
            $this->topics[$topic] = new Topic($topic);
        }

        return $this->topics[$topic];
    }

    /**
     * Publish a message to every topic listed in $message->topic.
     *
     * Topics that do not exist yet are created on the fly.
     * NOTE(review): assumes $message->topic is iterable and yields topic
     * names as strings — confirm against Message.
     */
    public function publish(Message $message): void
    {
        foreach ($message->topic as $topic) {
            $this->getTopic($topic)->publish($message);
        }
    }

    /**
     * Track the subscriber and add it to each of the named topics.
     *
     * @param SubscriberInterface $subscriber Subscriber to register.
     * @param array               $topics     Topic names; missing topics are created.
     */
    public function subscribe(SubscriberInterface $subscriber, array $topics): void
    {
        $this->subscribers->attach($subscriber);

        foreach ($topics as $topic) {
            $this->getTopic($topic)->addSubscriber($subscriber);
        }
    }

    /**
     * Stop tracking the subscriber and remove it from topics.
     *
     * When $topics is null or empty the subscriber is removed from every known
     * topic; otherwise only from the named topics. Names that do not match a
     * known topic are ignored.
     *
     * @param SubscriberInterface $subscriber Subscriber to remove.
     * @param array|null          $topics     Topic names, or null/empty for all topics.
     */
    public function unsubscribe(SubscriberInterface $subscriber, ?array $topics = null): void
    {
        // The subscriber leaves the tracked set on every call, including when
        // only a subset of topics is named.
        $this->subscribers->detach($subscriber);

        if (!$topics) {
            foreach ($this->topics as $topic) {
                $topic->removeSubscriber($subscriber);
            }

            return;
        }

        foreach ($topics as $name) {
            // Look the topic up directly instead of going through getTopic():
            // that would create (and register) an empty topic for every
            // unknown name just to remove a subscriber it cannot contain.
            if (isset($this->topics[$name])) {
                $this->topics[$name]->removeSubscriber($subscriber);
            }
        }
    }

    /**
     * Number of topics currently registered.
     */
    public function getTopicCount(): int
    {
        return count($this->topics);
    }

    /**
     * Number of subscribers currently tracked.
     */
    public function getSubscriberCount(): int
    {
        return count($this->subscribers);
    }

    /**
     * Run garbage collection on every topic and drop the topics that are no
     * longer needed.
     *
     * A topic is kept while it has history, has subscribers, or reports an
     * age below 60. Topic names (array keys) are preserved by array_filter().
     * NOTE(review): the age is presumably measured in seconds — confirm
     * against Topic::getAge().
     */
    public function garbageCollect(): void
    {
        $this->topics = array_filter(
            $this->topics,
            static function (Topic $topic): bool {
                // Let the topic prune itself first so the checks below see
                // its up-to-date state.
                $topic->garbageCollect();

                return $topic->getHistorySize() > 0
                    || $topic->getSubscriberCount() > 0
                    || $topic->getAge() < 60;
            }
        );
    }
}