From 6f49b69a66ec69bbb438461644a105d07b5a195a Mon Sep 17 00:00:00 2001 From: Christopher Vagnetoft Date: Fri, 1 Mar 2024 15:20:54 +0100 Subject: [PATCH] Implemented notify --- README.md | 2 +- examples/notify.php | 44 ++++++++++++++++++++++++++++++++++++++++ src/CommandBus.php | 10 +++++++++ src/CommandBusClient.php | 8 +++++--- 4 files changed, 60 insertions(+), 4 deletions(-) create mode 100644 examples/notify.php diff --git a/README.md b/README.md index 53e883a..14e2f2d 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ * Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or in a hybrid setup. The CommandBus functions identical with or without clients. * All commands called asynchronously using promises and deferreds. -* Push notifications from the bus to subscribers and listeners, such as progress or log/error messages. (WIP) +* Push notifications from the bus to subscribers and listeners, such as progress or log/error messages. ### Potential caveats diff --git a/examples/notify.php b/examples/notify.php new file mode 100644 index 0000000..e1f0d3a --- /dev/null +++ b/examples/notify.php @@ -0,0 +1,44 @@ +addServer($server); + +$bus->on(CommandBus::EVENT_NOTIFY, + function (string $event, array $data) { + printf("notify event: %s %s\n", $event, json_encode($data, JSON_UNESCAPED_SLASHES)); + } +); + +$client = new CommandBusClient(); +$client->connect("tcp://127.0.0.1:9999"); + +$client->on(CommandBusClient::EVENT_NOTIFY, + function (string $event, array $data) { + printf("notify client: %s %s\n", $event, json_encode($data, JSON_UNESCAPED_SLASHES)); + } +); + +// Wait for connections etc, before sending a notification. +Loop::addTimer(1, function () use ($bus) { + $bus->notify("hello", [ 'greet'=>"World" ]); +}); +// Shutdown in 2 secs +Loop::addTimer(2, function () use ($bus, $client) { + $bus->close(); + $client->close(); +}); \ No newline at end of file diff --git a/src/CommandBus.php b/src/CommandBus.php index 74e3814..8dc7f4f 100644 --- a/src/CommandBus.php +++ b/src/CommandBus.php @@ -109,6 +109,16 @@ class CommandBus implements CommandBusInterface */ public function notify(string $event, array $data): void { + $this->emit(self::EVENT_NOTIFY, [ $event, $data ]); + + $json = (new Message(Message::MSGTYPE_NOTIFY, [ + 'event' => $event, + 'data' => $data + ]))->toJson(); + + foreach ($this->connections as $client) { + $client->write($json."\n"); + } } } diff --git a/src/CommandBusClient.php b/src/CommandBusClient.php index edcce73..b099993 100644 --- a/src/CommandBusClient.php +++ b/src/CommandBusClient.php @@ -84,7 +84,7 @@ class CommandBusClient implements CommandBusInterface private function onServerMessage(Message $message): void { - // fprintf(STDERR, "onServerMessage: %s\n", $message->toJson()); + //fprintf(STDERR, "onServerMessage: %s\n", $message->toJson()); switch ($message->getType()) { case Message::MSGTYPE_EXECUTE: // server call to execute command // TODO implement me @@ -97,8 +97,10 @@ class CommandBusClient implements CommandBusInterface unset($this->pending[$uuid]); } break; - case Message::MSGTYPE_REGISTRY: // command registry actions - // TODO implement me + case Message::MSGTYPE_NOTIFY: // notify + $event = $message->getData()['event']??null; + $data = (array)($message->getData()['data']??[]); + $this->emit(self::EVENT_NOTIFY, [ $event, $data ]); break; default: $this->connection->end('{"msg":"error","data":{"error":"Unexpected message type"}}');