Added MultiStreamChannel to handle multiple endpoints

This commit is contained in:
Chris 2018-04-16 01:54:30 +02:00
parent d6aacd841f
commit 00d2bead8e
4 changed files with 156 additions and 1 deletions

View File

@ -0,0 +1,39 @@
<?php
require_once __DIR__."/../vendor/autoload.php";
use NoccyLabs\Ipc\Interop\Channel\MultiStreamChannel;
// It really is this simple
$channel = new MultiStreamChannel();
// Create a child to report back in 3 seconds
$channel1 = $channel->createClient();
$pid1 = pcntl_fork();
if ($pid1 === 0) {
sleep(1);
echo "child 1 received ".$channel1->receive()."\n";
$channel1->send("Hello from child 1");
sleep(5);
exit;
}
// Create another child to report back in 4 seconds
$channel2 = $channel->createClient();
$pid2 = pcntl_fork();
if ($pid2 === 0) {
sleep(1);
echo "child 2 received ".$channel2->receive()."\n";
$channel2->send("Hello from child 2");
sleep(5);
exit;
}
// Writing to the master works whether any clients have been added
$channel->send("Hello from master");
// Wait for 5 seconds and dump whatever messages are waiting
sleep(5);
while ($msg = $channel->receive()) {
echo "master received ".$msg."\n";
}

View File

@ -0,0 +1,67 @@
<?php
namespace NoccyLabs\Ipc\Interop\Channel;
/**
* The MultiChannel is used to create a single channel and allow other threads
* to each grab an end of it. Think for example a web server that could write
* log output to a channel (as it´the MultiStreamChannel is always open); one
* thread could grab a channel to dump it to the console, and another thread
* could write to a logfile.
*/
class MultiStreamChannel implements ChannelInterface
{
protected $clients = [];
/**
* Create a new client to receive data sent to the channel
*
* @return StreamChannel
*/
public function createClient(): StreamChannel
{
[ $a, $b ] = StreamChannel::createPair();
$this->clients[] = $a;
return $b;
}
/**
* {@inheritDoc}
*
* @return boolean
*/
public function isOpen(): bool
{
return true;
}
/**
* {@inheritDoc}
*
* @param mixed $data
* @return void
*/
public function send($data)
{
foreach ($this->clients as $client) {
$client->send($data);
}
}
/**
* {@inheritDoc}
*
* @return mixed
*/
public function receive()
{
foreach ($this->clients as $client) {
if ($read = $client->receive()) {
return $read;
}
}
return false;
}
}

View File

@ -28,6 +28,8 @@ class StreamChannel implements ChannelInterface
} }
$this->stream = $stream; $this->stream = $stream;
stream_set_blocking($stream, false);
} }
/** /**
@ -49,8 +51,11 @@ class StreamChannel implements ChannelInterface
public function receive() public function receive()
{ {
$buf = fread($this->stream, 8192); $buf = fread($this->stream, 8192);
if (!$buf) {
return false;
}
return json_decode(rtrim($buf, "\0")); return json_decode(rtrim($buf, "\0"), true);
} }
/** /**

View File

@ -0,0 +1,44 @@
<?php
namespace NoccyLabs\Ipc\Interop\Channel;
class MultiStreamChannelTest extends \PhpUnit\Framework\TestCase
{
public function testCreatingMultipleClients()
{
$master = new MultiStreamChannel();
$client1 = $master->createClient();
$client2 = $master->createClient();
$client3 = $master->createClient();
$master->send("Hello World");
$this->assertEquals("Hello World", $client1->receive());
$this->assertEquals("Hello World", $client2->receive());
$this->assertEquals("Hello World", $client3->receive());
}
public function testReadingFromMultipleClients()
{
$master = new MultiStreamChannel();
$client1 = $master->createClient();
$client2 = $master->createClient();
$client3 = $master->createClient();
$client1->send("a");
$client2->send("b");
$client3->send("c");
$this->assertEquals("a", $master->receive());
$this->assertEquals("b", $master->receive());
$this->assertEquals("c", $master->receive());
}
}