commit 86ff40274b4403e0cf29225ece7f28a7f6934184 Author: Christopher Vagnetoft Date: Sun Apr 15 16:41:46 2018 +0200 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4fbb073 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/vendor/ +/composer.lock diff --git a/README.md b/README.md new file mode 100644 index 0000000..a2261e8 --- /dev/null +++ b/README.md @@ -0,0 +1,27 @@ +noccylabs/ipc +============= + +This is a one-size-fits-all IPC library to facilitate communication between +threads and processes. It contains the following features: + +**Core:** + + - [ ] Semaphore + - [ ] Mutex + - [x] Queue + - [x] SharedMemory key-value store + - [ ] SharedMemory blocks + - [x] Signals + - [x] Locks + +**High-Level:** + + - [ ] EventBridge + - [ ] EventDispatcher + - [ ] InterOp/Marshalling + - [x] Asynchronous timers + +**Transports:** + + - [x] Stream channels + \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..3d626d7 --- /dev/null +++ b/composer.json @@ -0,0 +1,27 @@ +{ + "name": "noccylabs/ipc", + "description": "A complete set of IPC facilities", + "type": "library", + "license": "GPL-3.0", + "authors": [ + { + "name": "Christopher Vagnetoft", + "email": "cvagnetoft@gmail.com" + } + ], + "require": {}, + "repositories": [ + { + "type": "composer", + "url": "https://packages.noccylabs.info/composer/" + } + ], + "autoload": { + "files": [ + "src/signals.stub.php" + ], + "psr-4": { + "NoccyLabs\\Ipc\\": "src/" + } + } +} \ No newline at end of file diff --git a/examples/timers.php b/examples/timers.php new file mode 100644 index 0000000..5b52991 --- /dev/null +++ b/examples/timers.php @@ -0,0 +1,15 @@ + + + + tests + + + + + src + + + diff --git a/src/Interop/Async/Timer.php b/src/Interop/Async/Timer.php new file mode 100644 index 0000000..4f34a5f --- /dev/null +++ b/src/Interop/Async/Timer.php @@ -0,0 +1,80 @@ +callback = $callback; + $this->seconds = $seconds; + self::registerTimer($this->callback); + } + + /** + * Destructor + */ + public function __destruct() + { + self::clearTimer($this->callback); + } + + /** + * Register a timer callback function + * + * @param callable $timer + * @return void + */ + private static function registerTimer(callable $timer) + { + if (self::$timers === null) { + $handler = new SignalHandler(SIGALRM, [ __CLASS__."::onSignal" ]); + pcntl_alarm(1); + } + self::$timers[] = $timer; + } + + /** + * Remove a timer callback function + * + * @param callable $timer + * @return void + */ + private static function clearTimer(callable $timer) + { + self::$timers = array_filter(self::$timers, function ($t) use ($timer) { + return $t !== $timer; + }); + } + + /** + * Handle signals when the alarm fires + * + * @return void + */ + public static function onSignal() + { + foreach (self::$timers as $timer) { + call_user_func($timer); + } + pcntl_alarm(1); + } + +} + diff --git a/src/Interop/Channel/ChannelInterface.php b/src/Interop/Channel/ChannelInterface.php new file mode 100644 index 0000000..0f09313 --- /dev/null +++ b/src/Interop/Channel/ChannelInterface.php @@ -0,0 +1,13 @@ +stream = $stream; + + $this->isOpen(); + } + + public function send($data) + { + fwrite($this->stream, json_encode($data)."\0"); + } + + public function receive() + { + $buf = fread($this->stream, 8192); + + return json_decode(rtrim($buf, "\0")); + } + + public function isOpen(): bool + { + return is_resource($this->stream); + } + + public static function createPair() + { + $fd = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + + return [ + new StreamChannel($fd[0]), + new StreamChannel($fd[1]) + ]; + } + +} \ No newline at end of file diff --git a/src/Key/FileKey.php b/src/Key/FileKey.php new file mode 100644 index 0000000..f8181d4 --- /dev/null +++ b/src/Key/FileKey.php @@ -0,0 +1,84 @@ +pathname = $pathname; + $this->proj = substr($proj,0,1); + } + + /** + * Get the pathname used to generate the key + * + * @return string + */ + public function getPathname():string + { + return $this->pathname; + } + + /** + * Get the project identifier. Not guaranteed to be printable + * + * @return string + */ + public function getProjectIdentifier():string + { + return $this->proj; + } + + /** + * Get the key value + * + * @return int + */ + public function getKey():int + { + return ftok($this->pathname, $this->proj); + } + + /** + * Clone the FileKey, increasing the project identifier to create a new key for the + * same file + * + * @return void + */ + public function __clone() + { + $this->proj = chr(ord($this->proj) + 1); + } + + /** + * Create printable representation of the key + * + * @return string + */ + public function __toString() + { + return $this->getKey(); + } +} \ No newline at end of file diff --git a/src/Key/KeyInterface.php b/src/Key/KeyInterface.php new file mode 100644 index 0000000..2b9cbb6 --- /dev/null +++ b/src/Key/KeyInterface.php @@ -0,0 +1,9 @@ +pathname = $pathname; + $this->resource = fopen($pathname, "r"); + } + + public function __destruct() + { + $this->release(); + fclose($this->resource); + } + + /** + * Acquire an exclusive lock + * + * @param float $timeout + * @return boolean + */ + public function acquire(float $timeout = self::DEFAULT_TIMEOUT):bool + { + if ($this->locked) { + return true; + } + + $expire = microtime(true) + $timeout; + while (!flock($this->resource, LOCK_EX|LOCK_NB)) { + if (microtime(true)>$expire) { + return false; + } + usleep(1000); + } + + $this->locked = true; + return true; + } + + /** + * Release an acquired lock + * + * @return void + */ + public function release() + { + flock($this->resource, LOCK_UN); + $this->locked = false; + } + + /** + * Check if the lock is acquired + * + * @return boolean + */ + public function isLocked():bool + { + return $this->locked; + } +} \ No newline at end of file diff --git a/src/Msg/Queue.php b/src/Msg/Queue.php new file mode 100644 index 0000000..078e365 --- /dev/null +++ b/src/Msg/Queue.php @@ -0,0 +1,87 @@ +resource = msg_get_queue($key->getKey(), $perms); + } + + /** + * Destructor + */ + public function __destruct() + { + + } + + /** + * Send to the queue + * + * @param integer $msgtype + * @param mixed $message + * @param boolean $blocking + * @throws RuntimeException on error + */ + public function send(int $msgtype, $message, bool $blocking = true) + { + $ret = msg_send($this->resource, $msgtype, $message, true, $blocking, $errno); + + if ($ret === false) { + throw new \RuntimeException(sprintf( + "msg_send error %d", + $errno + )); + } + } + + /** + * Receive from the queue + * + * @param integer $desiredmsgtype + * @param integer $msgtype + * @param integer $maxsize + * @param boolean $blocking + * @return mixed + * @throws RuntimeException on error + */ + public function receive(int $desiredmsgtype, &$msgtype, int $maxsize=4096, bool $blocking = true) + { + $ret = msg_receive( + $this->resource, + $desiredmsgtype, + $msgtype, + $maxsize, + $message, + true, + $blocking?MSG_IPC_NOWAIT:0, + $errno + ); + + if ($ret === false) { + throw new \RuntimeException(sprintf( + "msg_receive error %d", + $errno + )); + } + + return $message; + } +} \ No newline at end of file diff --git a/src/Sem/Mutex.php b/src/Sem/Mutex.php new file mode 100644 index 0000000..943524d --- /dev/null +++ b/src/Sem/Mutex.php @@ -0,0 +1,15 @@ +$expire) { + break; + } + } + $this[self::IDX_LOCK] = 1; + } + + /** + * Release the lock + * + * @return void + */ + private function unlock() + { + $this[self::IDX_LOCK] = null; + } + + /** + * Get the value of a key + * + * @param string $key + * @return mixed + */ + public function get(string $key) + { + $map = (array)$this[self::IDX_MAP]; + + // If the key doesn't exist, return null + if (!array_key_exists($key, $map)) { + return null; + } + + $index = $map[$key]; + $keyvar = $this[self::IDX_DATA + $index]; + + // If for some reason we don't get anything back, return null + if (!$keyvar) { + return null; + } + + // Save the checksum for later and return the value + $this->checks[$key] = $keyvar[0]; + return $keyvar[1]; + + } + + /** + * Set a key to a specific value + * + * @param string $key + * @param mixed $value + * @param boolean $check + * @return boolean + */ + public function set(string $key, $value, $check=false):bool + { + // Acquire a lock and fetch the map + $this->lock(); + $map = (array)$this[self::IDX_MAP]; + + // If the key doesn't exist in the map, we need to set it up + if (!array_key_exists($key, $map)) { + $free = 0; + // Find first free index + foreach ($map as $map_key=>$index) { + if ($index != $free) { break; } + $free++; + } + // Store the index in the map, and sort it by index. + $map[$key] = $free; + asort($map); + $this[self::IDX_MAP] = $map; + } + + // Look up the key and fetch the data + $index = $map[$key]; + $keyvar = $this[self::IDX_DATA + $index]; + + // Check the data to make sure the data hasn't changed since last read + if ($check && $keyvar && array_key_exists($key, $this->checks)) { + if ($keyvar[0] != $this->checks[$key]) { + $this->unlock(); + return false; + } + } + + // Update the data + $this[self::IDX_DATA + $index] = [ md5($value), $value ]; + $this->unlock(); + return true; + } + + /** + * Check if the map contains the key. This method doesn't acquire a lock + * + * @param string $key + * @return bool + */ + public function contains(string $key):bool + { + $map = (array)$this[self::IDX_MAP]; + + return array_key_exists($key, $map); + } + +} \ No newline at end of file diff --git a/src/Shm/SharedMemory.php b/src/Shm/SharedMemory.php new file mode 100644 index 0000000..6ccbaf8 --- /dev/null +++ b/src/Shm/SharedMemory.php @@ -0,0 +1,94 @@ +getKey(), $memsize, $perm))) { + throw new \RuntimeException("Unable to attach shm resource {$key}"); + } + + $this->resource = $shm_resource; + } + + /** + * Destructor + */ + public function __destruct() + { + shm_detach($this->resource); + + } + + /** + * Destroy the shm segment + * + * @return void + */ + public function destroy() + { + shm_remove($this->resource); + } + + /** + * {@inheritDoc} + * + * @param int $offset + * @return void + */ + public function offsetExists($offset) + { + return shm_has_var($this->resource, $offset); + } + + /** + * {@inheritDoc} + * + * @param int $offset + * @return mixed + */ + public function offsetGet($offset) + { + return shm_has_var($this->resource, $offset)?shm_get_var($this->resource, $offset):null; + } + + /** + * {@inheritDoc} + * + * @param int $offset + * @param mixed $value + * @return void + */ + public function offsetSet($offset, $value) + { + shm_put_var($this->resource, $offset, $value); + } + + /** + * {@inheritDoc} + * + * @param int $offset + * @return void + */ + public function offsetUnset($offset) + { + shm_remove_var($this->resource, $offset); + } +} \ No newline at end of file diff --git a/src/Shm/SharedMemoryBlock.php b/src/Shm/SharedMemoryBlock.php new file mode 100644 index 0000000..ec23762 --- /dev/null +++ b/src/Shm/SharedMemoryBlock.php @@ -0,0 +1,99 @@ +getKey(), $memsize, $perm))) { + throw new \RuntimeException("Unable to attach shm resource {$key}"); + } + + $this->resource = $shm_resource; + } + + /** + * Destructor + */ + public function __destruct() + { + if (!$this->resource) { + return; + } + shmop_close($this->resource); + } + + /** + * Destroy the memory block + * + * @return void + */ + public function destroy() + { + shmop_delete($this->resource); + shmop_close($this->resource); + $this->resource = null; + } + + /** + * Read bytes from the shared block + * + * @param integer $length + * @param integer $offset + * @return mixed + */ + public function read($length=0, $offset=0) + { + if ($length == 0) { + $length = shmop_size($this->resource) - $offset; + } + return shmop_read($this->resource, $offset, $length); + } + + /** + * Write bytes to the shared block + * + * @param mixed $data + * @param integer $offset + * @return integer + */ + public function write($data, $offset=0):int + { + return shmop_write($this->resource, $data, $offset); + } + + /** + * Get the size of the memory block + * + * @return integer + */ + public function getSize():int + { + return shmop_size($this->resource); + } + +} \ No newline at end of file diff --git a/src/Signal/Signal.php b/src/Signal/Signal.php new file mode 100644 index 0000000..ee35363 --- /dev/null +++ b/src/Signal/Signal.php @@ -0,0 +1,24 @@ +signo = $signo; + } + + public function setHandler(callable $handler):void + { + pcntl_signal($this->signo, $handler); + } + + public function dispatch($pid):bool + { + return posix_kill($pid, $this->signo); + } +} \ No newline at end of file diff --git a/src/Signal/SignalHandler.php b/src/Signal/SignalHandler.php new file mode 100644 index 0000000..d5545e0 --- /dev/null +++ b/src/Signal/SignalHandler.php @@ -0,0 +1,37 @@ +signo = $signo; + $this->callbacks = $callbacks; + pcntl_signal($this->signo, [ $this, "onSignal" ]); + } + + public function __destruct() + { + + } + + public function addHandler(callable $handler):void + { + $this->callbacks[] = $handler; + } + + public function onSignal($signo) + { + foreach ($this->callbacks as $callback) { + if (call_user_func($callback) === true) + return; + } + } + +} \ No newline at end of file diff --git a/src/Signal/SignalTrap.php b/src/Signal/SignalTrap.php new file mode 100644 index 0000000..8846999 --- /dev/null +++ b/src/Signal/SignalTrap.php @@ -0,0 +1,32 @@ +signal = new Signal($signo); + $this->signal->setHandler([ $this, "onSignal" ]); + } + + public function onSignal() + { + $this->trapped = true; + } + + public function isTrapped($reset=true):bool + { + if ($this->trapped) { + $reset && ($this->trapped = false); + return true; + } + return false; + } + +} \ No newline at end of file diff --git a/src/signals.stub.php b/src/signals.stub.php new file mode 100644 index 0000000..e49f268 --- /dev/null +++ b/src/signals.stub.php @@ -0,0 +1,41 @@ +send($frame); + $rcvd = $sc2->receive(); + + $this->assertEquals($frame, $rcvd); + } + + public function testCreatingPair() + { + [ $sc1, $sc2 ] = StreamChannel::createPair(); + + $frame = "Hello World"; + + $sc1->send($frame); + $rcvd = $sc2->receive(); + + $this->assertEquals($frame, $rcvd); + } + +} \ No newline at end of file diff --git a/tests/Key/FileKeyTest.php b/tests/Key/FileKeyTest.php new file mode 100644 index 0000000..6186045 --- /dev/null +++ b/tests/Key/FileKeyTest.php @@ -0,0 +1,39 @@ +getKey(); + + $this->assertGreaterThan(0, $key); + } + + public function testKeyGenerationFromTemporaryFile() + { + $tempfile = "/tmp/key.tmp"; + file_exists($tempfile) && unlink($tempfile); + + $keyfile = new FileKey($tempfile, "a", true); + + $key = $keyfile->getKey(); + + $this->assertGreaterThan(0, $key); + + } + + public function testCloningShouldIncreaseProject() + { + $keyfile = new FileKey(__FILE__, "a"); + + $this->assertEquals("a", $keyfile->getProjectIdentifier()); + + $keyfile2 = clone $keyfile; + + $this->assertEquals("b", $keyfile2->getProjectIdentifier()); + } +} \ No newline at end of file diff --git a/tests/Lock/FileLockTest.php b/tests/Lock/FileLockTest.php new file mode 100644 index 0000000..91c7dbc --- /dev/null +++ b/tests/Lock/FileLockTest.php @@ -0,0 +1,18 @@ +assertEquals(true, $lock1->acquire(0)); + $this->assertEquals(false, $lock2->acquire(0)); + + } +} \ No newline at end of file diff --git a/tests/Msg/QueueTest.php b/tests/Msg/QueueTest.php new file mode 100644 index 0000000..28069f4 --- /dev/null +++ b/tests/Msg/QueueTest.php @@ -0,0 +1,70 @@ +send(1, "Hello World"); + + $ret = $queue->receive(1, $type); + $this->assertEquals("Hello World", $ret); + $this->assertEquals(1, $type); + } + + public function testSendingAndReceivingWithTypes() + { + $key = new FileKey(__FILE__); + + $queue = new Queue($key); + + $queue->send(1, "Hello World"); + $queue->send(2, "Hello World"); + $queue->send(3, "Hello World"); + + $ret = $queue->receive(1, $type); + $this->assertEquals(1, $type); + + $ret = $queue->receive(2, $type); + $this->assertEquals(2, $type); + + $ret = $queue->receive(3, $type); + $this->assertEquals(3, $type); + + } + + public function testReceivingFromFront() + { + $key = new FileKey(__FILE__); + + $queue = new Queue($key); + + $queue->send(1, "Hello World"); + $queue->send(2, "Hello World"); + $queue->send(3, "Hello World"); + + $ret = $queue->receive(0, $type); + $this->assertEquals(1, $type); + + $ret = $queue->receive(0, $type); + $this->assertEquals(2, $type); + + $ret = $queue->receive(0, $type); + $this->assertEquals(3, $type); + + } + + +} \ No newline at end of file diff --git a/tests/Shm/SharedDataTest.php b/tests/Shm/SharedDataTest.php new file mode 100644 index 0000000..544aabd --- /dev/null +++ b/tests/Shm/SharedDataTest.php @@ -0,0 +1,31 @@ +assertNull($shm->get("foo")); + + $this->assertTrue($shm->set("foo", "hello")); + $this->assertEquals("hello", $shm->get("foo")); + + $this->assertNull($shm->get("bar")); + + $this->assertTrue($shm->set("bar", "world")); + $this->assertEquals("hello", $shm->get("foo")); + $this->assertEquals("world", $shm->get("bar")); + + $shm->destroy(); + } +} \ No newline at end of file diff --git a/tests/Signal/SignalHandlerTest.php b/tests/Signal/SignalHandlerTest.php new file mode 100644 index 0000000..d82d4a0 --- /dev/null +++ b/tests/Signal/SignalHandlerTest.php @@ -0,0 +1,85 @@ +assertEquals(true, $handler1); + } + + public function testSignalHandlerWithMultipleHandlersInConstructor() + { + $callbacks = [ + function () use (&$handler1) { $handler1 = true; }, + function () use (&$handler2) { $handler2 = true; }, + function () use (&$handler3) { $handler3 = true; } + ]; + + $handler = new SignalHandler(SIGUSR1, $callbacks); + + posix_kill(posix_getpid(), SIGUSR1); + + usleep(1000); + + $this->assertEquals(true, $handler1); + $this->assertEquals(true, $handler2); + $this->assertEquals(true, $handler3); + } + + public function testSignalHandlerWithAddedHandlers() + { + $callbacks = [ + function () use (&$handler1) { $handler1 = true; }, + function () use (&$handler2) { $handler2 = true; } + ]; + + $handler = new SignalHandler(SIGUSR1, $callbacks); + + $handler->addHandler(function () use (&$handler3) { $handler3 = true; }); + + posix_kill(posix_getpid(), SIGUSR1); + + usleep(1000); + + $this->assertEquals(true, $handler1); + $this->assertEquals(true, $handler2); + $this->assertEquals(true, $handler3); + } + + public function testSignalHandlerWithBlockingHandler() + { + $callbacks = [ + function () use (&$handler1) { $handler1 = true; return true; }, + function () use (&$handler2) { $handler2 = true; }, + function () use (&$handler3) { $handler3 = true; } + ]; + + $handler = new SignalHandler(SIGUSR1, $callbacks); + + posix_kill(posix_getpid(), SIGUSR1); + + usleep(1000); + + $this->assertEquals(true, $handler1); + $this->assertEquals(false, $handler2); + $this->assertEquals(false, $handler3); + } + +} \ No newline at end of file diff --git a/tests/Signal/SignalTest.php b/tests/Signal/SignalTest.php new file mode 100644 index 0000000..0eed1d5 --- /dev/null +++ b/tests/Signal/SignalTest.php @@ -0,0 +1,40 @@ +setHandler($callback); + + posix_kill(posix_getpid(), SIGUSR1); + + usleep(1000); + + $this->assertEquals(true, $handler1); + } + + public function testSimpleSignalDispatcher() + { + $callback = function () use (&$handler1) { $handler1 = true; }; + + $handler = new Signal(SIGUSR1); + $handler->setHandler($callback); + + $handler->dispatch(posix_getpid()); + + usleep(1000); + + $this->assertEquals(true, $handler1); + } + +} \ No newline at end of file diff --git a/tests/Signal/SignalTrapTest.php b/tests/Signal/SignalTrapTest.php new file mode 100644 index 0000000..f406378 --- /dev/null +++ b/tests/Signal/SignalTrapTest.php @@ -0,0 +1,42 @@ +assertEquals(false, $trap->isTrapped()); + + posix_kill(posix_getpid(), SIGUSR1); + + usleep(1000); + + $this->assertEquals(true, $trap->isTrapped()); + + $this->assertEquals(false, $trap->isTrapped()); + } + + public function testSignalTrapWithoutResetting() + { + $trap = new SignalTrap(SIGUSR1); + + $this->assertEquals(false, $trap->isTrapped()); + + posix_kill(posix_getpid(), SIGUSR1); + + usleep(1000); + + $this->assertEquals(true, $trap->isTrapped(false)); + + $this->assertEquals(true, $trap->isTrapped()); + } + +} \ No newline at end of file