Implemented notify
This commit is contained in:
parent
f849ac481a
commit
6f49b69a66
@ -4,7 +4,7 @@
|
|||||||
|
|
||||||
* Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or in a hybrid setup. The CommandBus functions identical with or without clients.
|
* Can run monolithic (create a bus and use as is), or distributed (create bus and use clients), or in a hybrid setup. The CommandBus functions identical with or without clients.
|
||||||
* All commands called asynchronously using promises and deferreds.
|
* All commands called asynchronously using promises and deferreds.
|
||||||
* Push notifications from the bus to subscribers and listeners, such as progress or log/error messages. (WIP)
|
* Push notifications from the bus to subscribers and listeners, such as progress or log/error messages.
|
||||||
|
|
||||||
### Potential caveats
|
### Potential caveats
|
||||||
|
|
||||||
|
44
examples/notify.php
Normal file
44
examples/notify.php
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
require_once __DIR__."/../vendor/autoload.php";
|
||||||
|
|
||||||
|
use NoccyLabs\React\CommandBus\CommandBus;
|
||||||
|
use NoccyLabs\React\CommandBus\CommandBusClient;
|
||||||
|
use NoccyLabs\React\CommandBus\CommandRegistry;
|
||||||
|
use NoccyLabs\React\CommandBus\Context;
|
||||||
|
use React\EventLoop\Loop;
|
||||||
|
use React\Promise\Promise;
|
||||||
|
use React\Socket\SocketServer;
|
||||||
|
|
||||||
|
// This example demonstrates notifications, so there are no commands here.
|
||||||
|
// You can add whatever commands you like.
|
||||||
|
$commands = new CommandRegistry();
|
||||||
|
$bus = new CommandBus($commands);
|
||||||
|
|
||||||
|
$server = new SocketServer("tcp://127.0.0.1:9999");
|
||||||
|
$bus->addServer($server);
|
||||||
|
|
||||||
|
$bus->on(CommandBus::EVENT_NOTIFY,
|
||||||
|
function (string $event, array $data) {
|
||||||
|
printf("notify event: %s %s\n", $event, json_encode($data, JSON_UNESCAPED_SLASHES));
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
$client = new CommandBusClient();
|
||||||
|
$client->connect("tcp://127.0.0.1:9999");
|
||||||
|
|
||||||
|
$client->on(CommandBusClient::EVENT_NOTIFY,
|
||||||
|
function (string $event, array $data) {
|
||||||
|
printf("notify client: %s %s\n", $event, json_encode($data, JSON_UNESCAPED_SLASHES));
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait for connections etc, before sending a notification.
|
||||||
|
Loop::addTimer(1, function () use ($bus) {
|
||||||
|
$bus->notify("hello", [ 'greet'=>"World" ]);
|
||||||
|
});
|
||||||
|
// Shutdown in 2 secs
|
||||||
|
Loop::addTimer(2, function () use ($bus, $client) {
|
||||||
|
$bus->close();
|
||||||
|
$client->close();
|
||||||
|
});
|
@ -109,6 +109,16 @@ class CommandBus implements CommandBusInterface
|
|||||||
*/
|
*/
|
||||||
public function notify(string $event, array $data): void
|
public function notify(string $event, array $data): void
|
||||||
{
|
{
|
||||||
|
$this->emit(self::EVENT_NOTIFY, [ $event, $data ]);
|
||||||
|
|
||||||
|
$json = (new Message(Message::MSGTYPE_NOTIFY, [
|
||||||
|
'event' => $event,
|
||||||
|
'data' => $data
|
||||||
|
]))->toJson();
|
||||||
|
|
||||||
|
foreach ($this->connections as $client) {
|
||||||
|
$client->write($json."\n");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -84,7 +84,7 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
|
|
||||||
private function onServerMessage(Message $message): void
|
private function onServerMessage(Message $message): void
|
||||||
{
|
{
|
||||||
// fprintf(STDERR, "onServerMessage: %s\n", $message->toJson());
|
//fprintf(STDERR, "onServerMessage: %s\n", $message->toJson());
|
||||||
switch ($message->getType()) {
|
switch ($message->getType()) {
|
||||||
case Message::MSGTYPE_EXECUTE: // server call to execute command
|
case Message::MSGTYPE_EXECUTE: // server call to execute command
|
||||||
// TODO implement me
|
// TODO implement me
|
||||||
@ -97,8 +97,10 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
unset($this->pending[$uuid]);
|
unset($this->pending[$uuid]);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Message::MSGTYPE_REGISTRY: // command registry actions
|
case Message::MSGTYPE_NOTIFY: // notify
|
||||||
// TODO implement me
|
$event = $message->getData()['event']??null;
|
||||||
|
$data = (array)($message->getData()['data']??[]);
|
||||||
|
$this->emit(self::EVENT_NOTIFY, [ $event, $data ]);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
$this->connection->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
|
$this->connection->end('{"msg":"error","data":{"error":"Unexpected message type"}}');
|
||||||
|
Loading…
Reference in New Issue
Block a user