From befe5f5d598c1843fb2c8fc5a4aa5ef0425dfb2d Mon Sep 17 00:00:00 2001 From: Christopher Vagnetoft Date: Fri, 1 Mar 2024 14:34:14 +0100 Subject: [PATCH] Initial commit --- .gitignore | 3 + README.md | 52 +++++++++++++ composer.json | 29 +++++++ examples/local.php | 33 ++++++++ examples/server.php | 45 +++++++++++ phpstan.neon | 12 +++ src/Command.php | 40 ++++++++++ src/CommandBus.php | 115 +++++++++++++++++++++++++++ src/CommandBusClient.php | 150 ++++++++++++++++++++++++++++++++++++ src/CommandBusException.php | 8 ++ src/CommandBusInterface.php | 33 ++++++++ src/CommandRegistry.php | 53 +++++++++++++ src/Context.php | 41 ++++++++++ src/Message.php | 88 +++++++++++++++++++++ src/MessageException.php | 9 +++ 15 files changed, 711 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 composer.json create mode 100644 examples/local.php create mode 100644 examples/server.php create mode 100644 phpstan.neon create mode 100644 src/Command.php create mode 100644 src/CommandBus.php create mode 100644 src/CommandBusClient.php create mode 100644 src/CommandBusException.php create mode 100644 src/CommandBusInterface.php create mode 100644 src/CommandRegistry.php create mode 100644 src/Context.php create mode 100644 src/Message.php create mode 100644 src/MessageException.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cbaa689 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/vendor/ +/.phpunit.* +/composer.lock diff --git a/README.md b/README.md new file mode 100644 index 0000000..958ce37 --- /dev/null +++ b/README.md @@ -0,0 +1,52 @@ +# Command Bus for ReactPHP + +* Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or a hybrid. +* All commands called asynchronously using promises and deferreds. + +## Installing + +```shell +$ composer require noccylabs/react-command-bus:^0.1.0 +``` + +## Usage + +```php +// This is enough to setup a local bus. +$bus = new CommandBus(); +$bus->register('hello', function (Context $context) { + return new Promise(function (callable $resolve) use ($context) { + return $resolve([ 'message' => "Hello, {$context->name}" ]); + }); +}); + +// You can call it as expected +$bus->execute('hello', [ 'name' => "Bob" ]) + ->then(function (array $result) { + echo "Result: {$result['message']}\n"; + }); + +// Add a listener, and you can now connect to it! +$bus->addServer($server); + + +// So using this in another script works as expected, if you consider +// the async flow. See the examples for working examples. + +$client = new CommandBusClient(); +$client->connect($socket); + +$client->execute('hello', [ 'name' => "Bob" ]) + ->then(function (array $result) { + echo "Result: {$result['message']}\n"; + }); + + +// The bus can also notify all clients about important events +$bus->notify('updateCompleted', [ 'info' => [] ]); + +// Listening on the bus or client will yield the event +$bus->on('notify', function (string $event, array $data) {}); +$client->on('notify', function (string $event, array $data) {}); + +``` diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..3c59803 --- /dev/null +++ b/composer.json @@ -0,0 +1,29 @@ +{ + "name": "noccylabs/react-command-bus", + "description": "A command bus for ReactPHP applications", + "type": "library", + "license": "GPL-3.0-or-later", + "autoload": { + "psr-4": { + "NoccyLabs\\React\\CommandBus\\": "src/" + } + }, + "authors": [ + { + "name": "NoccyLabs", + "email": "labs@noccy.com" + } + ], + "require": { + "react/event-loop": "^1.5", + "react/promise": "^3.1", + "react/socket": "^1.15", + "react/stream": "^1.3", + "react/promise-timer": "^1.10", + "symfony/uid": "^6.0|^7.0" + }, + "require-dev": { + "phpunit/phpunit": "^11.0", + "phpstan/phpstan": "^1.10" + } +} diff --git a/examples/local.php b/examples/local.php new file mode 100644 index 0000000..70d5d46 --- /dev/null +++ b/examples/local.php @@ -0,0 +1,33 @@ +register("hello", function (Context $context) { + return new Promise(function (callable $resolve) use ($context) { + Loop::addTimer(1, function () use ($context, $resolve) { + $resolve("Hello, {$context->name}"); + }); + }); +}); +$commands->register("hello2", function (Context $context) { + return "Hello2, {$context->name}"; +}); + +$bus = new CommandBus($commands); + +$bus->execute('hello', ['name'=>'Bob']) +->then(function ($result) { + var_dump($result); +}); + +$bus->execute('hello2', ['name'=>'Bob']) +->then(function ($result) { + var_dump($result); +}); diff --git a/examples/server.php b/examples/server.php new file mode 100644 index 0000000..b4e7a67 --- /dev/null +++ b/examples/server.php @@ -0,0 +1,45 @@ +register("hello", function (Context $context) { + // You don't have to, but you should return a promise from your + // commands. + return new Promise(function (callable $resolve) use ($context) { + $resolve("Hello, {$context->name}"); + }); +}); + +// Create the CommandBus and pass the CommandRegistry +$bus = new CommandBus($commands); + +$server = new SocketServer("tcp://127.0.0.1:9999"); +$bus->addServer($server); + +// The server is sorted, now for the client! + +$client = new CommandBusClient(); +$client->on(CommandBusClient::EVENT_CONNECTED, + function () use ($client, $bus) { + $client->execute('hello', ['name'=>'Bob']) + ->then(function ($result) use ($client,$bus) { + // Result from the call + var_dump($result); + // Shut down after receiving the response + $bus->close(); + $client->close(); + }); + } +); +$client->connect("tcp://127.0.0.1:9999"); \ No newline at end of file diff --git a/phpstan.neon b/phpstan.neon new file mode 100644 index 0000000..e68bade --- /dev/null +++ b/phpstan.neon @@ -0,0 +1,12 @@ +parameters: + level: 5 + + excludePaths: + - doc + - vendor + - tests + + # Paths to include in the analysis + paths: + - src + diff --git a/src/Command.php b/src/Command.php new file mode 100644 index 0000000..d050792 --- /dev/null +++ b/src/Command.php @@ -0,0 +1,40 @@ +name = $name; + $this->handler = $handler; + } + + public function getName(): string + { + return $this->name; + } + + public function call(Context $context): PromiseInterface + { + return new Promise(function (callable $resolve) use ($context) { + $resolve(call_user_func($this->handler, $context)); + return; + }); + } + +} + diff --git a/src/CommandBus.php b/src/CommandBus.php new file mode 100644 index 0000000..74e3814 --- /dev/null +++ b/src/CommandBus.php @@ -0,0 +1,115 @@ +commandRegistry = $commandRegistry; + $this->connections = new SplObjectStorage(); + $this->servers = new SplObjectStorage(); + } + + public function addServer(ServerInterface $server): void + { + $this->servers->attach($server); + $server->on('connection', $this->onServerConnection(...)); + } + + public function removeServer(ServerInterface $server): void + { + $server->close(); + $server->removeListener('connection', $this->onServerConnection(...)); + $this->servers->detach($server); + } + + public function close(): void + { + foreach ($this->servers as $server) { + $this->removeServer($server); + } + } + + private function onServerConnection(DuplexStreamInterface $client): void + { + $this->connections->attach($client); + $client->on('data', function ($data) use ($client) { + try { + $message = Message::fromString($data); + $this->onClientMessage($client, $message); + } catch (MessageException $e) { + $client->end('{"msg":"error","data":{"error":"Bad message format"}}'); + } + }); + $client->on('close', function () use ($client) { + $this->connections->detach($client); + }); + } + + private function onClientMessage(DuplexStreamInterface $client, Message $message): void + { + switch ($message->getType()) { + case Message::MSGTYPE_EXECUTE: // Client call to execute command + $data = $message->getData(); + $context = new Context($data['command'],$data['context']); + $this->executeContext($context)->then( + function ($result) use ($message, $client) { + $client->write($message->asResult($result)->toJson()."\n"); + } + ); + break; + case Message::MSGTYPE_RESULT: // Result from execution on client + // TODO implement me + break; + case Message::MSGTYPE_REGISTRY: // command registry actions + // TODO implement me + break; + default: + $client->end('{"msg":"error","data":{"error":"Unexpected message type"}}'); + } + } + + /** + * {@inheritDoc} + */ + public function execute(string $command, array $context): PromiseInterface + { + $context = new Context($command, $context); + return $this->executeContext($context); + } + + private function executeContext(Context $context): PromiseInterface + { + $command = $this->commandRegistry->find($context->getCommandName()); + if (!$command) return new Promise(function (callable $resolve) use ($context) { + throw new \RuntimeException("Unable to resolve command: ".$context->getCommandName()); + }); + + return $command->call($context); + } + + /** + * {@inheritDoc} + */ + public function notify(string $event, array $data): void + { + } + +} + diff --git a/src/CommandBusClient.php b/src/CommandBusClient.php new file mode 100644 index 0000000..edcce73 --- /dev/null +++ b/src/CommandBusClient.php @@ -0,0 +1,150 @@ + */ + private array $pending = []; + + private string $address; + + public function __construct(?CommandRegistry $commandRegistry = null, ?ConnectorInterface $connector = null) + { + $this->commandRegistry = $commandRegistry; + if ($commandRegistry) { + $commandRegistry->on(CommandRegistry::EVENT_REGISTERED, $this->onCommandRegistered(...)); + $commandRegistry->on(CommandRegistry::EVENT_UNREGISTERED, $this->onCommandUnregistered(...)); + } + $this->connector = $connector??new TcpConnector(); + } + + public function connect(string $address): void + { + $this->address = $address; + $this->reconnect(); + } + + public function close(): void + { + $this->connection->close(); + $this->connection->removeAllListeners(); + $this->connection = null; + } + + private function reconnect(): void + { + if ($this->connection) { + $this->close(); + } + + $this->connector->connect($this->address)->then( + function (DuplexStreamInterface $connection) { + $this->connection = $connection; + $this->emit(self::EVENT_CONNECTED); + $connection->on('close', function () { + $this->emit(self::EVENT_DISCONNECTED); + }); + $connection->on('data', function ($data) use ($connection) { + try { + $message = Message::fromString($data); + $this->onServerMessage($message); + } catch (MessageException $e) { + $connection->end('{"msg":"error","data":{"error":"Bad message format"}}'); + } + }); + }, + function (Throwable $e) { + $this->emit(self::EVENT_ERROR, [ $e->getMessage(), $e ]); + Loop::addTimer(5, $this->reconnect(...)); + } + ); + } + + private function onServerMessage(Message $message): void + { + // fprintf(STDERR, "onServerMessage: %s\n", $message->toJson()); + switch ($message->getType()) { + case Message::MSGTYPE_EXECUTE: // server call to execute command + // TODO implement me + break; + case Message::MSGTYPE_RESULT: // result from server + $uuid = $message->getUuid(); + $result = $message->getData()['result']; + if (array_key_exists($uuid, $this->pending)) { + $this->pending[$uuid]->resolve($result); + unset($this->pending[$uuid]); + } + break; + case Message::MSGTYPE_REGISTRY: // command registry actions + // TODO implement me + break; + default: + $this->connection->end('{"msg":"error","data":{"error":"Unexpected message type"}}'); + } + } + + private function onCommandRegistered(string $command): void + { + if ($this->connection && $this->connection->isWritable()) { + $msg = new Message(Message::MSGTYPE_REGISTRY, [ 'register' => [ $command ]]); + $this->connection->write($msg->toJson()); + } + } + + private function onCommandUnregistered(string $command): void + { + if ($this->connection && $this->connection->isWritable()) { + $msg = new Message(Message::MSGTYPE_REGISTRY, [ 'unregister' => [ $command ]]); + $this->connection->write($msg->toJson()); + } + } + + /** + * {@inheritDoc} + */ + public function execute(string $command, array $context): PromiseInterface + { + $deferred = new Deferred(); + + $message = new Message(Message::MSGTYPE_EXECUTE, [ + 'command' => $command, + 'context' => $context + ]); + $this->pending[$message->getUuid()] = $deferred; + + // fprintf(STDERR, "write: %s\n", $message->toJson()); + $this->connection->write($message->toJson()."\n"); + + return $deferred->promise(); + } + + /** + * {@inheritDoc} + */ + public function notify(string $event, array $data): void + { + } +} + diff --git a/src/CommandBusException.php b/src/CommandBusException.php new file mode 100644 index 0000000..f1baf9f --- /dev/null +++ b/src/CommandBusException.php @@ -0,0 +1,8 @@ + */ + private array $commands = []; + + public function register(string $command, callable $handler): void + { + $isNew = !array_key_exists($command, $this->commands); + + $this->commands[$command] = new Command($command, $handler); + + if ($isNew) { + $this->emit(self::EVENT_REGISTERED, [ $command ]); + } + } + + public function unregister(string $command): void + { + if (!array_key_exists($command, $this->commands)) { + return; + } + + unset($this->commands[$command]); + + $this->emit(self::EVENT_UNREGISTERED, [ $command ]); + } + + public function find(string $command): ?Command + { + return $this->commands[$command] ?? null; + } + + public function getNames(): array + { + return array_keys($this->commands); + } +} + diff --git a/src/Context.php b/src/Context.php new file mode 100644 index 0000000..b680e85 --- /dev/null +++ b/src/Context.php @@ -0,0 +1,41 @@ + The payload data */ + private array $payload = []; + + public function __construct(string $commandName, array $payload) + { + $this->commandName = $commandName; + $this->payload = $payload; + } + + public function getCommandName(): string + { + return $this->commandName; + } + + public function getPayload(): array + { + return $this->payload; + } + + public function __get($name) + { + return $this->payload[$name] ?? null; + } +} + diff --git a/src/Message.php b/src/Message.php new file mode 100644 index 0000000..299e927 --- /dev/null +++ b/src/Message.php @@ -0,0 +1,88 @@ +uuid = ($uuid && Uuid::isValid($uuid)) ? $uuid : (string)Uuid::v7(); + $this->messageType = $messageType; + $this->messageData = $messageData; + } + + public function getUuid(): string + { + return $this->uuid; + } + + public function getType(): string + { + return $this->messageType; + } + + public function getData(): array + { + return $this->messageData; + } + + /** + * + * @param string $data The JSON-encoded message data + * @return Message + * @throws MessageException if the message can not be parsed + */ + public static function fromString(string $data): Message + { + $json = @json_decode($data, true); + if (!$json || empty($json['msg'])) { + throw new MessageException("Invalid data"); + } + return new Message($json['msg'], $json['data'], $json['uuid']); + } + + public function asResult($result): Message + { + return new Message(self::MSGTYPE_RESULT, [ + 'result' => $result + ], $this->uuid); + } + + public function toJson(): string + { + return json_encode($this, JSON_UNESCAPED_SLASHES); + } + + public function jsonSerialize(): array + { + return [ + 'uuid' => $this->uuid, + 'msg' => $this->messageType, + 'data' => $this->messageData + ]; + } +} + diff --git a/src/MessageException.php b/src/MessageException.php new file mode 100644 index 0000000..e68d504 --- /dev/null +++ b/src/MessageException.php @@ -0,0 +1,9 @@ +