Added multichannels and a simple channel-based bus
This commit is contained in:
		
							
								
								
									
										31
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										31
									
								
								README.md
									
									
									
									
									
								
							@@ -155,3 +155,34 @@ or through the `createPair()` factory method.
 | 
			
		||||
    [ $ch1, $ch2 ] = StreamChannel::createPair();
 | 
			
		||||
    $ch1->send($data);
 | 
			
		||||
    $rcvd = $ch2->receive();
 | 
			
		||||
 | 
			
		||||
### MultiChannels
 | 
			
		||||
 | 
			
		||||
The `MultiStreamChannel` lets you connect multiple clients to a single master channel.
 | 
			
		||||
 | 
			
		||||
    $master = new MultiStreamChannel();
 | 
			
		||||
 | 
			
		||||
    $ch1 = $master->createClient();
 | 
			
		||||
    $ch2 = $master->createClient();
 | 
			
		||||
    $ch1->send($data);
 | 
			
		||||
    $rcvd = $ch2->receive();
 | 
			
		||||
 | 
			
		||||
### Buses
 | 
			
		||||
 | 
			
		||||
Buses are high-level abstractions of channels, implementing limited rpc with handling of
 | 
			
		||||
return values.
 | 
			
		||||
 | 
			
		||||
    $master = new Master();
 | 
			
		||||
 | 
			
		||||
    $client = $master->createClient();
 | 
			
		||||
    $client->export("/db/updateindex", function ($schema) {
 | 
			
		||||
        // Update the index
 | 
			
		||||
        return true;
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    $other = $master->createClient();
 | 
			
		||||
    $other->call("/db/updateindex", "*")->then(function ($status) {
 | 
			
		||||
        if ($status === true) {
 | 
			
		||||
            echo "Index updated!\n";
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										28
									
								
								examples/bus.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								examples/bus.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,28 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
require_once __DIR__."/../vendor/autoload.php";
 | 
			
		||||
 | 
			
		||||
use NoccyLabs\Ipc\Interop\Bus\Master;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
$master = new Master();
 | 
			
		||||
 | 
			
		||||
$client1 = $master->createClient();
 | 
			
		||||
$client1->addListener(function ($msg) use ($client1) {
 | 
			
		||||
    printf("channel1: %s\n", json_encode($msg));
 | 
			
		||||
    $client1->call("/channel2/hello", "channel1")->then(function ($ret) {
 | 
			
		||||
        printf("channel1: %s\n", $ret);
 | 
			
		||||
    });
 | 
			
		||||
});
 | 
			
		||||
$client2 = $master->createClient();
 | 
			
		||||
$client2->addListener(function ($msg) {
 | 
			
		||||
    printf("channel2: %s\n", json_encode($msg));
 | 
			
		||||
});
 | 
			
		||||
$client2->export("/channel2/hello", function ($name) {
 | 
			
		||||
    return sprintf("Hello, %s", $name);
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
$client1->send("hello world");
 | 
			
		||||
for ($n = 0; $n < 10; $n++) {
 | 
			
		||||
    sleep(1);
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										20
									
								
								src/Interop/Async/Promise.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								src/Interop/Async/Promise.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,20 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Ipc\Interop\Async;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Promise
 | 
			
		||||
{
 | 
			
		||||
    private $then;
 | 
			
		||||
 | 
			
		||||
    public function then(callable $then)
 | 
			
		||||
    {
 | 
			
		||||
        $this->then = $then;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function invoke(...$args)
 | 
			
		||||
    {
 | 
			
		||||
        call_user_func($this->then, ...$args);
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
}
 | 
			
		||||
@@ -23,7 +23,6 @@ class Timer
 | 
			
		||||
    public function __construct(callable $callback)
 | 
			
		||||
    {
 | 
			
		||||
        $this->callback = $callback;
 | 
			
		||||
        $this->seconds = $seconds;
 | 
			
		||||
        self::registerTimer($this->callback);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										104
									
								
								src/Interop/Bus/Client.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										104
									
								
								src/Interop/Bus/Client.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,104 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Ipc\Interop\Bus;
 | 
			
		||||
 | 
			
		||||
use NoccyLabs\Ipc\Interop\Channel\ChannelInterface;
 | 
			
		||||
use NoccyLabs\Ipc\Interop\Channel\MultiStreamChannel;
 | 
			
		||||
use NoccyLabs\Ipc\Interop\Async\Timer;
 | 
			
		||||
use NoccyLabs\Ipc\Interop\Async\Promise;
 | 
			
		||||
 | 
			
		||||
class Client
 | 
			
		||||
{
 | 
			
		||||
    /** @var ChannelInterface $channel */
 | 
			
		||||
    protected $channel;
 | 
			
		||||
 | 
			
		||||
    protected $listeners = [];
 | 
			
		||||
 | 
			
		||||
    protected $promises = [];
 | 
			
		||||
 | 
			
		||||
    protected $exports = [];
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Constructor
 | 
			
		||||
     */
 | 
			
		||||
    public function __construct(ChannelInterface $channel)
 | 
			
		||||
    {
 | 
			
		||||
        $this->channel = $channel;
 | 
			
		||||
        $this->timer = new Timer([ $this, "update" ]);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function update()
 | 
			
		||||
    {
 | 
			
		||||
        while ($msg = $this->channel->receive()) {
 | 
			
		||||
            if (!array_key_exists('op', $msg)) {
 | 
			
		||||
                fprintf(STDERR, "Warning: Frame missing op -- %s\n", json_encode($msg));
 | 
			
		||||
                continue;
 | 
			
		||||
            }
 | 
			
		||||
            switch ($msg['op']) {
 | 
			
		||||
                case 'retn':
 | 
			
		||||
                    $id = $msg['id'];
 | 
			
		||||
                    if (!array_key_exists($id, $this->promises)) {
 | 
			
		||||
                        continue;
 | 
			
		||||
                    }
 | 
			
		||||
                    $promise = $this->promises[$id];
 | 
			
		||||
                    $promise->invoke($msg['ret']);
 | 
			
		||||
                    unset($this->promises[$id]);
 | 
			
		||||
                    continue;
 | 
			
		||||
                case 'call':
 | 
			
		||||
                    $id = $msg['id'];
 | 
			
		||||
                    $obj = $msg['obj'];
 | 
			
		||||
                    if (!array_key_exists($obj, $this->exports)) {
 | 
			
		||||
                        continue;
 | 
			
		||||
                    }
 | 
			
		||||
                    $ret = call_user_func($this->exports[$obj], ...$msg['arg']);
 | 
			
		||||
                    $frame = [
 | 
			
		||||
                        'op' => 'retn',
 | 
			
		||||
                        'id' => $id,
 | 
			
		||||
                        'ret' => $ret
 | 
			
		||||
                    ];
 | 
			
		||||
                    $this->channel->send($frame);
 | 
			
		||||
                    continue;
 | 
			
		||||
                case 'msg':
 | 
			
		||||
                default:
 | 
			
		||||
                    foreach ($this->listeners as $listener) {
 | 
			
		||||
                        call_user_func($listener, $msg);
 | 
			
		||||
                    }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function addListener(callable $handler)
 | 
			
		||||
    {
 | 
			
		||||
        $this->listeners[] = $handler;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function export($path, callable $function)
 | 
			
		||||
    {
 | 
			
		||||
        $this->exports[$path] = $function;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function send($data)
 | 
			
		||||
    {
 | 
			
		||||
        $frame = [
 | 
			
		||||
            'op' => 'msg',
 | 
			
		||||
            'msg' => $data
 | 
			
		||||
        ];
 | 
			
		||||
        $this->channel->send($frame);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function call($path, ...$args)
 | 
			
		||||
    {
 | 
			
		||||
        $promise = new Promise();
 | 
			
		||||
 | 
			
		||||
        $rid = uniqid(md5($path),true);
 | 
			
		||||
        $frame = [
 | 
			
		||||
            'op' => 'call',
 | 
			
		||||
            'id' => $rid,
 | 
			
		||||
            'obj' => $path,
 | 
			
		||||
            'arg' => $args,
 | 
			
		||||
        ];
 | 
			
		||||
        $this->promises[$rid] = $promise;
 | 
			
		||||
        $this->channel->send($frame);
 | 
			
		||||
        return $promise;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										40
									
								
								src/Interop/Bus/Master.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										40
									
								
								src/Interop/Bus/Master.php
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,40 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
namespace NoccyLabs\Ipc\Interop\Bus;
 | 
			
		||||
 | 
			
		||||
use NoccyLabs\Ipc\Interop\Channel\ChannelInterface;
 | 
			
		||||
use NoccyLabs\Ipc\Interop\Channel\MultiStreamChannel;
 | 
			
		||||
use NoccyLabs\Ipc\Interop\Async\Timer;
 | 
			
		||||
 | 
			
		||||
class Master
 | 
			
		||||
{
 | 
			
		||||
    /** @var ChannelInterface $channel */
 | 
			
		||||
    protected $channel;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Constructor
 | 
			
		||||
     */
 | 
			
		||||
    public function __construct()
 | 
			
		||||
    {
 | 
			
		||||
        $this->channel = new MultiStreamChannel();
 | 
			
		||||
 | 
			
		||||
        $this->timer = new Timer([ $this, "update" ]);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function createClient() : Client
 | 
			
		||||
    {
 | 
			
		||||
        /** @var ChannelInterface $channel */
 | 
			
		||||
        $channel = $this->channel->createClient();
 | 
			
		||||
 | 
			
		||||
        $client = new Client($channel);
 | 
			
		||||
        return $client;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public function update()
 | 
			
		||||
    {
 | 
			
		||||
        while ($msg = $this->channel->receive()) {
 | 
			
		||||
            $this->channel->send($msg);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -20,10 +20,10 @@ interface ChannelInterface
 | 
			
		||||
    public function receive();
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Send a frame of data
 | 
			
		||||
     * Send a frame of data, returns true on success
 | 
			
		||||
     *
 | 
			
		||||
     * @param mixed $data
 | 
			
		||||
     * @return void
 | 
			
		||||
     * @return bool
 | 
			
		||||
     */
 | 
			
		||||
    public function send($data);
 | 
			
		||||
    public function send($data):bool;
 | 
			
		||||
}
 | 
			
		||||
@@ -40,13 +40,16 @@ class MultiStreamChannel implements ChannelInterface
 | 
			
		||||
     * {@inheritDoc}
 | 
			
		||||
     *
 | 
			
		||||
     * @param mixed $data
 | 
			
		||||
     * @return void
 | 
			
		||||
     * @return bool
 | 
			
		||||
     */
 | 
			
		||||
    public function send($data)
 | 
			
		||||
    public function send($data):bool
 | 
			
		||||
    {
 | 
			
		||||
        foreach ($this->clients as $client) {
 | 
			
		||||
            $client->send($data);
 | 
			
		||||
        foreach ($this->clients as $i=>$client) {
 | 
			
		||||
            if (false === $client->send($data)) {
 | 
			
		||||
                unset($this->clients[$i]);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
 
 | 
			
		||||
@@ -36,11 +36,16 @@ class StreamChannel implements ChannelInterface
 | 
			
		||||
     * {@inheritDoc}
 | 
			
		||||
     *
 | 
			
		||||
     * @param mixed $data
 | 
			
		||||
     * @return void
 | 
			
		||||
     * @return bool
 | 
			
		||||
     */
 | 
			
		||||
    public function send($data)
 | 
			
		||||
    public function send($data):bool
 | 
			
		||||
    {
 | 
			
		||||
        fwrite($this->stream, json_encode($data)."\0");
 | 
			
		||||
        $json = json_encode($data) . "\0";
 | 
			
		||||
        $ret = @fwrite($this->stream, $json, strlen($json));
 | 
			
		||||
        if (false === $ret) {
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user