Compare commits
6 Commits
Author | SHA1 | Date | |
---|---|---|---|
4652e464b9 | |||
612cfc2035 | |||
dcb059e4be | |||
4e8ab73a88 | |||
2652480c5d | |||
176f9aa5ec |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,2 +1,3 @@
|
||||
/vendor/
|
||||
/composer.lock
|
||||
/doc
|
||||
|
140
README.md
140
README.md
@ -2,26 +2,132 @@ noccylabs/ipc
|
||||
=============
|
||||
|
||||
This is a one-size-fits-all IPC library to facilitate communication between
|
||||
threads and processes. It contains the following features:
|
||||
threads and processes.
|
||||
|
||||
**Core:**
|
||||
For complete examples, see the `examples` directory in the source tree.
|
||||
## Signals
|
||||
|
||||
- [ ] Semaphore
|
||||
- [ ] Mutex
|
||||
- [x] Queue
|
||||
- [x] SharedMemory key-value store
|
||||
- [ ] SharedMemory blocks
|
||||
- [x] Signals
|
||||
- [x] Locks
|
||||
Asynchronous signals are automatically enabled if supported. Otherwise, the
|
||||
`pcntl_signal_dispatch()` method must be frequently called from your main loop.
|
||||
You can test for this using the `ASYNC_SIGNALS` constant:
|
||||
|
||||
**High-Level:**
|
||||
if (!ASYNC_SIGNALS) {
|
||||
pcntl_signal_dispatch();
|
||||
}
|
||||
|
||||
- [ ] EventBridge
|
||||
- [ ] EventDispatcher
|
||||
- [ ] InterOp/Marshalling
|
||||
- [x] Asynchronous timers
|
||||
### Signal handlers
|
||||
|
||||
**Transports:**
|
||||
Signal handlers allow for multiple listeners, with any one of them being able to
|
||||
prevent the signal from bubbling up.
|
||||
|
||||
- [x] Stream channels
|
||||
|
||||
$handler = new SignalHandler(SIGUSR1);
|
||||
|
||||
$handler->addHandler(function () {
|
||||
// Handle SIGUSR1, return true to stop bubbling
|
||||
return true;
|
||||
});
|
||||
|
||||
You can also handle as well as fire signals using the `Signal` class:
|
||||
|
||||
$signal = new Signal(SIGUSR1);
|
||||
$signal->setHandler(function () {
|
||||
// Handle SIGUSR1
|
||||
});
|
||||
|
||||
// Dispatch the signal to ourselves
|
||||
(new Signal(SIGUSR1))->dispatch($pid);
|
||||
|
||||
|
||||
### Signal traps
|
||||
|
||||
Traps are used in the main loop to break on signals
|
||||
|
||||
$trap = new SignalTrap(SIGINT);
|
||||
|
||||
while (!$trap->isTrapped()) {
|
||||
// ...
|
||||
}
|
||||
|
||||
### Timers
|
||||
|
||||
Timers fire asynchronously at fixed 1 second intervals. It requires signals to be
|
||||
processed; see above.
|
||||
|
||||
// Once every second...
|
||||
$timer = new Timer(function () {
|
||||
echo ".";
|
||||
});
|
||||
|
||||
## File locks
|
||||
|
||||
File locks uses a shared file as a resource for locking.
|
||||
|
||||
// Creating the lock will not acquire it
|
||||
$lock = new FileLock(__FILE__);
|
||||
|
||||
if (!$lock->acquire()) {
|
||||
echo "fail!\n";
|
||||
} else {
|
||||
$lock->release();
|
||||
}
|
||||
|
||||
## SysV wrappers
|
||||
|
||||
All these wrappers depend on a `KeyInterface` being passed to the constructor.
|
||||
This is usually an instance of a `FileKey`, created as such:
|
||||
|
||||
$key = new FileKey(__FILE__);
|
||||
|
||||
The key has a project identifier that starts at `chr(0)`, or `"\0"`. To increase
|
||||
this identifier, and thus point to another segment, just clone it.
|
||||
|
||||
$key2 = clone $key1;
|
||||
|
||||
### Semaphores
|
||||
|
||||
### Mutexes
|
||||
|
||||
### Message Queues
|
||||
|
||||
$key = new FileKey(__FILE__);
|
||||
$msgq = new Queue($key);
|
||||
|
||||
$msgq->send(1, [ "Some data", [ "format"=>"foo" ]]);
|
||||
|
||||
$data = $msgq->receive(1, $type);
|
||||
|
||||
$msgq->destroy();
|
||||
|
||||
### Shared Memory
|
||||
|
||||
Shared memory using `SharedData` supports integrity checking when setting, using
|
||||
the third parameter to `set()`.
|
||||
|
||||
$key = new FileKey(__FILE__);
|
||||
$shm = new SharedData($key);
|
||||
|
||||
do {
|
||||
$counter = $shm->get("counter") + 1;
|
||||
} while (!$shm->set("counter", $counter, true));
|
||||
|
||||
$shm->destroy();
|
||||
|
||||
The `SharedMemory` class is a simple integer-indexed array
|
||||
|
||||
$key = new FileKey(__FILE__);
|
||||
$shm = new SharedMemory($key);
|
||||
|
||||
$shm[0] = 42;
|
||||
|
||||
$shm->destroy();
|
||||
|
||||
## Communication
|
||||
|
||||
### Channels
|
||||
|
||||
Channels are essentially connected pipes. A channel can be created with a stream resource,
|
||||
or through the `createPair()` factory method.
|
||||
|
||||
[ $ch1, $ch2 ] = StreamChannel::createPair();
|
||||
$ch1->send($data);
|
||||
$rcvd = $ch2->receive();
|
||||
|
@ -9,6 +9,12 @@
|
||||
"email": "cvagnetoft@gmail.com"
|
||||
}
|
||||
],
|
||||
"keywords": [
|
||||
"ipc",
|
||||
"shm",
|
||||
"msgqueue",
|
||||
"signals"
|
||||
],
|
||||
"require": {},
|
||||
"repositories": [
|
||||
{
|
||||
@ -24,4 +30,4 @@
|
||||
"NoccyLabs\\Ipc\\": "src/"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
16
examples/channels.php
Normal file
16
examples/channels.php
Normal file
@ -0,0 +1,16 @@
|
||||
<?php
|
||||
|
||||
require_once __DIR__."/../vendor/autoload.php";
|
||||
|
||||
use NoccyLabs\Ipc\Interop\Channel\StreamChannel;
|
||||
|
||||
// Create a pair of channels.
|
||||
[ $ch1, $ch2 ] = StreamChannel::createPair();
|
||||
|
||||
// Send messages with send()
|
||||
$ch1->send([
|
||||
'msg' => "Hello World"
|
||||
]);
|
||||
|
||||
// Receive at the other end using receive()
|
||||
print_r($ch2->receive());
|
43
examples/locks.php
Normal file
43
examples/locks.php
Normal file
@ -0,0 +1,43 @@
|
||||
<?php
|
||||
|
||||
require_once __DIR__."/../vendor/autoload.php";
|
||||
|
||||
use NoccyLabs\Ipc\Lock\FileLock;
|
||||
|
||||
// Creating the lock will not acquire it
|
||||
$lock1 = new FileLock(__FILE__);
|
||||
$lock2 = new FileLock(__FILE__);
|
||||
|
||||
// Acquire the lock like this
|
||||
if ($lock1->acquire()) {
|
||||
echo "lock1 acquired\n";
|
||||
} else {
|
||||
echo "lock1 not acquired\n";
|
||||
}
|
||||
|
||||
// Test the locks like this
|
||||
if ($lock1->isLocked()) {
|
||||
echo "lock1 is locked\n";
|
||||
} else {
|
||||
echo "lock1 is not locked\n";
|
||||
}
|
||||
if ($lock2->isLocked()) {
|
||||
echo "lock2 is locked\n";
|
||||
} else {
|
||||
echo "lock2 is not locked\n";
|
||||
}
|
||||
|
||||
// This will timeout after a second
|
||||
if ($lock2->acquire(1)) {
|
||||
echo "lock2 acquired\n";
|
||||
} else {
|
||||
echo "lock2 not acquired\n";
|
||||
}
|
||||
|
||||
// After releasing, you can acquire it
|
||||
$lock1->release();
|
||||
if ($lock2->acquire(1)) {
|
||||
echo "lock2 acquired\n";
|
||||
} else {
|
||||
echo "lock2 not acquired\n";
|
||||
}
|
26
examples/queues.php
Normal file
26
examples/queues.php
Normal file
@ -0,0 +1,26 @@
|
||||
<?php
|
||||
|
||||
require_once __DIR__."/../vendor/autoload.php";
|
||||
|
||||
use NoccyLabs\Ipc\Key\FileKey;
|
||||
use NoccyLabs\Ipc\Msg\Queue;
|
||||
|
||||
$key = new FileKey(__FILE__);
|
||||
$msgq = new Queue($key);
|
||||
|
||||
// Send packages with msgtype >= 1...
|
||||
$msgq->send(1, [ "what"=>"First" ]);
|
||||
$msgq->send(2, [ "what"=>"Second" ]);
|
||||
$msgq->send(3, [ "what"=>"Third" ]);
|
||||
|
||||
// Read messages by requesting a type...
|
||||
$msg = $msgq->receive(2, $type);
|
||||
printf("msg: %s, type: %d\n", json_encode($msg), $type);
|
||||
|
||||
// ...or read the first message with type 0...
|
||||
$msg = $msgq->receive(0, $type);
|
||||
printf("msg: %s, type: %d\n", json_encode($msg), $type);
|
||||
$msg = $msgq->receive(0, $type);
|
||||
printf("msg: %s, type: %d\n", json_encode($msg), $type);
|
||||
|
||||
$msgq->destroy();
|
35
examples/shmdata.php
Normal file
35
examples/shmdata.php
Normal file
@ -0,0 +1,35 @@
|
||||
<?php
|
||||
|
||||
require_once __DIR__."/../vendor/autoload.php";
|
||||
|
||||
use NoccyLabs\Ipc\Shm\SharedData;
|
||||
use NoccyLabs\Ipc\Key\FileKey;
|
||||
|
||||
$key = new FileKey(__FILE__);
|
||||
$shm = new SharedData($key);
|
||||
|
||||
// Set works as expected
|
||||
$shm->set("some.key", "Some value");
|
||||
// As does get
|
||||
echo $shm->get("some.key")."\n";
|
||||
|
||||
// To make sure a value isn't modified while you are modifying it, use the third
|
||||
// parameter to set...
|
||||
//
|
||||
// Let's start at 123
|
||||
$shm->set("some.counter", 123);
|
||||
$counter = $shm->get("some.counter");
|
||||
echo "Counter is: ".$counter."\n";
|
||||
// And attempt to increase it
|
||||
$shm->set("some.counter", $counter + 1, true);
|
||||
$counter = $shm->get("some.counter");
|
||||
echo "Counter is: ".$counter."\n";
|
||||
|
||||
// If the value is modified, the call will fail
|
||||
$shm2 = new SharedData($key);
|
||||
$shm2->set("some.counter", 345);
|
||||
if (!$shm->set("some.counter", $counter + 1, true)) {
|
||||
echo "some.counter has been modified since last read\n";
|
||||
}
|
||||
|
||||
$shm->destroy();
|
29
examples/signals.php
Normal file
29
examples/signals.php
Normal file
@ -0,0 +1,29 @@
|
||||
<?php
|
||||
|
||||
require_once __DIR__."/../vendor/autoload.php";
|
||||
|
||||
use NoccyLabs\Ipc\Signal\SignalHandler;
|
||||
use NoccyLabs\Ipc\Signal\Signal;
|
||||
|
||||
|
||||
$handler = new SignalHandler(SIGUSR1);
|
||||
|
||||
// Add handlers like this, or as an array passed as the second argument to the constructor
|
||||
$handler->addHandler(function () {
|
||||
echo "First handler\n";
|
||||
});
|
||||
// If a handler returns true, it will prevent any further handlers from firing
|
||||
$handler->addHandler(function () {
|
||||
echo "Second and final handler\n";
|
||||
return true;
|
||||
});
|
||||
// Thus, this one will not be called
|
||||
$handler->addHandler(function() {
|
||||
echo "Third handler, never called\n";
|
||||
});
|
||||
|
||||
// Dispatch the signal to ourselves
|
||||
(new Signal(SIGUSR1))->dispatch(posix_getpid());
|
||||
// Default target of dispatch is the own pid
|
||||
(new Signal(SIGUSR1))->dispatch();
|
||||
|
16
examples/signaltraps.php
Normal file
16
examples/signaltraps.php
Normal file
@ -0,0 +1,16 @@
|
||||
<?php
|
||||
|
||||
require_once __DIR__."/../vendor/autoload.php";
|
||||
|
||||
use NoccyLabs\Ipc\Signal\SignalTrap;
|
||||
|
||||
|
||||
|
||||
$trap = new SignalTrap(SIGINT);
|
||||
echo "Press ctrl-c...\n";
|
||||
|
||||
while (!$trap->isTrapped()) {
|
||||
usleep(10000);
|
||||
}
|
||||
|
||||
echo "Thanks!\n";
|
@ -32,6 +32,13 @@ class Queue
|
||||
|
||||
}
|
||||
|
||||
public function destroy()
|
||||
{
|
||||
if ($this->resource) {
|
||||
msg_remove_queue($this->resource);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send to the queue
|
||||
*
|
||||
|
@ -8,18 +8,25 @@ use NoccyLabs\Ipc\Key\KeyInterface;
|
||||
|
||||
class Semaphore
|
||||
{
|
||||
public function __construct(KeyInterface $key, int $max)
|
||||
{
|
||||
protected $resource;
|
||||
|
||||
public function __construct(KeyInterface $key, int $max, $perm = 0660, $autorelease = 1)
|
||||
{
|
||||
$this->resource = sem_get($key->getKey(), $max, $perm, $autorelease);
|
||||
}
|
||||
|
||||
public function acquire(float $timeout)
|
||||
public function destroy()
|
||||
{
|
||||
|
||||
sem_remove($this->resource);
|
||||
}
|
||||
|
||||
public function release()
|
||||
public function acquire(float $timeout = 0):bool
|
||||
{
|
||||
|
||||
return sem_acquire($this->resource, true);
|
||||
}
|
||||
|
||||
public function release():bool
|
||||
{
|
||||
return sem_release($this->resource);
|
||||
}
|
||||
}
|
@ -113,6 +113,7 @@ class SharedData extends SharedMemory
|
||||
|
||||
// Update the data
|
||||
$this[self::IDX_DATA + $index] = [ md5($value), $value ];
|
||||
$this->checks[$key] = md5($value);
|
||||
$this->unlock();
|
||||
return true;
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ class SharedMemoryBlock
|
||||
*/
|
||||
public function __construct(KeyInterface $key, string $flags, int $memsize, int $perm = 0666)
|
||||
{
|
||||
if (!($shm_resource = shmop_open($key->getKey(), $memsize, $perm))) {
|
||||
if (!($shm_resource = shmop_open($key->getKey(), $flags, $perm, $memsize))) {
|
||||
throw new \RuntimeException("Unable to attach shm resource {$key}");
|
||||
}
|
||||
|
||||
|
@ -17,8 +17,8 @@ class Signal
|
||||
pcntl_signal($this->signo, $handler);
|
||||
}
|
||||
|
||||
public function dispatch($pid):bool
|
||||
public function dispatch($pid=null):bool
|
||||
{
|
||||
return posix_kill($pid, $this->signo);
|
||||
return posix_kill($pid?:posix_getpid(), $this->signo);
|
||||
}
|
||||
}
|
27
tests/Sem/SemaphoreTest.php
Normal file
27
tests/Sem/SemaphoreTest.php
Normal file
@ -0,0 +1,27 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\Ipc\Sem;
|
||||
|
||||
use NoccyLabs\Ipc\Key\FileKey;
|
||||
|
||||
|
||||
|
||||
class SemaphoreTest extends \PhpUnit\Framework\TestCase
|
||||
{
|
||||
|
||||
public function testCreatingAndAquiringSemaphore()
|
||||
{
|
||||
$key = new FileKey(__FILE__);
|
||||
$sem = new Semaphore($key, 2);
|
||||
|
||||
$this->assertTrue($sem->acquire());
|
||||
$this->assertTrue($sem->acquire());
|
||||
$this->assertFalse($sem->acquire());
|
||||
$this->assertTrue($sem->release());
|
||||
$this->assertTrue($sem->acquire());
|
||||
$this->assertFalse($sem->acquire());
|
||||
$this->assertTrue($sem->release());
|
||||
$this->assertTrue($sem->release());
|
||||
}
|
||||
|
||||
}
|
25
tests/Shm/SharedMemoryBlockTest.php
Normal file
25
tests/Shm/SharedMemoryBlockTest.php
Normal file
@ -0,0 +1,25 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\Ipc\Shm;
|
||||
|
||||
use NoccyLabs\Ipc\Key\FileKey;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class SharedMemoryBlockTest extends \PhpUnit\Framework\TestCase
|
||||
{
|
||||
public function testWritingAndReading()
|
||||
{
|
||||
$key = new FileKey(__DIR__);
|
||||
$shm = new SharedMemoryBlock($key, SharedMemoryBlock::FLAG_CREATE, 64);
|
||||
|
||||
$this->assertEquals(11, $shm->write("Hello World", 0));
|
||||
$this->assertEquals("Hello World", $shm->read(11, 0));
|
||||
$this->assertEquals(3, $shm->write("foo", 2));
|
||||
$this->assertEquals("Hefoo World", $shm->read(11, 0));
|
||||
|
||||
$shm->destroy();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user