2024-03-01 14:34:14 +01:00
|
|
|
<?php
|
|
|
|
|
|
|
|
namespace NoccyLabs\React\CommandBus;
|
|
|
|
use Evenement\EventEmitterTrait;
|
|
|
|
use React\Promise\PromiseInterface;
|
|
|
|
use React\Promise\Promise;
|
|
|
|
use React\Promise\Deferred;
|
|
|
|
use React\Socket\ServerInterface;
|
|
|
|
use React\Stream\DuplexStreamInterface;
|
|
|
|
use SplObjectStorage;
|
|
|
|
|
|
|
|
/**
 * Command bus that resolves commands by name through a chain of resolvers
 * and optionally exposes them to remote clients over attached socket servers.
 *
 * Clients talk to the bus using newline-terminated JSON messages. Execute
 * requests are answered with a result or error message, and notifications
 * sent through notify() are broadcast to every connected client.
 */
class CommandBus implements CommandBusInterface
{
    use EventEmitterTrait;

    /** @var array<CommandResolverInterface> Resolvers, consulted in the order they were added */
    private array $resolvers = [];

    /** @var SplObjectStorage Client streams that are currently connected */
    private SplObjectStorage $connections;

    /** @var SplObjectStorage Attached servers, each mapped to the 'connection' listener registered on it */
    private SplObjectStorage $servers;

    /**
     * @param CommandResolverInterface|null $commands Optional initial resolver
     */
    public function __construct(?CommandResolverInterface $commands=null)
    {
        if ($commands !== null) {
            $this->resolvers[] = $commands;
        }
        $this->connections = new SplObjectStorage();
        $this->servers = new SplObjectStorage();
    }

    /**
     * Get the command registry
     *
     * @deprecated Use getCommandNames() instead
     * @return null
     */
    public function getRegistry(): ?CommandRegistry
    {
        // The bus no longer holds a single registry (commands come from the
        // resolvers), so there is nothing to return here.
        return null;
    }

    /**
     * Add a CommandResolver
     *
     * Resolvers are searched in the order they are added.
     *
     * @param CommandResolverInterface $resolver
     * @return void
     */
    public function addResolver(CommandResolverInterface $resolver): void
    {
        $this->resolvers[] = $resolver;
    }

    /**
     * Get the names of defined commands
     *
     * Collects the names reported by every resolver, sorted and with
     * duplicates removed.
     *
     * @return array Sequentially indexed list of command names
     */
    public function getCommandNames(): array
    {
        $commands = [];
        foreach ($this->resolvers as $resolver) {
            $commands = array_merge($commands, $resolver->getNames());
        }
        sort($commands);

        // array_unique() preserves keys, which leaves gaps where duplicates
        // were dropped; reindex so callers always get a proper list.
        return array_values(array_unique($commands));
    }

    /**
     * Find a command by searching through the resolvers
     *
     * The first resolver that returns a match wins.
     *
     * @param string $command
     * @return Command|null The command, or null if no resolver knows it
     */
    public function findCommand(string $command): ?Command
    {
        foreach ($this->resolvers as $resolver) {
            $found = $resolver->find($command);
            if ($found) {
                return $found;
            }
        }

        return null;
    }

    /**
     * Add a server listener
     *
     * Adding a server that is already attached is a no-op, so the connection
     * handler is never registered twice on the same server.
     *
     * @param ServerInterface $server
     * @return void
     */
    public function addServer(ServerInterface $server): void
    {
        if ($this->servers->contains($server)) {
            return;
        }

        // Every `$this->onServerConnection(...)` expression creates a new
        // Closure object. Keep the exact instance that gets registered so
        // removeServer() can hand the same one back to removeListener().
        $listener = $this->onServerConnection(...);
        $this->servers->attach($server, $listener);
        $server->on('connection', $listener);
    }

    /**
     * Remove a server listener
     *
     * The server is closed in any case; the connection listener is removed
     * only if the server was attached through addServer().
     *
     * @param ServerInterface $server
     * @return void
     */
    public function removeServer(ServerInterface $server): void
    {
        $server->close();

        if (!$this->servers->contains($server)) {
            return;
        }

        // Unregister the very closure stored by addServer(); a freshly
        // created one would not match for emitters that compare listeners
        // by identity.
        $server->removeListener('connection', $this->servers[$server]);
        $this->servers->detach($server);
    }

    /**
     * Close and shutdown the servers
     *
     * @return void
     */
    public function close(): void
    {
        // removeServer() detaches from $this->servers. Iterate over a
        // snapshot, since detaching from an SplObjectStorage while it is
        // being traversed can cause entries to be skipped.
        foreach (iterator_to_array($this->servers, false) as $server) {
            $this->removeServer($server);
        }
    }

    /**
     * Handle a new client connection from one of the attached servers.
     *
     * Tracks the client until it closes and feeds every decoded message from
     * its data events into onClientMessage().
     *
     * @param DuplexStreamInterface $client
     * @return void
     */
    private function onServerConnection(DuplexStreamInterface $client): void
    {
        $this->connections->attach($client);

        $client->on('data', function ($data) use ($client) {
            try {
                $messages = Message::fromStringMulti($data);
                foreach ($messages as $message) {
                    $this->onClientMessage($client, $message);
                }
            } catch (MessageException $e) {
                // Undecodable input: report it and drop the connection.
                $client->end('{"msg":"error","data":{"error":"Bad message format"}}');
            }
        });

        $client->on('close', function () use ($client) {
            $this->connections->detach($client);
        });
    }

    /**
     * Dispatch a single decoded client message according to its type.
     *
     * @param DuplexStreamInterface $client
     * @param Message $message
     * @return void
     */
    private function onClientMessage(DuplexStreamInterface $client, Message $message): void
    {
        switch ($message->getType()) {
            case Message::MSGTYPE_EXECUTE: // Client call to execute command
                $this->handleExecuteMessage($client, $message);
                break;
            case Message::MSGTYPE_RESULT: // Result from execution on client
                // TODO implement me
                break;
            case Message::MSGTYPE_REGISTRY: // command registry actions
                // TODO implement me
                break;
            default:
                $client->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
        }
    }

    /**
     * Run the command requested by an execute message and write the outcome
     * (result or error) back to the client.
     *
     * The payload comes from the remote peer, so it is validated before use:
     * the command must be a string and the context, when present, an array.
     * A missing context is treated as an empty one.
     *
     * @param DuplexStreamInterface $client
     * @param Message $message
     * @return void
     */
    private function handleExecuteMessage(DuplexStreamInterface $client, Message $message): void
    {
        $data = $message->getData();
        $name = is_array($data) ? ($data['command'] ?? null) : null;
        $arguments = is_array($data) ? ($data['context'] ?? []) : null;

        if (!is_string($name) || !is_array($arguments)) {
            // Answer with an error instead of letting a warning or TypeError
            // escape from the stream's data handler.
            $client->write($message->asError('Malformed execute request')->toJson()."\n");
            return;
        }

        $this->executeContext(new Context($name, $arguments))->then(
            function ($result) use ($message, $client) {
                $client->write($message->asResult($result)->toJson()."\n");
            },
            function (\Throwable $error) use ($message, $client) {
                $client->write($message->asError($error->getMessage())->toJson()."\n");
            }
        );
    }

    /**
     * {@inheritDoc}
     */
    public function execute(string $command, array $context): PromiseInterface
    {
        $context = new Context($command, $context);
        return $this->executeContext($context);
    }

    /**
     * Resolve the command named by the context and invoke it.
     *
     * @param Context $context
     * @return PromiseInterface The promise returned by the command, or a
     *      promise rejected with a RuntimeException when no resolver knows
     *      the command
     */
    private function executeContext(Context $context): PromiseInterface
    {
        $command = $this->findCommand($context->getCommandName());

        if (!$command) {
            // Throwing inside the resolver function rejects the promise, so
            // callers get a rejection rather than a synchronous exception.
            return new Promise(function (callable $resolve) use ($context) {
                throw new \RuntimeException("Unable to resolve command: ".$context->getCommandName());
            });
        }

        return $command->call($context);
    }

    /**
     * {@inheritDoc}
     */
    public function notify(string $event, array $data): void
    {
        // Local listeners first...
        $this->emit(self::EVENT_NOTIFY, [ $event, $data ]);

        // ...then broadcast the same notification to every connected client.
        $json = (new Message(Message::MSGTYPE_NOTIFY, [
            'event' => $event,
            'data' => $data
        ]))->toJson();
        $payload = $json."\n";

        foreach ($this->connections as $client) {
            $client->write($payload);
        }
    }
}
|
|
|
|
|