php-ipc/src/Msg/Queue.php

99 lines
2.0 KiB
PHP

<?php
namespace NoccyLabs\Ipc\Msg;
use NoccyLabs\Ipc\Key\KeyInterface;
/**
* Message queue wrapper
*
*/
class Queue
{
    /**
     * Queue handle as returned by msg_get_queue().
     */
    protected $resource;

    /**
     * Open the message queue identified by the key, creating it if needed.
     *
     * @param KeyInterface $key Provides the System V IPC key via getKey()
     * @param int $perms Permission bits passed to msg_get_queue()
     * @throws \RuntimeException if the queue could not be opened
     */
    public function __construct(KeyInterface $key, int $perms = 0666)
    {
        $resource = msg_get_queue($key->getKey(), $perms);
        if ($resource === false) {
            // Fail here rather than letting send()/receive() trip over a
            // bogus handle later on.
            throw new \RuntimeException("msg_get_queue failed");
        }

        $this->resource = $resource;
    }

    /**
     * Destructor
     *
     * The queue is not removed here; call destroy() to remove it explicitly.
     */
    public function __destruct()
    {
    }

    /**
     * Destroy the queue
     *
     * Removes the underlying queue via msg_remove_queue(). Does nothing when
     * no queue handle is held.
     *
     * @return void
     */
    public function destroy()
    {
        if ($this->resource) {
            msg_remove_queue($this->resource);
        }
    }

    /**
     * Send to the queue
     *
     * The message is serialized by msg_send() before being queued.
     *
     * @param int $msgtype Message type to tag the message with
     * @param mixed $message Payload to send
     * @param bool $blocking Passed to msg_send() as its blocking argument
     * @throws \RuntimeException on error (message carries the msg_send error code)
     */
    public function send(int $msgtype, $message, bool $blocking = true)
    {
        $ret = msg_send($this->resource, $msgtype, $message, true, $blocking, $errno);
        if ($ret === false) {
            throw new \RuntimeException(sprintf(
                "msg_send error %d",
                $errno
            ));
        }
    }

    /**
     * Receive from the queue
     *
     * The payload is unserialized by msg_receive().
     * NOTE(review): with the default 0666 permissions any local user may be
     * able to enqueue data that gets unserialized here - consider tighter
     * permissions if the queue is shared with untrusted processes.
     *
     * @param int $desiredmsgtype Message type selector passed to msg_receive()
     * @param int $msgtype Set (by reference) to the type of the received message
     * @param int $maxsize Maximum accepted message size in bytes
     * @param bool $blocking Wait for a message when true; when false, fail
     *                       immediately if no matching message is queued
     * @return mixed The unserialized message
     * @throws \RuntimeException on error (message carries the msg_receive error code)
     */
    public function receive(int $desiredmsgtype, &$msgtype, int $maxsize=4096, bool $blocking = true)
    {
        // MSG_IPC_NOWAIT is what makes the call NON-blocking, so it must only
        // be set when the caller asked not to block.
        $flags = $blocking ? 0 : MSG_IPC_NOWAIT;

        $ret = msg_receive(
            $this->resource,
            $desiredmsgtype,
            $msgtype,
            $maxsize,
            $message,
            true,
            $flags,
            $errno
        );
        if ($ret === false) {
            throw new \RuntimeException(sprintf(
                "msg_receive error %d",
                $errno
            ));
        }

        return $message;
    }
}