diff --git a/README.md b/README.md index 67d9627..cc81c79 100644 --- a/README.md +++ b/README.md @@ -167,22 +167,3 @@ The `MultiStreamChannel` lets you connect multiple clients to a single master ch $ch1->send($data); $rcvd = $ch2->receive(); -### Buses - -Buses are high-level abstractions of channels, implementing limited rpc with handling of -return values. - - $master = new Master(); - - $client = $master->createClient(); - $client->export("/db/updateindex", function ($schema) { - // Update the index - return true; - }); - - $other = $master->createClient(); - $other->call("/db/updateindex", "*")->then(function ($status) { - if ($status === true) { - echo "Index updated!\n"; - } - }); diff --git a/examples/bus.php b/examples/bus.php deleted file mode 100644 index 13cdef2..0000000 --- a/examples/bus.php +++ /dev/null @@ -1,28 +0,0 @@ -createClient(); -$client1->addListener(function ($msg) use ($client1) { - printf("channel1: %s\n", json_encode($msg)); - $client1->call("/channel2/hello", "channel1")->then(function ($ret) { - printf("channel1: %s\n", $ret); - }); -}); -$client2 = $master->createClient(); -$client2->addListener(function ($msg) { - printf("channel2: %s\n", json_encode($msg)); -}); -$client2->export("/channel2/hello", function ($name) { - return sprintf("Hello, %s", $name); -}); - -$client1->send("hello world"); -for ($n = 0; $n < 10; $n++) { - sleep(1); -} \ No newline at end of file diff --git a/src/Interop/Async/Promise.php b/src/Interop/Async/Promise.php deleted file mode 100644 index 8a2ec33..0000000 --- a/src/Interop/Async/Promise.php +++ /dev/null @@ -1,20 +0,0 @@ -then = $then; - } - - public function invoke(...$args) - { - call_user_func($this->then, ...$args); - } - -} diff --git a/src/Interop/Bus/Client.php b/src/Interop/Bus/Client.php deleted file mode 100644 index 790b324..0000000 --- a/src/Interop/Bus/Client.php +++ /dev/null @@ -1,104 +0,0 @@ -channel = $channel; - $this->timer = new Timer([ $this, "update" ]); - } - - public function update() - { - while ($msg = $this->channel->receive()) { - if (!array_key_exists('op', $msg)) { - fprintf(STDERR, "Warning: Frame missing op -- %s\n", json_encode($msg)); - continue; - } - switch ($msg['op']) { - case 'retn': - $id = $msg['id']; - if (!array_key_exists($id, $this->promises)) { - continue; - } - $promise = $this->promises[$id]; - $promise->invoke($msg['ret']); - unset($this->promises[$id]); - continue; - case 'call': - $id = $msg['id']; - $obj = $msg['obj']; - if (!array_key_exists($obj, $this->exports)) { - continue; - } - $ret = call_user_func($this->exports[$obj], ...$msg['arg']); - $frame = [ - 'op' => 'retn', - 'id' => $id, - 'ret' => $ret - ]; - $this->channel->send($frame); - continue; - case 'msg': - default: - foreach ($this->listeners as $listener) { - call_user_func($listener, $msg); - } - } - } - } - - public function addListener(callable $handler) - { - $this->listeners[] = $handler; - } - - public function export($path, callable $function) - { - $this->exports[$path] = $function; - } - - public function send($data) - { - $frame = [ - 'op' => 'msg', - 'msg' => $data - ]; - $this->channel->send($frame); - } - - public function call($path, ...$args) - { - $promise = new Promise(); - - $rid = uniqid(md5($path),true); - $frame = [ - 'op' => 'call', - 'id' => $rid, - 'obj' => $path, - 'arg' => $args, - ]; - $this->promises[$rid] = $promise; - $this->channel->send($frame); - return $promise; - } -} \ No newline at end of file diff --git a/src/Interop/Bus/Master.php b/src/Interop/Bus/Master.php deleted file mode 100644 index 3db8378..0000000 --- a/src/Interop/Bus/Master.php +++ /dev/null @@ -1,40 +0,0 @@ -channel = new MultiStreamChannel(); - - $this->timer = new Timer([ $this, "update" ]); - } - - public function createClient() : Client - { - /** @var ChannelInterface $channel */ - $channel = $this->channel->createClient(); - - $client = new Client($channel); - return $client; - } - - public function update() - { - while ($msg = $this->channel->receive()) { - $this->channel->send($msg); - } - } - -} \ No newline at end of file