Compare commits

...

14 Commits

Author SHA1 Message Date
Chris c855f92ecd Added __invoke() to SignalTrap and fixed phpunit xml 2020-06-06 23:00:48 +02:00
Chris e8c4a85486 Removed the bus interop code for now 2018-04-16 13:14:04 +02:00
Chris f8b43beaf5 Added multichannels and a simple channel-based bus 2018-04-16 02:52:51 +02:00
Chris 00d2bead8e Added MultiStreamChannel to handle multiple endpoints 2018-04-16 01:54:30 +02:00
Chris d6aacd841f Updated FileKey with setters 2018-04-16 00:41:13 +02:00
Chris 31634c5054 Improved quality of docblock comments 2018-04-15 22:12:35 +02:00
Chris 20b6f1d42c Fixed typo in const in readme 2018-04-15 20:55:30 +02:00
Chris c9c4a8aa78 Updated readme 2018-04-15 20:52:49 +02:00
Chris 4652e464b9 Semaphores fully implemented 2018-04-15 20:48:22 +02:00
Chris 612cfc2035 Bugfixed SharedMemoryBlock, added tests 2018-04-15 20:31:35 +02:00
Chris dcb059e4be Bugfixes
* Queue now has a destroy() method
* Examples updated
2018-04-15 18:50:19 +02:00
Chris 4e8ab73a88 Updated readme again 2018-04-15 18:39:10 +02:00
Chris 2652480c5d Updated readme 2018-04-15 18:27:51 +02:00
Chris 176f9aa5ec More examples and tweaks
* Signal dispatch() method now uses own pid as default value
* SharedData now updates checksum cache on set()
2018-04-15 17:25:55 +02:00
28 changed files with 745 additions and 50 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/vendor/
/composer.lock
/doc

176
README.md
View File

@ -2,26 +2,168 @@ noccylabs/ipc
=============
This is a one-size-fits-all IPC library to facilitate communication between
threads and processes. It contains the following features:
threads and processes.
**Core:**
For complete examples, see the `examples` directory in the source tree.
- [ ] Semaphore
- [ ] Mutex
- [x] Queue
- [x] SharedMemory key-value store
- [ ] SharedMemory blocks
- [x] Signals
- [x] Locks
## Signals
**High-Level:**
Asynchronous signals are automatically enabled if supported. Otherwise, the
`pcntl_signal_dispatch()` method must be frequently called from your main loop.
You can test for this using the `SIGNALS_ASYNC` constant:
- [ ] EventBridge
- [ ] EventDispatcher
- [ ] InterOp/Marshalling
- [x] Asynchronous timers
if (!SIGNALS_ASYNC) {
pcntl_signal_dispatch();
}
**Transports:**
### Signal handlers
Signal handlers allow for multiple listeners, with any one of them being able to
prevent the signal from bubbling up.
$handler = new SignalHandler(SIGUSR1);
$handler->addHandler(function () {
// Handle SIGUSR1, return true to stop bubbling
return true;
});
You can also handle as well as fire signals using the `Signal` class:
$signal = new Signal(SIGUSR1);
$signal->setHandler(function () {
// Handle SIGUSR1
});
// Dispatch the signal to ourselves
(new Signal(SIGUSR1))->dispatch($pid);
### Signal traps
Traps are used in the main loop to break on signals
$trap = new SignalTrap(SIGINT);
while (!$trap->isTrapped()) {
// ...
}
### Timers
Timers fire asynchronously at fixed 1 second intervals. It requires signals to be
processed; see above.
// Once every second...
$timer = new Timer(function () {
echo ".";
});
## File locks
File locks uses a shared file as a resource for locking.
// Creating the lock will not acquire it
$lock = new FileLock(__FILE__);
if (!$lock->acquire()) {
echo "fail!\n";
} else {
$lock->release();
}
## SysV wrappers
All these wrappers depend on a `KeyInterface` being passed to the constructor.
This is usually an instance of a `FileKey`, created as such:
$key = new FileKey(__FILE__);
The key has a project identifier that starts at `chr(0)`, or `"\0"`. To increase
this identifier, and thus point to another segment, just clone it.
$key2 = clone $key1;
### Semaphores
Semaphores are created with a key and a max count.
$key = new FileKey(__FILE__);
$sem = new Semaphore($key, 2);
$sem->allocate(); // -> true
$sem->allocate(); // -> true
$sem->allocate(); // -> false
$sem->release();
$mutex->destroy();
### Mutexes
A mutex is a semaphore with a max count of 1.
$key = new FileKey(__FILE__);
$mutex = new Mutex($key);
$mutex->allocate(); // -> true
$mutex->allocate(); // -> false
$mutex->release();
$mutex->destroy();
### Message Queues
$key = new FileKey(__FILE__);
$msgq = new Queue($key);
$msgq->send(1, [ "Some data", [ "format"=>"foo" ]]);
$data = $msgq->receive(1, $type);
$msgq->destroy();
### Shared Memory
Shared memory using `SharedData` supports integrity checking when setting, using
the third parameter to `set()`.
$key = new FileKey(__FILE__);
$shm = new SharedData($key);
do {
$counter = $shm->get("counter") + 1;
} while (!$shm->set("counter", $counter, true));
$shm->destroy();
The `SharedMemory` class is a simple integer-indexed array
$key = new FileKey(__FILE__);
$shm = new SharedMemory($key);
$shm[0] = 42;
$shm->destroy();
## Communication
### Channels
Channels are essentially connected pipes. A channel can be created with a stream resource,
or through the `createPair()` factory method.
[ $ch1, $ch2 ] = StreamChannel::createPair();
$ch1->send($data);
$rcvd = $ch2->receive();
### MultiChannels
The `MultiStreamChannel` lets you connect multiple clients to a single master channel.
$master = new MultiStreamChannel();
$ch1 = $master->createClient();
$ch2 = $master->createClient();
$ch1->send($data);
$rcvd = $ch2->receive();
- [x] Stream channels

View File

@ -9,6 +9,12 @@
"email": "cvagnetoft@gmail.com"
}
],
"keywords": [
"ipc",
"shm",
"msgqueue",
"signals"
],
"require": {},
"repositories": [
{
@ -24,4 +30,4 @@
"NoccyLabs\\Ipc\\": "src/"
}
}
}
}

16
examples/channels.php Normal file
View File

@ -0,0 +1,16 @@
<?php
require_once __DIR__."/../vendor/autoload.php";
use NoccyLabs\Ipc\Interop\Channel\StreamChannel;
// Create a pair of channels.
[ $ch1, $ch2 ] = StreamChannel::createPair();
// Send messages with send()
$ch1->send([
'msg' => "Hello World"
]);
// Receive at the other end using receive()
print_r($ch2->receive());

43
examples/locks.php Normal file
View File

@ -0,0 +1,43 @@
<?php
require_once __DIR__."/../vendor/autoload.php";
use NoccyLabs\Ipc\Lock\FileLock;
// Creating the lock will not acquire it
$lock1 = new FileLock(__FILE__);
$lock2 = new FileLock(__FILE__);
// Acquire the lock like this
if ($lock1->acquire()) {
echo "lock1 acquired\n";
} else {
echo "lock1 not acquired\n";
}
// Test the locks like this
if ($lock1->isLocked()) {
echo "lock1 is locked\n";
} else {
echo "lock1 is not locked\n";
}
if ($lock2->isLocked()) {
echo "lock2 is locked\n";
} else {
echo "lock2 is not locked\n";
}
// This will timeout after a second
if ($lock2->acquire(1)) {
echo "lock2 acquired\n";
} else {
echo "lock2 not acquired\n";
}
// After releasing, you can acquire it
$lock1->release();
if ($lock2->acquire(1)) {
echo "lock2 acquired\n";
} else {
echo "lock2 not acquired\n";
}

View File

@ -0,0 +1,39 @@
<?php
require_once __DIR__."/../vendor/autoload.php";
use NoccyLabs\Ipc\Interop\Channel\MultiStreamChannel;
// It really is this simple
$channel = new MultiStreamChannel();
// Create a child to report back in 3 seconds
$channel1 = $channel->createClient();
$pid1 = pcntl_fork();
if ($pid1 === 0) {
sleep(1);
echo "child 1 received ".$channel1->receive()."\n";
$channel1->send("Hello from child 1");
sleep(5);
exit;
}
// Create another child to report back in 4 seconds
$channel2 = $channel->createClient();
$pid2 = pcntl_fork();
if ($pid2 === 0) {
sleep(1);
echo "child 2 received ".$channel2->receive()."\n";
$channel2->send("Hello from child 2");
sleep(5);
exit;
}
// Writing to the master works whether any clients have been added
$channel->send("Hello from master");
// Wait for 5 seconds and dump whatever messages are waiting
sleep(5);
while ($msg = $channel->receive()) {
echo "master received ".$msg."\n";
}

26
examples/queues.php Normal file
View File

@ -0,0 +1,26 @@
<?php
require_once __DIR__."/../vendor/autoload.php";
use NoccyLabs\Ipc\Key\FileKey;
use NoccyLabs\Ipc\Msg\Queue;
$key = new FileKey(__FILE__);
$msgq = new Queue($key);
// Send packages with msgtype >= 1...
$msgq->send(1, [ "what"=>"First" ]);
$msgq->send(2, [ "what"=>"Second" ]);
$msgq->send(3, [ "what"=>"Third" ]);
// Read messages by requesting a type...
$msg = $msgq->receive(2, $type);
printf("msg: %s, type: %d\n", json_encode($msg), $type);
// ...or read the first message with type 0...
$msg = $msgq->receive(0, $type);
printf("msg: %s, type: %d\n", json_encode($msg), $type);
$msg = $msgq->receive(0, $type);
printf("msg: %s, type: %d\n", json_encode($msg), $type);
$msgq->destroy();

21
examples/semaphores.php Normal file
View File

@ -0,0 +1,21 @@
<?php
require_once __DIR__."/../vendor/autoload.php";
use NoccyLabs\Ipc\Key\FileKey;
use NoccyLabs\Ipc\Sem\Semaphore;
$key = new FileKey(__FILE__);
// Create semaphore with max count of 2
$sem1 = new Semaphore($key, 2);
$sem2 = new Semaphore($key, 2);
$sem3 = new Semaphore($key, 2);
printf("sem1 acquire: %d\n", $sem1->acquire());
printf("sem2 acquire: %d\n", $sem2->acquire());
printf("sem3 acquire: %d\n", $sem3->acquire());
printf("sem2 release: %d\n", $sem2->release());
printf("sem3 acquire: %d\n", $sem3->acquire());
$sem1->destroy();

35
examples/shmdata.php Normal file
View File

@ -0,0 +1,35 @@
<?php
require_once __DIR__."/../vendor/autoload.php";
use NoccyLabs\Ipc\Shm\SharedData;
use NoccyLabs\Ipc\Key\FileKey;
$key = new FileKey(__FILE__);
$shm = new SharedData($key);
// Set works as expected
$shm->set("some.key", "Some value");
// As does get
echo $shm->get("some.key")."\n";
// To make sure a value isn't modified while you are modifying it, use the third
// parameter to set...
//
// Let's start at 123
$shm->set("some.counter", 123);
$counter = $shm->get("some.counter");
echo "Counter is: ".$counter."\n";
// And attempt to increase it
$shm->set("some.counter", $counter + 1, true);
$counter = $shm->get("some.counter");
echo "Counter is: ".$counter."\n";
// If the value is modified, the call will fail
$shm2 = new SharedData($key);
$shm2->set("some.counter", 345);
if (!$shm->set("some.counter", $counter + 1, true)) {
echo "some.counter has been modified since last read\n";
}
$shm->destroy();

29
examples/signals.php Normal file
View File

@ -0,0 +1,29 @@
<?php
require_once __DIR__."/../vendor/autoload.php";
use NoccyLabs\Ipc\Signal\SignalHandler;
use NoccyLabs\Ipc\Signal\Signal;
$handler = new SignalHandler(SIGUSR1);
// Add handlers like this, or as an array passed as the second argument to the constructor
$handler->addHandler(function () {
echo "First handler\n";
});
// If a handler returns true, it will prevent any further handlers from firing
$handler->addHandler(function () {
echo "Second and final handler\n";
return true;
});
// Thus, this one will not be called
$handler->addHandler(function() {
echo "Third handler, never called\n";
});
// Dispatch the signal to ourselves
(new Signal(SIGUSR1))->dispatch(posix_getpid());
// Default target of dispatch is the own pid
(new Signal(SIGUSR1))->dispatch();

23
examples/signaltraps.php Normal file
View File

@ -0,0 +1,23 @@
<?php
require_once __DIR__."/../vendor/autoload.php";
use NoccyLabs\Ipc\Signal\SignalTrap;
$trap = new SignalTrap(SIGINT);
echo "Press ctrl-c...\n";
while (!$trap->isTrapped(true)) {
usleep(10000);
}
echo "Thanks!\n";
echo "And once more...\n";
while (!$trap(true)) {
usleep(10000);
}
echo "Thanks!\n";

View File

@ -8,7 +8,7 @@
beStrictAboutTestsThatDoNotTestAnything="true"
beStrictAboutTodoAnnotatedTests="true"
verbose="true">
<testsuite>
<testsuite name="default">
<directory suffix="Test.php">tests</directory>
</testsuite>

View File

@ -23,7 +23,6 @@ class Timer
public function __construct(callable $callback)
{
$this->callback = $callback;
$this->seconds = $seconds;
self::registerTimer($this->callback);
}

View File

@ -5,9 +5,25 @@ namespace NoccyLabs\Ipc\Interop\Channel;
interface ChannelInterface
{
/**
* Check if the channel is open
*
* @return boolean
*/
public function isOpen():bool;
/**
* Receive (and unserialize) a frame of data.
*
* @return mixed
*/
public function receive();
public function send($data);
/**
* Send a frame of data, returns true on success
*
* @param mixed $data
* @return bool
*/
public function send($data):bool;
}

View File

@ -0,0 +1,70 @@
<?php
namespace NoccyLabs\Ipc\Interop\Channel;
/**
* The MultiChannel is used to create a single channel and allow other threads
* to each grab an end of it. Think for example a web server that could write
* log output to a channel (as it´the MultiStreamChannel is always open); one
* thread could grab a channel to dump it to the console, and another thread
* could write to a logfile.
*/
class MultiStreamChannel implements ChannelInterface
{
protected $clients = [];
/**
* Create a new client to receive data sent to the channel
*
* @return StreamChannel
*/
public function createClient(): StreamChannel
{
[ $a, $b ] = StreamChannel::createPair();
$this->clients[] = $a;
return $b;
}
/**
* {@inheritDoc}
*
* @return boolean
*/
public function isOpen(): bool
{
return true;
}
/**
* {@inheritDoc}
*
* @param mixed $data
* @return bool
*/
public function send($data):bool
{
foreach ($this->clients as $i=>$client) {
if (false === $client->send($data)) {
unset($this->clients[$i]);
}
}
return true;
}
/**
* {@inheritDoc}
*
* @return mixed
*/
public function receive()
{
foreach ($this->clients as $client) {
if ($read = $client->receive()) {
return $read;
}
}
return false;
}
}

View File

@ -2,11 +2,19 @@
namespace NoccyLabs\Ipc\Interop\Channel;
/**
* Channel based on streams
*
*/
class StreamChannel implements ChannelInterface
{
protected $stream;
/**
* Constructor
*
* @param resource|string $stream
*/
public function __construct($stream)
{
if (!is_resource($stream)) {
@ -21,26 +29,55 @@ class StreamChannel implements ChannelInterface
$this->stream = $stream;
$this->isOpen();
stream_set_blocking($stream, false);
}
public function send($data)
/**
* {@inheritDoc}
*
* @param mixed $data
* @return bool
*/
public function send($data):bool
{
fwrite($this->stream, json_encode($data)."\0");
$json = json_encode($data) . "\0";
$ret = @fwrite($this->stream, $json, strlen($json));
if (false === $ret) {
return false;
}
return true;
}
/**
* {@inheritDoc}
*
* @return mixed
*/
public function receive()
{
$buf = fread($this->stream, 8192);
if (!$buf) {
return false;
}
return json_decode(rtrim($buf, "\0"));
return json_decode(rtrim($buf, "\0"), true);
}
/**
* {@inheritDoc}
*
* @return boolean
*/
public function isOpen(): bool
{
return is_resource($this->stream);
}
/**
* Create a pair of connected StreamChannel instances
*
* @return StreamChannel[]
*/
public static function createPair()
{
$fd = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

View File

@ -15,20 +15,10 @@ class FileKey implements KeyInterface
* @param string $proj
* @param boolean $create
*/
public function __construct(string $pathname, $proj="\0", bool $create=false)
public function __construct(string $pathname, $proj="\0", bool $create = false)
{
if (!file_exists($pathname)) {
if (!$create) {
throw new \RuntimeException("Path does not exist: {$pathname}");
}
if (!is_dir(dirname($pathname))) {
throw new \RuntimeException("The directory ".dirname($pathname)." does not exist");
}
touch($pathname);
}
$this->pathname = $pathname;
$this->proj = substr($proj,0,1);
$this->setPathname($pathname, $create);
$this->setProjectIdentifier($proj);
}
/**
@ -41,6 +31,20 @@ class FileKey implements KeyInterface
return $this->pathname;
}
public function setPathname(string $pathname, bool $create = false)
{
if (!file_exists($pathname)) {
if (!$create) {
throw new \RuntimeException("Path does not exist: {$pathname}");
}
if (!is_dir(dirname($pathname))) {
throw new \RuntimeException("The directory ".dirname($pathname)." does not exist");
}
touch($pathname);
}
$this->pathname = $pathname;
}
/**
* Get the project identifier. Not guaranteed to be printable
*
@ -51,6 +55,17 @@ class FileKey implements KeyInterface
return $this->proj;
}
/**
* Set the project identifier
*
* @param string $proj
* @return void
*/
public function setProjectIdentifier(string $proj)
{
$this->proj = substr($proj, 0, 1);
}
/**
* Get the key value
*

View File

@ -4,6 +4,10 @@ namespace NoccyLabs\Ipc\Key;
interface KeyInterface
{
/**
* Get the integer key used for SysV ipc operations
*
* @return integer
*/
public function getKey():int;
public function __toString();
}

View File

@ -32,6 +32,18 @@ class Queue
}
/**
* Destroy the queue
*
* @return void
*/
public function destroy()
{
if ($this->resource) {
msg_remove_queue($this->resource);
}
}
/**
* Send to the queue
*

View File

@ -8,18 +8,25 @@ use NoccyLabs\Ipc\Key\KeyInterface;
class Semaphore
{
public function __construct(KeyInterface $key, int $max)
{
protected $resource;
public function __construct(KeyInterface $key, int $max, $perm = 0660, $autorelease = 1)
{
$this->resource = sem_get($key->getKey(), $max, $perm, $autorelease);
}
public function acquire(float $timeout)
public function destroy()
{
sem_remove($this->resource);
}
public function release()
public function acquire(bool $wait = false):bool
{
return sem_acquire($this->resource, !$wait);
}
public function release():bool
{
return sem_release($this->resource);
}
}

View File

@ -113,6 +113,7 @@ class SharedData extends SharedMemory
// Update the data
$this[self::IDX_DATA + $index] = [ md5($value), $value ];
$this->checks[$key] = md5($value);
$this->unlock();
return true;
}

View File

@ -29,7 +29,7 @@ class SharedMemoryBlock
*/
public function __construct(KeyInterface $key, string $flags, int $memsize, int $perm = 0666)
{
if (!($shm_resource = shmop_open($key->getKey(), $memsize, $perm))) {
if (!($shm_resource = shmop_open($key->getKey(), $flags, $perm, $memsize))) {
throw new \RuntimeException("Unable to attach shm resource {$key}");
}

View File

@ -7,18 +7,35 @@ class Signal
{
private $signo;
/**
* Constructor
*
* @param integer $signo
*/
public function __construct(int $signo)
{
$this->signo = $signo;
}
/**
* Set a signal handler, overwriting any previous handler
*
* @param callable $handler
* @return void
*/
public function setHandler(callable $handler):void
{
pcntl_signal($this->signo, $handler);
}
public function dispatch($pid):bool
/**
* Dispatch the signal to the specified pid. Default is own pid
*
* @param int $pid
* @return boolean
*/
public function dispatch(int $pid=null):bool
{
return posix_kill($pid, $this->signo);
return posix_kill($pid?:posix_getpid(), $this->signo);
}
}

View File

@ -9,6 +9,13 @@ class SignalHandler
private $callbacks;
/**
* Constructor, should only be called once for each signal but can have
* multiple handlers attached.
*
* @param integer $signo
* @param array $callbacks
*/
public function __construct(int $signo, array $callbacks=[])
{
$this->signo = $signo;
@ -21,11 +28,23 @@ class SignalHandler
}
/**
* Append a handler to the signal
*
* @param callable $handler
* @return void
*/
public function addHandler(callable $handler):void
{
$this->callbacks[] = $handler;
}
/**
* Callback for signals
*
* @param int $signo
* @return void
*/
public function onSignal($signo)
{
foreach ($this->callbacks as $callback) {

View File

@ -9,17 +9,33 @@ class SignalTrap
protected $trapped = false;
/**
* Constructor
*
* @param integer $signo
*/
public function __construct(int $signo)
{
$this->signal = new Signal($signo);
$this->signal->setHandler([ $this, "onSignal" ]);
}
/**
* Signal handler callback
*
* @return void
*/
public function onSignal()
{
$this->trapped = true;
}
/**
* Check if the signal has been received
*
* @param boolean $reset
* @return boolean
*/
public function isTrapped($reset=true):bool
{
if ($this->trapped) {
@ -29,4 +45,9 @@ class SignalTrap
return false;
}
}
public function __invoke($reset=true):bool
{
return $this->isTrapped($reset);
}
}

View File

@ -0,0 +1,44 @@
<?php
namespace NoccyLabs\Ipc\Interop\Channel;
class MultiStreamChannelTest extends \PhpUnit\Framework\TestCase
{
public function testCreatingMultipleClients()
{
$master = new MultiStreamChannel();
$client1 = $master->createClient();
$client2 = $master->createClient();
$client3 = $master->createClient();
$master->send("Hello World");
$this->assertEquals("Hello World", $client1->receive());
$this->assertEquals("Hello World", $client2->receive());
$this->assertEquals("Hello World", $client3->receive());
}
public function testReadingFromMultipleClients()
{
$master = new MultiStreamChannel();
$client1 = $master->createClient();
$client2 = $master->createClient();
$client3 = $master->createClient();
$client1->send("a");
$client2->send("b");
$client3->send("c");
$this->assertEquals("a", $master->receive());
$this->assertEquals("b", $master->receive());
$this->assertEquals("c", $master->receive());
}
}

View File

@ -0,0 +1,27 @@
<?php
namespace NoccyLabs\Ipc\Sem;
use NoccyLabs\Ipc\Key\FileKey;
class SemaphoreTest extends \PhpUnit\Framework\TestCase
{
public function testCreatingAndAquiringSemaphore()
{
$key = new FileKey(__FILE__);
$sem = new Semaphore($key, 2);
$this->assertTrue($sem->acquire());
$this->assertTrue($sem->acquire());
$this->assertFalse($sem->acquire());
$this->assertTrue($sem->release());
$this->assertTrue($sem->acquire());
$this->assertFalse($sem->acquire());
$this->assertTrue($sem->release());
$this->assertTrue($sem->release());
}
}

View File

@ -0,0 +1,25 @@
<?php
namespace NoccyLabs\Ipc\Shm;
use NoccyLabs\Ipc\Key\FileKey;
class SharedMemoryBlockTest extends \PhpUnit\Framework\TestCase
{
public function testWritingAndReading()
{
$key = new FileKey(__DIR__);
$shm = new SharedMemoryBlock($key, SharedMemoryBlock::FLAG_CREATE, 64);
$this->assertEquals(11, $shm->write("Hello World", 0));
$this->assertEquals("Hello World", $shm->read(11, 0));
$this->assertEquals(3, $shm->write("foo", 2));
$this->assertEquals("Hefoo World", $shm->read(11, 0));
$shm->destroy();
}
}