Compare commits
13 Commits
Author | SHA1 | Date | |
---|---|---|---|
c855f92ecd | |||
e8c4a85486 | |||
f8b43beaf5 | |||
00d2bead8e | |||
d6aacd841f | |||
31634c5054 | |||
20b6f1d42c | |||
c9c4a8aa78 | |||
4652e464b9 | |||
612cfc2035 | |||
dcb059e4be | |||
4e8ab73a88 | |||
2652480c5d |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,2 +1,3 @@
|
|||||||
/vendor/
|
/vendor/
|
||||||
/composer.lock
|
/composer.lock
|
||||||
|
/doc
|
||||||
|
174
README.md
174
README.md
@ -2,26 +2,168 @@ noccylabs/ipc
|
|||||||
=============
|
=============
|
||||||
|
|
||||||
This is a one-size-fits-all IPC library to facilitate communication between
|
This is a one-size-fits-all IPC library to facilitate communication between
|
||||||
threads and processes. It contains the following features:
|
threads and processes.
|
||||||
|
|
||||||
**Core:**
|
For complete examples, see the `examples` directory in the source tree.
|
||||||
|
|
||||||
- [ ] Semaphore
|
## Signals
|
||||||
- [ ] Mutex
|
|
||||||
- [x] Queue
|
|
||||||
- [x] SharedMemory key-value store
|
|
||||||
- [ ] SharedMemory blocks
|
|
||||||
- [x] Signals
|
|
||||||
- [x] Locks
|
|
||||||
|
|
||||||
**High-Level:**
|
Asynchronous signals are automatically enabled if supported. Otherwise, the
|
||||||
|
`pcntl_signal_dispatch()` method must be frequently called from your main loop.
|
||||||
|
You can test for this using the `SIGNALS_ASYNC` constant:
|
||||||
|
|
||||||
- [ ] EventBridge
|
if (!SIGNALS_ASYNC) {
|
||||||
- [ ] EventDispatcher
|
pcntl_signal_dispatch();
|
||||||
- [ ] InterOp/Marshalling
|
}
|
||||||
- [x] Asynchronous timers
|
|
||||||
|
|
||||||
**Transports:**
|
### Signal handlers
|
||||||
|
|
||||||
- [x] Stream channels
|
Signal handlers allow for multiple listeners, with any one of them being able to
|
||||||
|
prevent the signal from bubbling up.
|
||||||
|
|
||||||
|
$handler = new SignalHandler(SIGUSR1);
|
||||||
|
|
||||||
|
$handler->addHandler(function () {
|
||||||
|
// Handle SIGUSR1, return true to stop bubbling
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
|
You can also handle as well as fire signals using the `Signal` class:
|
||||||
|
|
||||||
|
$signal = new Signal(SIGUSR1);
|
||||||
|
$signal->setHandler(function () {
|
||||||
|
// Handle SIGUSR1
|
||||||
|
});
|
||||||
|
|
||||||
|
// Dispatch the signal to ourselves
|
||||||
|
(new Signal(SIGUSR1))->dispatch($pid);
|
||||||
|
|
||||||
|
|
||||||
|
### Signal traps
|
||||||
|
|
||||||
|
Traps are used in the main loop to break on signals
|
||||||
|
|
||||||
|
$trap = new SignalTrap(SIGINT);
|
||||||
|
|
||||||
|
while (!$trap->isTrapped()) {
|
||||||
|
// ...
|
||||||
|
}
|
||||||
|
|
||||||
|
### Timers
|
||||||
|
|
||||||
|
Timers fire asynchronously at fixed 1 second intervals. It requires signals to be
|
||||||
|
processed; see above.
|
||||||
|
|
||||||
|
// Once every second...
|
||||||
|
$timer = new Timer(function () {
|
||||||
|
echo ".";
|
||||||
|
});
|
||||||
|
|
||||||
|
## File locks
|
||||||
|
|
||||||
|
File locks uses a shared file as a resource for locking.
|
||||||
|
|
||||||
|
// Creating the lock will not acquire it
|
||||||
|
$lock = new FileLock(__FILE__);
|
||||||
|
|
||||||
|
if (!$lock->acquire()) {
|
||||||
|
echo "fail!\n";
|
||||||
|
} else {
|
||||||
|
$lock->release();
|
||||||
|
}
|
||||||
|
|
||||||
|
## SysV wrappers
|
||||||
|
|
||||||
|
All these wrappers depend on a `KeyInterface` being passed to the constructor.
|
||||||
|
This is usually an instance of a `FileKey`, created as such:
|
||||||
|
|
||||||
|
$key = new FileKey(__FILE__);
|
||||||
|
|
||||||
|
The key has a project identifier that starts at `chr(0)`, or `"\0"`. To increase
|
||||||
|
this identifier, and thus point to another segment, just clone it.
|
||||||
|
|
||||||
|
$key2 = clone $key1;
|
||||||
|
|
||||||
|
### Semaphores
|
||||||
|
|
||||||
|
Semaphores are created with a key and a max count.
|
||||||
|
|
||||||
|
$key = new FileKey(__FILE__);
|
||||||
|
$sem = new Semaphore($key, 2);
|
||||||
|
|
||||||
|
$sem->allocate(); // -> true
|
||||||
|
$sem->allocate(); // -> true
|
||||||
|
$sem->allocate(); // -> false
|
||||||
|
$sem->release();
|
||||||
|
|
||||||
|
$mutex->destroy();
|
||||||
|
|
||||||
|
### Mutexes
|
||||||
|
|
||||||
|
A mutex is a semaphore with a max count of 1.
|
||||||
|
|
||||||
|
$key = new FileKey(__FILE__);
|
||||||
|
$mutex = new Mutex($key);
|
||||||
|
|
||||||
|
$mutex->allocate(); // -> true
|
||||||
|
$mutex->allocate(); // -> false
|
||||||
|
$mutex->release();
|
||||||
|
|
||||||
|
$mutex->destroy();
|
||||||
|
|
||||||
|
### Message Queues
|
||||||
|
|
||||||
|
$key = new FileKey(__FILE__);
|
||||||
|
$msgq = new Queue($key);
|
||||||
|
|
||||||
|
$msgq->send(1, [ "Some data", [ "format"=>"foo" ]]);
|
||||||
|
|
||||||
|
$data = $msgq->receive(1, $type);
|
||||||
|
|
||||||
|
$msgq->destroy();
|
||||||
|
|
||||||
|
### Shared Memory
|
||||||
|
|
||||||
|
Shared memory using `SharedData` supports integrity checking when setting, using
|
||||||
|
the third parameter to `set()`.
|
||||||
|
|
||||||
|
$key = new FileKey(__FILE__);
|
||||||
|
$shm = new SharedData($key);
|
||||||
|
|
||||||
|
do {
|
||||||
|
$counter = $shm->get("counter") + 1;
|
||||||
|
} while (!$shm->set("counter", $counter, true));
|
||||||
|
|
||||||
|
$shm->destroy();
|
||||||
|
|
||||||
|
The `SharedMemory` class is a simple integer-indexed array
|
||||||
|
|
||||||
|
$key = new FileKey(__FILE__);
|
||||||
|
$shm = new SharedMemory($key);
|
||||||
|
|
||||||
|
$shm[0] = 42;
|
||||||
|
|
||||||
|
$shm->destroy();
|
||||||
|
|
||||||
|
## Communication
|
||||||
|
|
||||||
|
### Channels
|
||||||
|
|
||||||
|
Channels are essentially connected pipes. A channel can be created with a stream resource,
|
||||||
|
or through the `createPair()` factory method.
|
||||||
|
|
||||||
|
[ $ch1, $ch2 ] = StreamChannel::createPair();
|
||||||
|
$ch1->send($data);
|
||||||
|
$rcvd = $ch2->receive();
|
||||||
|
|
||||||
|
### MultiChannels
|
||||||
|
|
||||||
|
The `MultiStreamChannel` lets you connect multiple clients to a single master channel.
|
||||||
|
|
||||||
|
$master = new MultiStreamChannel();
|
||||||
|
|
||||||
|
$ch1 = $master->createClient();
|
||||||
|
$ch2 = $master->createClient();
|
||||||
|
$ch1->send($data);
|
||||||
|
$rcvd = $ch2->receive();
|
||||||
|
|
39
examples/multichannels.php
Normal file
39
examples/multichannels.php
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
require_once __DIR__."/../vendor/autoload.php";
|
||||||
|
|
||||||
|
use NoccyLabs\Ipc\Interop\Channel\MultiStreamChannel;
|
||||||
|
|
||||||
|
// It really is this simple
|
||||||
|
$channel = new MultiStreamChannel();
|
||||||
|
|
||||||
|
// Create a child to report back in 3 seconds
|
||||||
|
$channel1 = $channel->createClient();
|
||||||
|
$pid1 = pcntl_fork();
|
||||||
|
if ($pid1 === 0) {
|
||||||
|
sleep(1);
|
||||||
|
echo "child 1 received ".$channel1->receive()."\n";
|
||||||
|
$channel1->send("Hello from child 1");
|
||||||
|
sleep(5);
|
||||||
|
exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create another child to report back in 4 seconds
|
||||||
|
$channel2 = $channel->createClient();
|
||||||
|
$pid2 = pcntl_fork();
|
||||||
|
if ($pid2 === 0) {
|
||||||
|
sleep(1);
|
||||||
|
echo "child 2 received ".$channel2->receive()."\n";
|
||||||
|
$channel2->send("Hello from child 2");
|
||||||
|
sleep(5);
|
||||||
|
exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Writing to the master works whether any clients have been added
|
||||||
|
$channel->send("Hello from master");
|
||||||
|
|
||||||
|
// Wait for 5 seconds and dump whatever messages are waiting
|
||||||
|
sleep(5);
|
||||||
|
while ($msg = $channel->receive()) {
|
||||||
|
echo "master received ".$msg."\n";
|
||||||
|
}
|
26
examples/queues.php
Normal file
26
examples/queues.php
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
require_once __DIR__."/../vendor/autoload.php";
|
||||||
|
|
||||||
|
use NoccyLabs\Ipc\Key\FileKey;
|
||||||
|
use NoccyLabs\Ipc\Msg\Queue;
|
||||||
|
|
||||||
|
$key = new FileKey(__FILE__);
|
||||||
|
$msgq = new Queue($key);
|
||||||
|
|
||||||
|
// Send packages with msgtype >= 1...
|
||||||
|
$msgq->send(1, [ "what"=>"First" ]);
|
||||||
|
$msgq->send(2, [ "what"=>"Second" ]);
|
||||||
|
$msgq->send(3, [ "what"=>"Third" ]);
|
||||||
|
|
||||||
|
// Read messages by requesting a type...
|
||||||
|
$msg = $msgq->receive(2, $type);
|
||||||
|
printf("msg: %s, type: %d\n", json_encode($msg), $type);
|
||||||
|
|
||||||
|
// ...or read the first message with type 0...
|
||||||
|
$msg = $msgq->receive(0, $type);
|
||||||
|
printf("msg: %s, type: %d\n", json_encode($msg), $type);
|
||||||
|
$msg = $msgq->receive(0, $type);
|
||||||
|
printf("msg: %s, type: %d\n", json_encode($msg), $type);
|
||||||
|
|
||||||
|
$msgq->destroy();
|
21
examples/semaphores.php
Normal file
21
examples/semaphores.php
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
require_once __DIR__."/../vendor/autoload.php";
|
||||||
|
|
||||||
|
use NoccyLabs\Ipc\Key\FileKey;
|
||||||
|
use NoccyLabs\Ipc\Sem\Semaphore;
|
||||||
|
|
||||||
|
$key = new FileKey(__FILE__);
|
||||||
|
|
||||||
|
// Create semaphore with max count of 2
|
||||||
|
$sem1 = new Semaphore($key, 2);
|
||||||
|
$sem2 = new Semaphore($key, 2);
|
||||||
|
$sem3 = new Semaphore($key, 2);
|
||||||
|
|
||||||
|
printf("sem1 acquire: %d\n", $sem1->acquire());
|
||||||
|
printf("sem2 acquire: %d\n", $sem2->acquire());
|
||||||
|
printf("sem3 acquire: %d\n", $sem3->acquire());
|
||||||
|
printf("sem2 release: %d\n", $sem2->release());
|
||||||
|
printf("sem3 acquire: %d\n", $sem3->acquire());
|
||||||
|
|
||||||
|
$sem1->destroy();
|
@ -32,3 +32,4 @@ if (!$shm->set("some.counter", $counter + 1, true)) {
|
|||||||
echo "some.counter has been modified since last read\n";
|
echo "some.counter has been modified since last read\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$shm->destroy();
|
@ -9,7 +9,14 @@ use NoccyLabs\Ipc\Signal\SignalTrap;
|
|||||||
$trap = new SignalTrap(SIGINT);
|
$trap = new SignalTrap(SIGINT);
|
||||||
echo "Press ctrl-c...\n";
|
echo "Press ctrl-c...\n";
|
||||||
|
|
||||||
while (!$trap->isTrapped()) {
|
while (!$trap->isTrapped(true)) {
|
||||||
|
usleep(10000);
|
||||||
|
}
|
||||||
|
|
||||||
|
echo "Thanks!\n";
|
||||||
|
echo "And once more...\n";
|
||||||
|
|
||||||
|
while (!$trap(true)) {
|
||||||
usleep(10000);
|
usleep(10000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,7 +8,7 @@
|
|||||||
beStrictAboutTestsThatDoNotTestAnything="true"
|
beStrictAboutTestsThatDoNotTestAnything="true"
|
||||||
beStrictAboutTodoAnnotatedTests="true"
|
beStrictAboutTodoAnnotatedTests="true"
|
||||||
verbose="true">
|
verbose="true">
|
||||||
<testsuite>
|
<testsuite name="default">
|
||||||
<directory suffix="Test.php">tests</directory>
|
<directory suffix="Test.php">tests</directory>
|
||||||
</testsuite>
|
</testsuite>
|
||||||
|
|
||||||
|
@ -23,7 +23,6 @@ class Timer
|
|||||||
public function __construct(callable $callback)
|
public function __construct(callable $callback)
|
||||||
{
|
{
|
||||||
$this->callback = $callback;
|
$this->callback = $callback;
|
||||||
$this->seconds = $seconds;
|
|
||||||
self::registerTimer($this->callback);
|
self::registerTimer($this->callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,9 +5,25 @@ namespace NoccyLabs\Ipc\Interop\Channel;
|
|||||||
|
|
||||||
interface ChannelInterface
|
interface ChannelInterface
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Check if the channel is open
|
||||||
|
*
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
public function isOpen():bool;
|
public function isOpen():bool;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Receive (and unserialize) a frame of data.
|
||||||
|
*
|
||||||
|
* @return mixed
|
||||||
|
*/
|
||||||
public function receive();
|
public function receive();
|
||||||
|
|
||||||
public function send($data);
|
/**
|
||||||
|
* Send a frame of data, returns true on success
|
||||||
|
*
|
||||||
|
* @param mixed $data
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function send($data):bool;
|
||||||
}
|
}
|
70
src/Interop/Channel/MultiStreamChannel.php
Normal file
70
src/Interop/Channel/MultiStreamChannel.php
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace NoccyLabs\Ipc\Interop\Channel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The MultiChannel is used to create a single channel and allow other threads
|
||||||
|
* to each grab an end of it. Think for example a web server that could write
|
||||||
|
* log output to a channel (as it´the MultiStreamChannel is always open); one
|
||||||
|
* thread could grab a channel to dump it to the console, and another thread
|
||||||
|
* could write to a logfile.
|
||||||
|
*/
|
||||||
|
class MultiStreamChannel implements ChannelInterface
|
||||||
|
{
|
||||||
|
|
||||||
|
protected $clients = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new client to receive data sent to the channel
|
||||||
|
*
|
||||||
|
* @return StreamChannel
|
||||||
|
*/
|
||||||
|
public function createClient(): StreamChannel
|
||||||
|
{
|
||||||
|
[ $a, $b ] = StreamChannel::createPair();
|
||||||
|
$this->clients[] = $a;
|
||||||
|
return $b;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
|
public function isOpen(): bool
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @param mixed $data
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function send($data):bool
|
||||||
|
{
|
||||||
|
foreach ($this->clients as $i=>$client) {
|
||||||
|
if (false === $client->send($data)) {
|
||||||
|
unset($this->clients[$i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @return mixed
|
||||||
|
*/
|
||||||
|
public function receive()
|
||||||
|
{
|
||||||
|
foreach ($this->clients as $client) {
|
||||||
|
if ($read = $client->receive()) {
|
||||||
|
return $read;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -2,11 +2,19 @@
|
|||||||
|
|
||||||
namespace NoccyLabs\Ipc\Interop\Channel;
|
namespace NoccyLabs\Ipc\Interop\Channel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Channel based on streams
|
||||||
|
*
|
||||||
|
*/
|
||||||
class StreamChannel implements ChannelInterface
|
class StreamChannel implements ChannelInterface
|
||||||
{
|
{
|
||||||
protected $stream;
|
protected $stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*
|
||||||
|
* @param resource|string $stream
|
||||||
|
*/
|
||||||
public function __construct($stream)
|
public function __construct($stream)
|
||||||
{
|
{
|
||||||
if (!is_resource($stream)) {
|
if (!is_resource($stream)) {
|
||||||
@ -21,26 +29,55 @@ class StreamChannel implements ChannelInterface
|
|||||||
|
|
||||||
$this->stream = $stream;
|
$this->stream = $stream;
|
||||||
|
|
||||||
$this->isOpen();
|
stream_set_blocking($stream, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function send($data)
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @param mixed $data
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function send($data):bool
|
||||||
{
|
{
|
||||||
fwrite($this->stream, json_encode($data)."\0");
|
$json = json_encode($data) . "\0";
|
||||||
|
$ret = @fwrite($this->stream, $json, strlen($json));
|
||||||
|
if (false === $ret) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @return mixed
|
||||||
|
*/
|
||||||
public function receive()
|
public function receive()
|
||||||
{
|
{
|
||||||
$buf = fread($this->stream, 8192);
|
$buf = fread($this->stream, 8192);
|
||||||
|
if (!$buf) {
|
||||||
return json_decode(rtrim($buf, "\0"));
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return json_decode(rtrim($buf, "\0"), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
public function isOpen(): bool
|
public function isOpen(): bool
|
||||||
{
|
{
|
||||||
return is_resource($this->stream);
|
return is_resource($this->stream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a pair of connected StreamChannel instances
|
||||||
|
*
|
||||||
|
* @return StreamChannel[]
|
||||||
|
*/
|
||||||
public static function createPair()
|
public static function createPair()
|
||||||
{
|
{
|
||||||
$fd = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
|
$fd = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
|
||||||
|
@ -15,20 +15,10 @@ class FileKey implements KeyInterface
|
|||||||
* @param string $proj
|
* @param string $proj
|
||||||
* @param boolean $create
|
* @param boolean $create
|
||||||
*/
|
*/
|
||||||
public function __construct(string $pathname, $proj="\0", bool $create=false)
|
public function __construct(string $pathname, $proj="\0", bool $create = false)
|
||||||
{
|
{
|
||||||
if (!file_exists($pathname)) {
|
$this->setPathname($pathname, $create);
|
||||||
if (!$create) {
|
$this->setProjectIdentifier($proj);
|
||||||
throw new \RuntimeException("Path does not exist: {$pathname}");
|
|
||||||
}
|
|
||||||
if (!is_dir(dirname($pathname))) {
|
|
||||||
throw new \RuntimeException("The directory ".dirname($pathname)." does not exist");
|
|
||||||
}
|
|
||||||
touch($pathname);
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->pathname = $pathname;
|
|
||||||
$this->proj = substr($proj,0,1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -41,6 +31,20 @@ class FileKey implements KeyInterface
|
|||||||
return $this->pathname;
|
return $this->pathname;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function setPathname(string $pathname, bool $create = false)
|
||||||
|
{
|
||||||
|
if (!file_exists($pathname)) {
|
||||||
|
if (!$create) {
|
||||||
|
throw new \RuntimeException("Path does not exist: {$pathname}");
|
||||||
|
}
|
||||||
|
if (!is_dir(dirname($pathname))) {
|
||||||
|
throw new \RuntimeException("The directory ".dirname($pathname)." does not exist");
|
||||||
|
}
|
||||||
|
touch($pathname);
|
||||||
|
}
|
||||||
|
$this->pathname = $pathname;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the project identifier. Not guaranteed to be printable
|
* Get the project identifier. Not guaranteed to be printable
|
||||||
*
|
*
|
||||||
@ -51,6 +55,17 @@ class FileKey implements KeyInterface
|
|||||||
return $this->proj;
|
return $this->proj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the project identifier
|
||||||
|
*
|
||||||
|
* @param string $proj
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function setProjectIdentifier(string $proj)
|
||||||
|
{
|
||||||
|
$this->proj = substr($proj, 0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the key value
|
* Get the key value
|
||||||
*
|
*
|
||||||
|
@ -4,6 +4,10 @@ namespace NoccyLabs\Ipc\Key;
|
|||||||
|
|
||||||
interface KeyInterface
|
interface KeyInterface
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Get the integer key used for SysV ipc operations
|
||||||
|
*
|
||||||
|
* @return integer
|
||||||
|
*/
|
||||||
public function getKey():int;
|
public function getKey():int;
|
||||||
public function __toString();
|
|
||||||
}
|
}
|
@ -32,6 +32,18 @@ class Queue
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destroy the queue
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function destroy()
|
||||||
|
{
|
||||||
|
if ($this->resource) {
|
||||||
|
msg_remove_queue($this->resource);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send to the queue
|
* Send to the queue
|
||||||
*
|
*
|
||||||
|
@ -8,18 +8,25 @@ use NoccyLabs\Ipc\Key\KeyInterface;
|
|||||||
|
|
||||||
class Semaphore
|
class Semaphore
|
||||||
{
|
{
|
||||||
public function __construct(KeyInterface $key, int $max)
|
protected $resource;
|
||||||
{
|
|
||||||
|
|
||||||
|
public function __construct(KeyInterface $key, int $max, $perm = 0660, $autorelease = 1)
|
||||||
|
{
|
||||||
|
$this->resource = sem_get($key->getKey(), $max, $perm, $autorelease);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function acquire(float $timeout)
|
public function destroy()
|
||||||
{
|
{
|
||||||
|
sem_remove($this->resource);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function release()
|
public function acquire(bool $wait = false):bool
|
||||||
{
|
{
|
||||||
|
return sem_acquire($this->resource, !$wait);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function release():bool
|
||||||
|
{
|
||||||
|
return sem_release($this->resource);
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -29,7 +29,7 @@ class SharedMemoryBlock
|
|||||||
*/
|
*/
|
||||||
public function __construct(KeyInterface $key, string $flags, int $memsize, int $perm = 0666)
|
public function __construct(KeyInterface $key, string $flags, int $memsize, int $perm = 0666)
|
||||||
{
|
{
|
||||||
if (!($shm_resource = shmop_open($key->getKey(), $memsize, $perm))) {
|
if (!($shm_resource = shmop_open($key->getKey(), $flags, $perm, $memsize))) {
|
||||||
throw new \RuntimeException("Unable to attach shm resource {$key}");
|
throw new \RuntimeException("Unable to attach shm resource {$key}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,17 +7,34 @@ class Signal
|
|||||||
{
|
{
|
||||||
private $signo;
|
private $signo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*
|
||||||
|
* @param integer $signo
|
||||||
|
*/
|
||||||
public function __construct(int $signo)
|
public function __construct(int $signo)
|
||||||
{
|
{
|
||||||
$this->signo = $signo;
|
$this->signo = $signo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a signal handler, overwriting any previous handler
|
||||||
|
*
|
||||||
|
* @param callable $handler
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
public function setHandler(callable $handler):void
|
public function setHandler(callable $handler):void
|
||||||
{
|
{
|
||||||
pcntl_signal($this->signo, $handler);
|
pcntl_signal($this->signo, $handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function dispatch($pid=null):bool
|
/**
|
||||||
|
* Dispatch the signal to the specified pid. Default is own pid
|
||||||
|
*
|
||||||
|
* @param int $pid
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
|
public function dispatch(int $pid=null):bool
|
||||||
{
|
{
|
||||||
return posix_kill($pid?:posix_getpid(), $this->signo);
|
return posix_kill($pid?:posix_getpid(), $this->signo);
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,13 @@ class SignalHandler
|
|||||||
|
|
||||||
private $callbacks;
|
private $callbacks;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor, should only be called once for each signal but can have
|
||||||
|
* multiple handlers attached.
|
||||||
|
*
|
||||||
|
* @param integer $signo
|
||||||
|
* @param array $callbacks
|
||||||
|
*/
|
||||||
public function __construct(int $signo, array $callbacks=[])
|
public function __construct(int $signo, array $callbacks=[])
|
||||||
{
|
{
|
||||||
$this->signo = $signo;
|
$this->signo = $signo;
|
||||||
@ -21,11 +28,23 @@ class SignalHandler
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Append a handler to the signal
|
||||||
|
*
|
||||||
|
* @param callable $handler
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
public function addHandler(callable $handler):void
|
public function addHandler(callable $handler):void
|
||||||
{
|
{
|
||||||
$this->callbacks[] = $handler;
|
$this->callbacks[] = $handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback for signals
|
||||||
|
*
|
||||||
|
* @param int $signo
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
public function onSignal($signo)
|
public function onSignal($signo)
|
||||||
{
|
{
|
||||||
foreach ($this->callbacks as $callback) {
|
foreach ($this->callbacks as $callback) {
|
||||||
|
@ -9,17 +9,33 @@ class SignalTrap
|
|||||||
|
|
||||||
protected $trapped = false;
|
protected $trapped = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*
|
||||||
|
* @param integer $signo
|
||||||
|
*/
|
||||||
public function __construct(int $signo)
|
public function __construct(int $signo)
|
||||||
{
|
{
|
||||||
$this->signal = new Signal($signo);
|
$this->signal = new Signal($signo);
|
||||||
$this->signal->setHandler([ $this, "onSignal" ]);
|
$this->signal->setHandler([ $this, "onSignal" ]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signal handler callback
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
public function onSignal()
|
public function onSignal()
|
||||||
{
|
{
|
||||||
$this->trapped = true;
|
$this->trapped = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the signal has been received
|
||||||
|
*
|
||||||
|
* @param boolean $reset
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
public function isTrapped($reset=true):bool
|
public function isTrapped($reset=true):bool
|
||||||
{
|
{
|
||||||
if ($this->trapped) {
|
if ($this->trapped) {
|
||||||
@ -29,4 +45,9 @@ class SignalTrap
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function __invoke($reset=true):bool
|
||||||
|
{
|
||||||
|
return $this->isTrapped($reset);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
44
tests/Interop/Channel/MultiStreamChannelTest.php
Normal file
44
tests/Interop/Channel/MultiStreamChannelTest.php
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace NoccyLabs\Ipc\Interop\Channel;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class MultiStreamChannelTest extends \PhpUnit\Framework\TestCase
|
||||||
|
{
|
||||||
|
|
||||||
|
public function testCreatingMultipleClients()
|
||||||
|
{
|
||||||
|
$master = new MultiStreamChannel();
|
||||||
|
|
||||||
|
$client1 = $master->createClient();
|
||||||
|
$client2 = $master->createClient();
|
||||||
|
$client3 = $master->createClient();
|
||||||
|
|
||||||
|
$master->send("Hello World");
|
||||||
|
|
||||||
|
$this->assertEquals("Hello World", $client1->receive());
|
||||||
|
$this->assertEquals("Hello World", $client2->receive());
|
||||||
|
$this->assertEquals("Hello World", $client3->receive());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testReadingFromMultipleClients()
|
||||||
|
{
|
||||||
|
$master = new MultiStreamChannel();
|
||||||
|
|
||||||
|
$client1 = $master->createClient();
|
||||||
|
$client2 = $master->createClient();
|
||||||
|
$client3 = $master->createClient();
|
||||||
|
|
||||||
|
$client1->send("a");
|
||||||
|
$client2->send("b");
|
||||||
|
$client3->send("c");
|
||||||
|
|
||||||
|
$this->assertEquals("a", $master->receive());
|
||||||
|
$this->assertEquals("b", $master->receive());
|
||||||
|
$this->assertEquals("c", $master->receive());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
27
tests/Sem/SemaphoreTest.php
Normal file
27
tests/Sem/SemaphoreTest.php
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace NoccyLabs\Ipc\Sem;
|
||||||
|
|
||||||
|
use NoccyLabs\Ipc\Key\FileKey;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class SemaphoreTest extends \PhpUnit\Framework\TestCase
|
||||||
|
{
|
||||||
|
|
||||||
|
public function testCreatingAndAquiringSemaphore()
|
||||||
|
{
|
||||||
|
$key = new FileKey(__FILE__);
|
||||||
|
$sem = new Semaphore($key, 2);
|
||||||
|
|
||||||
|
$this->assertTrue($sem->acquire());
|
||||||
|
$this->assertTrue($sem->acquire());
|
||||||
|
$this->assertFalse($sem->acquire());
|
||||||
|
$this->assertTrue($sem->release());
|
||||||
|
$this->assertTrue($sem->acquire());
|
||||||
|
$this->assertFalse($sem->acquire());
|
||||||
|
$this->assertTrue($sem->release());
|
||||||
|
$this->assertTrue($sem->release());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
25
tests/Shm/SharedMemoryBlockTest.php
Normal file
25
tests/Shm/SharedMemoryBlockTest.php
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace NoccyLabs\Ipc\Shm;
|
||||||
|
|
||||||
|
use NoccyLabs\Ipc\Key\FileKey;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class SharedMemoryBlockTest extends \PhpUnit\Framework\TestCase
|
||||||
|
{
|
||||||
|
public function testWritingAndReading()
|
||||||
|
{
|
||||||
|
$key = new FileKey(__DIR__);
|
||||||
|
$shm = new SharedMemoryBlock($key, SharedMemoryBlock::FLAG_CREATE, 64);
|
||||||
|
|
||||||
|
$this->assertEquals(11, $shm->write("Hello World", 0));
|
||||||
|
$this->assertEquals("Hello World", $shm->read(11, 0));
|
||||||
|
$this->assertEquals(3, $shm->write("foo", 2));
|
||||||
|
$this->assertEquals("Hefoo World", $shm->read(11, 0));
|
||||||
|
|
||||||
|
$shm->destroy();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user