2024-03-01 14:34:14 +01:00
|
|
|
<?php
|
|
|
|
|
|
|
|
namespace NoccyLabs\React\CommandBus;
|
|
|
|
use Evenement\EventEmitterTrait;
|
|
|
|
use React\Promise\PromiseInterface;
|
|
|
|
use React\Promise\Promise;
|
|
|
|
use React\Promise\Deferred;
|
|
|
|
use React\Socket\ServerInterface;
|
|
|
|
use React\Stream\DuplexStreamInterface;
|
|
|
|
use SplObjectStorage;
|
|
|
|
|
|
|
|
class CommandBus implements CommandBusInterface
|
|
|
|
{
|
|
|
|
use EventEmitterTrait;
|
|
|
|
|
|
|
|
private CommandRegistry $commandRegistry;
|
|
|
|
|
|
|
|
private SplObjectStorage $connections;
|
|
|
|
|
|
|
|
private SplObjectStorage $servers;
|
|
|
|
|
|
|
|
public function __construct(CommandRegistry $commandRegistry)
|
|
|
|
{
|
|
|
|
$this->commandRegistry = $commandRegistry;
|
|
|
|
$this->connections = new SplObjectStorage();
|
|
|
|
$this->servers = new SplObjectStorage();
|
|
|
|
}
|
|
|
|
|
|
|
|
public function addServer(ServerInterface $server): void
|
|
|
|
{
|
|
|
|
$this->servers->attach($server);
|
|
|
|
$server->on('connection', $this->onServerConnection(...));
|
|
|
|
}
|
|
|
|
|
|
|
|
public function removeServer(ServerInterface $server): void
|
|
|
|
{
|
|
|
|
$server->close();
|
|
|
|
$server->removeListener('connection', $this->onServerConnection(...));
|
|
|
|
$this->servers->detach($server);
|
|
|
|
}
|
|
|
|
|
|
|
|
public function close(): void
|
|
|
|
{
|
|
|
|
foreach ($this->servers as $server) {
|
|
|
|
$this->removeServer($server);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private function onServerConnection(DuplexStreamInterface $client): void
|
|
|
|
{
|
|
|
|
$this->connections->attach($client);
|
|
|
|
$client->on('data', function ($data) use ($client) {
|
|
|
|
try {
|
|
|
|
$message = Message::fromString($data);
|
|
|
|
$this->onClientMessage($client, $message);
|
|
|
|
} catch (MessageException $e) {
|
|
|
|
$client->end('{"msg":"error","data":{"error":"Bad message format"}}');
|
|
|
|
}
|
|
|
|
});
|
|
|
|
$client->on('close', function () use ($client) {
|
|
|
|
$this->connections->detach($client);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
private function onClientMessage(DuplexStreamInterface $client, Message $message): void
|
|
|
|
{
|
|
|
|
switch ($message->getType()) {
|
|
|
|
case Message::MSGTYPE_EXECUTE: // Client call to execute command
|
|
|
|
$data = $message->getData();
|
|
|
|
$context = new Context($data['command'],$data['context']);
|
|
|
|
$this->executeContext($context)->then(
|
|
|
|
function ($result) use ($message, $client) {
|
|
|
|
$client->write($message->asResult($result)->toJson()."\n");
|
|
|
|
}
|
|
|
|
);
|
|
|
|
break;
|
|
|
|
case Message::MSGTYPE_RESULT: // Result from execution on client
|
|
|
|
// TODO implement me
|
|
|
|
break;
|
|
|
|
case Message::MSGTYPE_REGISTRY: // command registry actions
|
|
|
|
// TODO implement me
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
$client->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* {@inheritDoc}
|
|
|
|
*/
|
|
|
|
public function execute(string $command, array $context): PromiseInterface
|
|
|
|
{
|
|
|
|
$context = new Context($command, $context);
|
|
|
|
return $this->executeContext($context);
|
|
|
|
}
|
|
|
|
|
|
|
|
private function executeContext(Context $context): PromiseInterface
|
|
|
|
{
|
|
|
|
$command = $this->commandRegistry->find($context->getCommandName());
|
|
|
|
if (!$command) return new Promise(function (callable $resolve) use ($context) {
|
|
|
|
throw new \RuntimeException("Unable to resolve command: ".$context->getCommandName());
|
|
|
|
});
|
|
|
|
|
|
|
|
return $command->call($context);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* {@inheritDoc}
|
|
|
|
*/
|
|
|
|
public function notify(string $event, array $data): void
|
|
|
|
{
|
2024-03-01 15:20:54 +01:00
|
|
|
$this->emit(self::EVENT_NOTIFY, [ $event, $data ]);
|
|
|
|
|
|
|
|
$json = (new Message(Message::MSGTYPE_NOTIFY, [
|
|
|
|
'event' => $event,
|
|
|
|
'data' => $data
|
|
|
|
]))->toJson();
|
|
|
|
|
|
|
|
foreach ($this->connections as $client) {
|
|
|
|
$client->write($json."\n");
|
|
|
|
}
|
2024-03-01 14:34:14 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|