Initial commit

This commit is contained in:
2018-04-15 16:41:46 +02:00
commit 86ff40274b
29 changed files with 1368 additions and 0 deletions

View File

@ -0,0 +1,80 @@
<?php
namespace NoccyLabs\Ipc\Interop\Async;
use NoccyLabs\Ipc\Signal\SignalHandler;
/**
* Simple second-granularity timer for asynchronous events
*
*/
class Timer
{
private static $timers = null;
private $callback;
/**
* Constructor
*
* @param callable $callback
*/
public function __construct(callable $callback)
{
$this->callback = $callback;
$this->seconds = $seconds;
self::registerTimer($this->callback);
}
/**
* Destructor
*/
public function __destruct()
{
self::clearTimer($this->callback);
}
/**
* Register a timer callback function
*
* @param callable $timer
* @return void
*/
private static function registerTimer(callable $timer)
{
if (self::$timers === null) {
$handler = new SignalHandler(SIGALRM, [ __CLASS__."::onSignal" ]);
pcntl_alarm(1);
}
self::$timers[] = $timer;
}
/**
* Remove a timer callback function
*
* @param callable $timer
* @return void
*/
private static function clearTimer(callable $timer)
{
self::$timers = array_filter(self::$timers, function ($t) use ($timer) {
return $t !== $timer;
});
}
/**
* Handle signals when the alarm fires
*
* @return void
*/
public static function onSignal()
{
foreach (self::$timers as $timer) {
call_user_func($timer);
}
pcntl_alarm(1);
}
}

View File

@ -0,0 +1,13 @@
<?php
namespace NoccyLabs\Ipc\Interop\Channel;
interface ChannelInterface
{
public function isOpen():bool;
public function receive();
public function send($data);
}

View File

@ -0,0 +1,54 @@
<?php
namespace NoccyLabs\Ipc\Interop\Channel;
class StreamChannel implements ChannelInterface
{
protected $stream;
public function __construct($stream)
{
if (!is_resource($stream)) {
if (!is_string($stream)) {
throw new \LogicException("Invalid stream");
}
$stream = stream_socket_client($stream, $errno, $errstr);
if (!$stream) {
throw new \RuntimeException(sprintf("%d %s", $errno, $errstr));
}
}
$this->stream = $stream;
$this->isOpen();
}
public function send($data)
{
fwrite($this->stream, json_encode($data)."\0");
}
public function receive()
{
$buf = fread($this->stream, 8192);
return json_decode(rtrim($buf, "\0"));
}
public function isOpen(): bool
{
return is_resource($this->stream);
}
public static function createPair()
{
$fd = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
return [
new StreamChannel($fd[0]),
new StreamChannel($fd[1])
];
}
}

84
src/Key/FileKey.php Normal file
View File

@ -0,0 +1,84 @@
<?php
namespace NoccyLabs\Ipc\Key;
class FileKey implements KeyInterface
{
private $proj;
private $pathname;
/**
* Constructor
*
* @param string $pathname
* @param string $proj
* @param boolean $create
*/
public function __construct(string $pathname, $proj="\0", bool $create=false)
{
if (!file_exists($pathname)) {
if (!$create) {
throw new \RuntimeException("Path does not exist: {$pathname}");
}
if (!is_dir(dirname($pathname))) {
throw new \RuntimeException("The directory ".dirname($pathname)." does not exist");
}
touch($pathname);
}
$this->pathname = $pathname;
$this->proj = substr($proj,0,1);
}
/**
* Get the pathname used to generate the key
*
* @return string
*/
public function getPathname():string
{
return $this->pathname;
}
/**
* Get the project identifier. Not guaranteed to be printable
*
* @return string
*/
public function getProjectIdentifier():string
{
return $this->proj;
}
/**
* Get the key value
*
* @return int
*/
public function getKey():int
{
return ftok($this->pathname, $this->proj);
}
/**
* Clone the FileKey, increasing the project identifier to create a new key for the
* same file
*
* @return void
*/
public function __clone()
{
$this->proj = chr(ord($this->proj) + 1);
}
/**
* Create printable representation of the key
*
* @return string
*/
public function __toString()
{
return $this->getKey();
}
}

9
src/Key/KeyInterface.php Normal file
View File

@ -0,0 +1,9 @@
<?php
namespace NoccyLabs\Ipc\Key;
interface KeyInterface
{
public function getKey():int;
public function __toString();
}

88
src/Lock/FileLock.php Normal file
View File

@ -0,0 +1,88 @@
<?php
namespace NoccyLabs\Ipc\Lock;
/**
* Simple filesystem-based lock
*
*
*/
class FileLock
{
const DEFAULT_TIMEOUT = 5;
protected $pathname;
protected $resource;
protected $locked = false;
/**
* Constructor
*
* @param string $pathname
*/
public function __construct(string $pathname)
{
if (!file_exists($pathname)) {
throw new \RuntimeException(sprintf(
"The file %s does not exist",
$pathname
));
}
$this->pathname = $pathname;
$this->resource = fopen($pathname, "r");
}
public function __destruct()
{
$this->release();
fclose($this->resource);
}
/**
* Acquire an exclusive lock
*
* @param float $timeout
* @return boolean
*/
public function acquire(float $timeout = self::DEFAULT_TIMEOUT):bool
{
if ($this->locked) {
return true;
}
$expire = microtime(true) + $timeout;
while (!flock($this->resource, LOCK_EX|LOCK_NB)) {
if (microtime(true)>$expire) {
return false;
}
usleep(1000);
}
$this->locked = true;
return true;
}
/**
* Release an acquired lock
*
* @return void
*/
public function release()
{
flock($this->resource, LOCK_UN);
$this->locked = false;
}
/**
* Check if the lock is acquired
*
* @return boolean
*/
public function isLocked():bool
{
return $this->locked;
}
}

87
src/Msg/Queue.php Normal file
View File

@ -0,0 +1,87 @@
<?php
namespace NoccyLabs\Ipc\Msg;
use NoccyLabs\Ipc\Key\KeyInterface;
/**
* Message queue wrapper
*
*/
class Queue
{
protected $resource;
/**
* Constructor
*
* @param KeyInterface $key
* @param integer $perms
*/
public function __construct(KeyInterface $key, int $perms = 0666)
{
$this->resource = msg_get_queue($key->getKey(), $perms);
}
/**
* Destructor
*/
public function __destruct()
{
}
/**
* Send to the queue
*
* @param integer $msgtype
* @param mixed $message
* @param boolean $blocking
* @throws RuntimeException on error
*/
public function send(int $msgtype, $message, bool $blocking = true)
{
$ret = msg_send($this->resource, $msgtype, $message, true, $blocking, $errno);
if ($ret === false) {
throw new \RuntimeException(sprintf(
"msg_send error %d",
$errno
));
}
}
/**
* Receive from the queue
*
* @param integer $desiredmsgtype
* @param integer $msgtype
* @param integer $maxsize
* @param boolean $blocking
* @return mixed
* @throws RuntimeException on error
*/
public function receive(int $desiredmsgtype, &$msgtype, int $maxsize=4096, bool $blocking = true)
{
$ret = msg_receive(
$this->resource,
$desiredmsgtype,
$msgtype,
$maxsize,
$message,
true,
$blocking?MSG_IPC_NOWAIT:0,
$errno
);
if ($ret === false) {
throw new \RuntimeException(sprintf(
"msg_receive error %d",
$errno
));
}
return $message;
}
}

15
src/Sem/Mutex.php Normal file
View File

@ -0,0 +1,15 @@
<?php
namespace NoccyLabs\Ipc\Sem;
use NoccyLabs\Ipc\Key\KeyInterface;
class Mutex extends Semaphore
{
public function __construct(KeyInterface $key)
{
parent::__construct($key, 1);
}
}

25
src/Sem/Semaphore.php Normal file
View File

@ -0,0 +1,25 @@
<?php
namespace NoccyLabs\Ipc\Sem;
use NoccyLabs\Ipc\Key\KeyInterface;
class Semaphore
{
public function __construct(KeyInterface $key, int $max)
{
}
public function acquire(float $timeout)
{
}
public function release()
{
}
}

133
src/Shm/SharedData.php Normal file
View File

@ -0,0 +1,133 @@
<?php
namespace NoccyLabs\Ipc\Shm;
/**
* Shared data storage with a key-value interface
*
*/
class SharedData extends SharedMemory
{
private const IDX_LOCK = 0;
private const IDX_MAP = 1;
private const IDX_DATA = 2;
private $checks = [];
/**
* Acquire a lock
*
* @return void
*/
private function lock()
{
$expire = microtime(true) + 1;
while ($this[self::IDX_LOCK] !== null) {
usleep(1000);
// Forcefully continue after 1 second
if (microtime(true)>$expire) {
break;
}
}
$this[self::IDX_LOCK] = 1;
}
/**
* Release the lock
*
* @return void
*/
private function unlock()
{
$this[self::IDX_LOCK] = null;
}
/**
* Get the value of a key
*
* @param string $key
* @return mixed
*/
public function get(string $key)
{
$map = (array)$this[self::IDX_MAP];
// If the key doesn't exist, return null
if (!array_key_exists($key, $map)) {
return null;
}
$index = $map[$key];
$keyvar = $this[self::IDX_DATA + $index];
// If for some reason we don't get anything back, return null
if (!$keyvar) {
return null;
}
// Save the checksum for later and return the value
$this->checks[$key] = $keyvar[0];
return $keyvar[1];
}
/**
* Set a key to a specific value
*
* @param string $key
* @param mixed $value
* @param boolean $check
* @return boolean
*/
public function set(string $key, $value, $check=false):bool
{
// Acquire a lock and fetch the map
$this->lock();
$map = (array)$this[self::IDX_MAP];
// If the key doesn't exist in the map, we need to set it up
if (!array_key_exists($key, $map)) {
$free = 0;
// Find first free index
foreach ($map as $map_key=>$index) {
if ($index != $free) { break; }
$free++;
}
// Store the index in the map, and sort it by index.
$map[$key] = $free;
asort($map);
$this[self::IDX_MAP] = $map;
}
// Look up the key and fetch the data
$index = $map[$key];
$keyvar = $this[self::IDX_DATA + $index];
// Check the data to make sure the data hasn't changed since last read
if ($check && $keyvar && array_key_exists($key, $this->checks)) {
if ($keyvar[0] != $this->checks[$key]) {
$this->unlock();
return false;
}
}
// Update the data
$this[self::IDX_DATA + $index] = [ md5($value), $value ];
$this->unlock();
return true;
}
/**
* Check if the map contains the key. This method doesn't acquire a lock
*
* @param string $key
* @return bool
*/
public function contains(string $key):bool
{
$map = (array)$this[self::IDX_MAP];
return array_key_exists($key, $map);
}
}

94
src/Shm/SharedMemory.php Normal file
View File

@ -0,0 +1,94 @@
<?php
namespace NoccyLabs\Ipc\Shm;
use NoccyLabs\Ipc\Key\KeyInterface;
/**
* Shared memory segment
*/
class SharedMemory implements \ArrayAccess
{
protected $resource;
/**
* Constructor
*
* @param KeyInterface $key
* @param integer $memsize
* @param integer $perm
*/
public function __construct(KeyInterface $key, int $memsize = 64000, int $perm = 0666)
{
if (!($shm_resource = shm_attach($key->getKey(), $memsize, $perm))) {
throw new \RuntimeException("Unable to attach shm resource {$key}");
}
$this->resource = $shm_resource;
}
/**
* Destructor
*/
public function __destruct()
{
shm_detach($this->resource);
}
/**
* Destroy the shm segment
*
* @return void
*/
public function destroy()
{
shm_remove($this->resource);
}
/**
* {@inheritDoc}
*
* @param int $offset
* @return void
*/
public function offsetExists($offset)
{
return shm_has_var($this->resource, $offset);
}
/**
* {@inheritDoc}
*
* @param int $offset
* @return mixed
*/
public function offsetGet($offset)
{
return shm_has_var($this->resource, $offset)?shm_get_var($this->resource, $offset):null;
}
/**
* {@inheritDoc}
*
* @param int $offset
* @param mixed $value
* @return void
*/
public function offsetSet($offset, $value)
{
shm_put_var($this->resource, $offset, $value);
}
/**
* {@inheritDoc}
*
* @param int $offset
* @return void
*/
public function offsetUnset($offset)
{
shm_remove_var($this->resource, $offset);
}
}

View File

@ -0,0 +1,99 @@
<?php
namespace NoccyLabs\Ipc\Shm;
use NoccyLabs\Ipc\Key\KeyInterface;
/**
* A block of shared memory that can be read and write like a string buffer
*
*
*/
class SharedMemoryBlock
{
const FLAG_ACCESS = "a";
const FLAG_CREATE = "c";
const FLAG_WRITE = "w";
const FLAG_NEW = "n";
protected $resource;
/**
* Constructor
*
* @param KeyInterface $key
* @param string $flags
* @param integer $memsize
* @param integer $perm
*/
public function __construct(KeyInterface $key, string $flags, int $memsize, int $perm = 0666)
{
if (!($shm_resource = shmop_open($key->getKey(), $memsize, $perm))) {
throw new \RuntimeException("Unable to attach shm resource {$key}");
}
$this->resource = $shm_resource;
}
/**
* Destructor
*/
public function __destruct()
{
if (!$this->resource) {
return;
}
shmop_close($this->resource);
}
/**
* Destroy the memory block
*
* @return void
*/
public function destroy()
{
shmop_delete($this->resource);
shmop_close($this->resource);
$this->resource = null;
}
/**
* Read bytes from the shared block
*
* @param integer $length
* @param integer $offset
* @return mixed
*/
public function read($length=0, $offset=0)
{
if ($length == 0) {
$length = shmop_size($this->resource) - $offset;
}
return shmop_read($this->resource, $offset, $length);
}
/**
* Write bytes to the shared block
*
* @param mixed $data
* @param integer $offset
* @return integer
*/
public function write($data, $offset=0):int
{
return shmop_write($this->resource, $data, $offset);
}
/**
* Get the size of the memory block
*
* @return integer
*/
public function getSize():int
{
return shmop_size($this->resource);
}
}

24
src/Signal/Signal.php Normal file
View File

@ -0,0 +1,24 @@
<?php
namespace NoccyLabs\Ipc\Signal;
class Signal
{
private $signo;
public function __construct(int $signo)
{
$this->signo = $signo;
}
public function setHandler(callable $handler):void
{
pcntl_signal($this->signo, $handler);
}
public function dispatch($pid):bool
{
return posix_kill($pid, $this->signo);
}
}

View File

@ -0,0 +1,37 @@
<?php
namespace NoccyLabs\Ipc\Signal;
class SignalHandler
{
private $signo;
private $callbacks;
public function __construct(int $signo, array $callbacks=[])
{
$this->signo = $signo;
$this->callbacks = $callbacks;
pcntl_signal($this->signo, [ $this, "onSignal" ]);
}
public function __destruct()
{
}
public function addHandler(callable $handler):void
{
$this->callbacks[] = $handler;
}
public function onSignal($signo)
{
foreach ($this->callbacks as $callback) {
if (call_user_func($callback) === true)
return;
}
}
}

32
src/Signal/SignalTrap.php Normal file
View File

@ -0,0 +1,32 @@
<?php
namespace NoccyLabs\Ipc\Signal;
class SignalTrap
{
protected $signal;
protected $trapped = false;
public function __construct(int $signo)
{
$this->signal = new Signal($signo);
$this->signal->setHandler([ $this, "onSignal" ]);
}
public function onSignal()
{
$this->trapped = true;
}
public function isTrapped($reset=true):bool
{
if ($this->trapped) {
$reset && ($this->trapped = false);
return true;
}
return false;
}
}

41
src/signals.stub.php Normal file
View File

@ -0,0 +1,41 @@
<?php
if (function_exists('pcntl_async_signals')) {
define("SIGNALS_ASYNC", true);
pcntl_async_signals(true);
} else {
define("SIGNALS_ASYNC", false);
}
// Signal shims
defined("SIGHUP") || define("SIGHUP", 1); // Hangup (POSIX)
defined("SIGINT") || define("SIGINT", 2); // Terminal interrupt (ANSI)
defined("SIGQUIT") || define("SIGQUIT", 3); // Terminal quit (POSIX)
defined("SIGILL") || define("SIGILL", 4); // Illegal instruction (ANSI)
defined("SIGTRAP") || define("SIGTRAP", 5); // Trace trap (POSIX)
defined("SIGIOT") || define("SIGIOT", 6); // IOT Trap (4.2 BSD)
defined("SIGBUS") || define("SIGBUS", 7); // BUS error (4.2 BSD)
defined("SIGFPE") || define("SIGFPE", 8); // Floating point exception (ANSI)
defined("SIGKILL") || define("SIGKILL", 9); // Kill(can't be caught or ignored) (POSIX)
defined("SIGUSR1") || define("SIGUSR1", 10); // User defined signal 1 (POSIX)
defined("SIGSEGV") || define("SIGSEGV", 11); // Invalid memory segment access (ANSI)
defined("SIGUSR2") || define("SIGUSR2", 12); // User defined signal 2 (POSIX)
defined("SIGPIPE") || define("SIGPIPE", 13); // Write on a pipe with no reader, Broken pipe (POSIX)
defined("SIGALRM") || define("SIGALRM", 14); // Alarm clock (POSIX)
defined("SIGTERM") || define("SIGTERM", 15); // Termination (ANSI)
defined("SIGSTKFLT") || define("SIGSTKFLT", 16); // Stack fault
defined("SIGCHLD") || define("SIGCHLD", 17); // Child process has stopped or exited, changed (POSIX)
defined("SIGCONT") || define("SIGCONT", 18); // Continue executing, if stopped (POSIX)
defined("SIGSTOP") || define("SIGSTOP", 19); // Stop executing(can't be caught or ignored) (POSIX)
defined("SIGTSTP") || define("SIGTSTP", 20); // Terminal stop signal (POSIX)
defined("SIGTTIN") || define("SIGTTIN", 21); // Background process trying to read, from TTY (POSIX)
defined("SIGTTOU") || define("SIGTTOU", 22); // Background process trying to write, to TTY (POSIX)
defined("SIGURG") || define("SIGURG", 23); // Urgent condition on socket (4.2 BSD)
defined("SIGXCPU") || define("SIGXCPU", 24); // CPU limit exceeded (4.2 BSD)
defined("SIGXFSZ") || define("SIGXFSZ", 25); // File size limit exceeded (4.2 BSD)
defined("SIGVTALRM") || define("SIGVTALRM", 26); // Virtual alarm clock (4.2 BSD)
defined("SIGPROF") || define("SIGPROF", 27); // Profiling alarm clock (4.2 BSD)
defined("SIGWINCH") || define("SIGWINCH", 28); // Window size change (4.3 BSD, Sun)
defined("SIGIO") || define("SIGIO", 29); // I/O now possible (4.2 BSD)
defined("SIGPWR") || define("SIGPWR", 30); // Power failure restart (System V)