From f8b43beaf52cd70160ef8bfab81398eb559cd20a Mon Sep 17 00:00:00 2001 From: Christopher Vagnetoft Date: Mon, 16 Apr 2018 02:52:51 +0200 Subject: [PATCH] Added multichannels and a simple channel-based bus --- README.md | 31 ++++++ examples/bus.php | 28 ++++++ src/Interop/Async/Promise.php | 20 ++++ src/Interop/Async/Timer.php | 1 - src/Interop/Bus/Client.php | 104 +++++++++++++++++++++ src/Interop/Bus/Master.php | 40 ++++++++ src/Interop/Channel/ChannelInterface.php | 6 +- src/Interop/Channel/MultiStreamChannel.php | 11 ++- src/Interop/Channel/StreamChannel.php | 11 ++- 9 files changed, 241 insertions(+), 11 deletions(-) create mode 100644 examples/bus.php create mode 100644 src/Interop/Async/Promise.php create mode 100644 src/Interop/Bus/Client.php create mode 100644 src/Interop/Bus/Master.php diff --git a/README.md b/README.md index 7f5ec66..67d9627 100644 --- a/README.md +++ b/README.md @@ -155,3 +155,34 @@ or through the `createPair()` factory method. [ $ch1, $ch2 ] = StreamChannel::createPair(); $ch1->send($data); $rcvd = $ch2->receive(); + +### MultiChannels + +The `MultiStreamChannel` lets you connect multiple clients to a single master channel. + + $master = new MultiStreamChannel(); + + $ch1 = $master->createClient(); + $ch2 = $master->createClient(); + $ch1->send($data); + $rcvd = $ch2->receive(); + +### Buses + +Buses are high-level abstractions of channels, implementing limited rpc with handling of +return values. + + $master = new Master(); + + $client = $master->createClient(); + $client->export("/db/updateindex", function ($schema) { + // Update the index + return true; + }); + + $other = $master->createClient(); + $other->call("/db/updateindex", "*")->then(function ($status) { + if ($status === true) { + echo "Index updated!\n"; + } + }); diff --git a/examples/bus.php b/examples/bus.php new file mode 100644 index 0000000..13cdef2 --- /dev/null +++ b/examples/bus.php @@ -0,0 +1,28 @@ +createClient(); +$client1->addListener(function ($msg) use ($client1) { + printf("channel1: %s\n", json_encode($msg)); + $client1->call("/channel2/hello", "channel1")->then(function ($ret) { + printf("channel1: %s\n", $ret); + }); +}); +$client2 = $master->createClient(); +$client2->addListener(function ($msg) { + printf("channel2: %s\n", json_encode($msg)); +}); +$client2->export("/channel2/hello", function ($name) { + return sprintf("Hello, %s", $name); +}); + +$client1->send("hello world"); +for ($n = 0; $n < 10; $n++) { + sleep(1); +} \ No newline at end of file diff --git a/src/Interop/Async/Promise.php b/src/Interop/Async/Promise.php new file mode 100644 index 0000000..8a2ec33 --- /dev/null +++ b/src/Interop/Async/Promise.php @@ -0,0 +1,20 @@ +then = $then; + } + + public function invoke(...$args) + { + call_user_func($this->then, ...$args); + } + +} diff --git a/src/Interop/Async/Timer.php b/src/Interop/Async/Timer.php index 4f34a5f..d507dc9 100644 --- a/src/Interop/Async/Timer.php +++ b/src/Interop/Async/Timer.php @@ -23,7 +23,6 @@ class Timer public function __construct(callable $callback) { $this->callback = $callback; - $this->seconds = $seconds; self::registerTimer($this->callback); } diff --git a/src/Interop/Bus/Client.php b/src/Interop/Bus/Client.php new file mode 100644 index 0000000..790b324 --- /dev/null +++ b/src/Interop/Bus/Client.php @@ -0,0 +1,104 @@ +channel = $channel; + $this->timer = new Timer([ $this, "update" ]); + } + + public function update() + { + while ($msg = $this->channel->receive()) { + if (!array_key_exists('op', $msg)) { + fprintf(STDERR, "Warning: Frame missing op -- %s\n", json_encode($msg)); + continue; + } + switch ($msg['op']) { + case 'retn': + $id = $msg['id']; + if (!array_key_exists($id, $this->promises)) { + continue; + } + $promise = $this->promises[$id]; + $promise->invoke($msg['ret']); + unset($this->promises[$id]); + continue; + case 'call': + $id = $msg['id']; + $obj = $msg['obj']; + if (!array_key_exists($obj, $this->exports)) { + continue; + } + $ret = call_user_func($this->exports[$obj], ...$msg['arg']); + $frame = [ + 'op' => 'retn', + 'id' => $id, + 'ret' => $ret + ]; + $this->channel->send($frame); + continue; + case 'msg': + default: + foreach ($this->listeners as $listener) { + call_user_func($listener, $msg); + } + } + } + } + + public function addListener(callable $handler) + { + $this->listeners[] = $handler; + } + + public function export($path, callable $function) + { + $this->exports[$path] = $function; + } + + public function send($data) + { + $frame = [ + 'op' => 'msg', + 'msg' => $data + ]; + $this->channel->send($frame); + } + + public function call($path, ...$args) + { + $promise = new Promise(); + + $rid = uniqid(md5($path),true); + $frame = [ + 'op' => 'call', + 'id' => $rid, + 'obj' => $path, + 'arg' => $args, + ]; + $this->promises[$rid] = $promise; + $this->channel->send($frame); + return $promise; + } +} \ No newline at end of file diff --git a/src/Interop/Bus/Master.php b/src/Interop/Bus/Master.php new file mode 100644 index 0000000..3db8378 --- /dev/null +++ b/src/Interop/Bus/Master.php @@ -0,0 +1,40 @@ +channel = new MultiStreamChannel(); + + $this->timer = new Timer([ $this, "update" ]); + } + + public function createClient() : Client + { + /** @var ChannelInterface $channel */ + $channel = $this->channel->createClient(); + + $client = new Client($channel); + return $client; + } + + public function update() + { + while ($msg = $this->channel->receive()) { + $this->channel->send($msg); + } + } + +} \ No newline at end of file diff --git a/src/Interop/Channel/ChannelInterface.php b/src/Interop/Channel/ChannelInterface.php index 18bc92c..fd9c1f3 100644 --- a/src/Interop/Channel/ChannelInterface.php +++ b/src/Interop/Channel/ChannelInterface.php @@ -20,10 +20,10 @@ interface ChannelInterface public function receive(); /** - * Send a frame of data + * Send a frame of data, returns true on success * * @param mixed $data - * @return void + * @return bool */ - public function send($data); + public function send($data):bool; } \ No newline at end of file diff --git a/src/Interop/Channel/MultiStreamChannel.php b/src/Interop/Channel/MultiStreamChannel.php index c6f2231..202fac5 100644 --- a/src/Interop/Channel/MultiStreamChannel.php +++ b/src/Interop/Channel/MultiStreamChannel.php @@ -40,13 +40,16 @@ class MultiStreamChannel implements ChannelInterface * {@inheritDoc} * * @param mixed $data - * @return void + * @return bool */ - public function send($data) + public function send($data):bool { - foreach ($this->clients as $client) { - $client->send($data); + foreach ($this->clients as $i=>$client) { + if (false === $client->send($data)) { + unset($this->clients[$i]); + } } + return true; } /** diff --git a/src/Interop/Channel/StreamChannel.php b/src/Interop/Channel/StreamChannel.php index 4ce3b7d..52e8ca9 100644 --- a/src/Interop/Channel/StreamChannel.php +++ b/src/Interop/Channel/StreamChannel.php @@ -36,11 +36,16 @@ class StreamChannel implements ChannelInterface * {@inheritDoc} * * @param mixed $data - * @return void + * @return bool */ - public function send($data) + public function send($data):bool { - fwrite($this->stream, json_encode($data)."\0"); + $json = json_encode($data) . "\0"; + $ret = @fwrite($this->stream, $json, strlen($json)); + if (false === $ret) { + return false; + } + return true; } /**