Browse Source

Removed the bus interop code for now

master 0.1.4
Noccy 4 years ago
parent
commit
e8c4a85486
  1. 19
      README.md
  2. 28
      examples/bus.php
  3. 20
      src/Interop/Async/Promise.php
  4. 104
      src/Interop/Bus/Client.php
  5. 40
      src/Interop/Bus/Master.php

19
README.md

@ -167,22 +167,3 @@ The `MultiStreamChannel` lets you connect multiple clients to a single master ch
$ch1->send($data);
$rcvd = $ch2->receive();
### Buses
Buses are high-level abstractions of channels, implementing limited rpc with handling of
return values.
$master = new Master();
$client = $master->createClient();
$client->export("/db/updateindex", function ($schema) {
// Update the index
return true;
});
$other = $master->createClient();
$other->call("/db/updateindex", "*")->then(function ($status) {
if ($status === true) {
echo "Index updated!\n";
}
});

28
examples/bus.php

@ -1,28 +0,0 @@
<?php
require_once __DIR__."/../vendor/autoload.php";
use NoccyLabs\Ipc\Interop\Bus\Master;
$master = new Master();
$client1 = $master->createClient();
$client1->addListener(function ($msg) use ($client1) {
printf("channel1: %s\n", json_encode($msg));
$client1->call("/channel2/hello", "channel1")->then(function ($ret) {
printf("channel1: %s\n", $ret);
});
});
$client2 = $master->createClient();
$client2->addListener(function ($msg) {
printf("channel2: %s\n", json_encode($msg));
});
$client2->export("/channel2/hello", function ($name) {
return sprintf("Hello, %s", $name);
});
$client1->send("hello world");
for ($n = 0; $n < 10; $n++) {
sleep(1);
}

20
src/Interop/Async/Promise.php

@ -1,20 +0,0 @@
<?php
namespace NoccyLabs\Ipc\Interop\Async;
class Promise
{
private $then;
public function then(callable $then)
{
$this->then = $then;
}
public function invoke(...$args)
{
call_user_func($this->then, ...$args);
}
}

104
src/Interop/Bus/Client.php

@ -1,104 +0,0 @@
<?php
namespace NoccyLabs\Ipc\Interop\Bus;
use NoccyLabs\Ipc\Interop\Channel\ChannelInterface;
use NoccyLabs\Ipc\Interop\Channel\MultiStreamChannel;
use NoccyLabs\Ipc\Interop\Async\Timer;
use NoccyLabs\Ipc\Interop\Async\Promise;
class Client
{
/** @var ChannelInterface $channel */
protected $channel;
protected $listeners = [];
protected $promises = [];
protected $exports = [];
/**
* Constructor
*/
public function __construct(ChannelInterface $channel)
{
$this->channel = $channel;
$this->timer = new Timer([ $this, "update" ]);
}
public function update()
{
while ($msg = $this->channel->receive()) {
if (!array_key_exists('op', $msg)) {
fprintf(STDERR, "Warning: Frame missing op -- %s\n", json_encode($msg));
continue;
}
switch ($msg['op']) {
case 'retn':
$id = $msg['id'];
if (!array_key_exists($id, $this->promises)) {
continue;
}
$promise = $this->promises[$id];
$promise->invoke($msg['ret']);
unset($this->promises[$id]);
continue;
case 'call':
$id = $msg['id'];
$obj = $msg['obj'];
if (!array_key_exists($obj, $this->exports)) {
continue;
}
$ret = call_user_func($this->exports[$obj], ...$msg['arg']);
$frame = [
'op' => 'retn',
'id' => $id,
'ret' => $ret
];
$this->channel->send($frame);
continue;
case 'msg':
default:
foreach ($this->listeners as $listener) {
call_user_func($listener, $msg);
}
}
}
}
public function addListener(callable $handler)
{
$this->listeners[] = $handler;
}
public function export($path, callable $function)
{
$this->exports[$path] = $function;
}
public function send($data)
{
$frame = [
'op' => 'msg',
'msg' => $data
];
$this->channel->send($frame);
}
public function call($path, ...$args)
{
$promise = new Promise();
$rid = uniqid(md5($path),true);
$frame = [
'op' => 'call',
'id' => $rid,
'obj' => $path,
'arg' => $args,
];
$this->promises[$rid] = $promise;
$this->channel->send($frame);
return $promise;
}
}

40
src/Interop/Bus/Master.php

@ -1,40 +0,0 @@
<?php
namespace NoccyLabs\Ipc\Interop\Bus;
use NoccyLabs\Ipc\Interop\Channel\ChannelInterface;
use NoccyLabs\Ipc\Interop\Channel\MultiStreamChannel;
use NoccyLabs\Ipc\Interop\Async\Timer;
class Master
{
/** @var ChannelInterface $channel */
protected $channel;
/**
* Constructor
*/
public function __construct()
{
$this->channel = new MultiStreamChannel();
$this->timer = new Timer([ $this, "update" ]);
}
public function createClient() : Client
{
/** @var ChannelInterface $channel */
$channel = $this->channel->createClient();
$client = new Client($channel);
return $client;
}
public function update()
{
while ($msg = $this->channel->receive()) {
$this->channel->send($msg);
}
}
}
Loading…
Cancel
Save