From 00d2bead8ea94c961ce1a8628ce5dd51f4a082bb Mon Sep 17 00:00:00 2001 From: Christopher Vagnetoft Date: Mon, 16 Apr 2018 01:54:30 +0200 Subject: [PATCH] Added MultiStreamChannel to handle multiple endpoints --- examples/multichannels.php | 39 +++++++++++ src/Interop/Channel/MultiStreamChannel.php | 67 +++++++++++++++++++ src/Interop/Channel/StreamChannel.php | 7 +- .../Channel/MultiStreamChannelTest.php | 44 ++++++++++++ 4 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 examples/multichannels.php create mode 100644 src/Interop/Channel/MultiStreamChannel.php create mode 100644 tests/Interop/Channel/MultiStreamChannelTest.php diff --git a/examples/multichannels.php b/examples/multichannels.php new file mode 100644 index 0000000..fd8ba42 --- /dev/null +++ b/examples/multichannels.php @@ -0,0 +1,39 @@ +createClient(); +$pid1 = pcntl_fork(); +if ($pid1 === 0) { + sleep(1); + echo "child 1 received ".$channel1->receive()."\n"; + $channel1->send("Hello from child 1"); + sleep(5); + exit; +} + +// Create another child to report back in 4 seconds +$channel2 = $channel->createClient(); +$pid2 = pcntl_fork(); +if ($pid2 === 0) { + sleep(1); + echo "child 2 received ".$channel2->receive()."\n"; + $channel2->send("Hello from child 2"); + sleep(5); + exit; +} + +// Writing to the master works whether any clients have been added +$channel->send("Hello from master"); + +// Wait for 5 seconds and dump whatever messages are waiting +sleep(5); +while ($msg = $channel->receive()) { + echo "master received ".$msg."\n"; +} \ No newline at end of file diff --git a/src/Interop/Channel/MultiStreamChannel.php b/src/Interop/Channel/MultiStreamChannel.php new file mode 100644 index 0000000..c6f2231 --- /dev/null +++ b/src/Interop/Channel/MultiStreamChannel.php @@ -0,0 +1,67 @@ +clients[] = $a; + return $b; + } + + /** + * {@inheritDoc} + * + * @return boolean + */ + public function isOpen(): bool + { + return true; + } + + /** + * {@inheritDoc} + * + * @param mixed $data + * @return void + */ + public function send($data) + { + foreach ($this->clients as $client) { + $client->send($data); + } + } + + /** + * {@inheritDoc} + * + * @return mixed + */ + public function receive() + { + foreach ($this->clients as $client) { + if ($read = $client->receive()) { + return $read; + } + } + return false; + } + +} \ No newline at end of file diff --git a/src/Interop/Channel/StreamChannel.php b/src/Interop/Channel/StreamChannel.php index 180c70c..4ce3b7d 100644 --- a/src/Interop/Channel/StreamChannel.php +++ b/src/Interop/Channel/StreamChannel.php @@ -28,6 +28,8 @@ class StreamChannel implements ChannelInterface } $this->stream = $stream; + + stream_set_blocking($stream, false); } /** @@ -49,8 +51,11 @@ class StreamChannel implements ChannelInterface public function receive() { $buf = fread($this->stream, 8192); + if (!$buf) { + return false; + } - return json_decode(rtrim($buf, "\0")); + return json_decode(rtrim($buf, "\0"), true); } /** diff --git a/tests/Interop/Channel/MultiStreamChannelTest.php b/tests/Interop/Channel/MultiStreamChannelTest.php new file mode 100644 index 0000000..d896684 --- /dev/null +++ b/tests/Interop/Channel/MultiStreamChannelTest.php @@ -0,0 +1,44 @@ +createClient(); + $client2 = $master->createClient(); + $client3 = $master->createClient(); + + $master->send("Hello World"); + + $this->assertEquals("Hello World", $client1->receive()); + $this->assertEquals("Hello World", $client2->receive()); + $this->assertEquals("Hello World", $client3->receive()); + + } + + public function testReadingFromMultipleClients() + { + $master = new MultiStreamChannel(); + + $client1 = $master->createClient(); + $client2 = $master->createClient(); + $client3 = $master->createClient(); + + $client1->send("a"); + $client2->send("b"); + $client3->send("c"); + + $this->assertEquals("a", $master->receive()); + $this->assertEquals("b", $master->receive()); + $this->assertEquals("c", $master->receive()); + + } + +} \ No newline at end of file