Initial commit
This commit is contained in:
142
src/Binary/BinaryProtocol.php
Normal file
142
src/Binary/BinaryProtocol.php
Normal file
@@ -0,0 +1,142 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\React\Protocol\Binary;
|
||||
|
||||
use Closure;
|
||||
use NoccyLabs\React\Protocol\ProtocolException;
|
||||
use NoccyLabs\React\Protocol\ProtocolInterface;
|
||||
|
||||
class BinaryProtocol implements ProtocolInterface
|
||||
{
|
||||
public function __construct(
|
||||
public readonly string $frameSeparator = "",
|
||||
public readonly int $prependSizeBytes = 0,
|
||||
public readonly string $prependSizeEndian = 'l',
|
||||
public readonly ?string $compression = null,
|
||||
private readonly ?Closure $beforePackCb = null,
|
||||
private readonly ?Closure $afterPackCb = null,
|
||||
private readonly ?Closure $beforeUnpackCb = null,
|
||||
private readonly ?Closure $afterUnpackCb = null,
|
||||
)
|
||||
{
|
||||
}
|
||||
|
||||
public function packFrame(array $frame): string
|
||||
{
|
||||
if (is_callable($this->beforePackCb))
|
||||
$frame = call_user_func($this->beforePackCb, $frame);
|
||||
|
||||
$data = $frame['payload'];
|
||||
|
||||
// append separator
|
||||
$data = $data . $this->frameSeparator;
|
||||
|
||||
// prepend size
|
||||
if ($this->prependSizeBytes > 0) {
|
||||
$data = $this->packSizeBytes(strlen($data)) . $data;
|
||||
}
|
||||
|
||||
if (is_callable($this->afterPackCb))
|
||||
$data = call_user_func($this->afterPackCb, $data);
|
||||
|
||||
return $data;
|
||||
}
|
||||
|
||||
public function unpackFrame(string $data): array
|
||||
{
|
||||
if (is_callable($this->beforeUnpackCb))
|
||||
$data = call_user_func($this->beforeUnpackCb, $data);
|
||||
|
||||
if ($this->prependSizeBytes > 0) {
|
||||
if ($this->prependSizeBytes > strlen($data)) {
|
||||
// not enough data to parse size...
|
||||
throw new ProtocolException("BinaryProtocol: Not enough data to parse size");
|
||||
}
|
||||
$len = $this->unpackSizeBytes($data);
|
||||
if ($len <= 0) {
|
||||
// unparsable?
|
||||
throw new ProtocolException("BinaryProtocol: Invalid size decoded from frame");
|
||||
}
|
||||
if ($len > strlen($data) - $this->prependSizeBytes) {
|
||||
// insufficient data
|
||||
throw new ProtocolException("BinaryProtocol: Insufficient data for unpacking (want {$len} but got ".(strlen($data)).")");
|
||||
}
|
||||
$data = substr($data, $this->prependSizeBytes);
|
||||
}
|
||||
|
||||
if ($this->frameSeparator) {
|
||||
$data = substr($data, 0, -strlen($this->frameSeparator));
|
||||
}
|
||||
|
||||
$frame = [ 'payload' => $data ];
|
||||
// echo "[{$data}]"; var_dump($frame);
|
||||
if (!$frame) {
|
||||
// invalid json
|
||||
throw new ProtocolException("Unparsable frame received: {$data}");
|
||||
}
|
||||
|
||||
if (is_callable($this->afterUnpackCb))
|
||||
$frame = call_user_func($this->afterUnpackCb, $frame);
|
||||
|
||||
return $frame;
|
||||
}
|
||||
|
||||
public function consumeFrame(string &$data): ?array
|
||||
{
|
||||
// check for $this->prependSizeBytes
|
||||
if ($this->prependSizeBytes > 0) {
|
||||
$len = $this->unpackSizeBytes($data);
|
||||
// if size is greater than data (i.e. incomplete)
|
||||
if ($len > strlen($data) - $this->prependSizeBytes) return null;
|
||||
$p = $len;
|
||||
// $data = substr($data, $this->prependSizeBytes);
|
||||
} elseif ($this->frameSeparator) {
|
||||
// check for $this->frameSeparator
|
||||
$p = strpos($data, $this->frameSeparator);
|
||||
if ($p === false) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
$frame = substr($data, 0, $p + strlen($this->frameSeparator) + $this->prependSizeBytes);
|
||||
$data = substr($data, $p + strlen($this->frameSeparator) + $this->prependSizeBytes);
|
||||
|
||||
return $this->unpackFrame($frame);
|
||||
}
|
||||
|
||||
public function packSizeBytes(int $size): string
|
||||
{
|
||||
$endian = function($b,$l,$s) {
|
||||
return match ($this->prependSizeEndian) {
|
||||
'b' => $b,
|
||||
'l' => $l,
|
||||
default => $s
|
||||
};
|
||||
};
|
||||
return match ($this->prependSizeBytes) {
|
||||
1 => pack($endian("C","C","C"), $size),
|
||||
2 => pack($endian("n","v","S"), $size),
|
||||
4 => pack($endian("N","V","L"), $size),
|
||||
default => throw new ProtocolException("BinaryProtocol: Invalid message size length")
|
||||
};
|
||||
}
|
||||
|
||||
public function unpackSizeBytes(string $data): int
|
||||
{
|
||||
$bytes = substr($data, 0, $this->prependSizeBytes);
|
||||
$endian = function($b,$l,$s) {
|
||||
return match ($this->prependSizeEndian) {
|
||||
'b' => $b,
|
||||
'l' => $l,
|
||||
default => $s
|
||||
};
|
||||
};
|
||||
return match ($this->prependSizeBytes) {
|
||||
1 => unpack($endian("C","C","C")."len", $bytes)['len'],
|
||||
2 => unpack($endian("n","v","S")."len", $bytes)['len'],
|
||||
4 => unpack($endian("N","V","L")."len", $bytes)['len'],
|
||||
default => throw new ProtocolException("BinaryProtocol: Invalid message size length")
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
148
src/Json/JsonProtocol.php
Normal file
148
src/Json/JsonProtocol.php
Normal file
@@ -0,0 +1,148 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\React\Protocol\Json;
|
||||
|
||||
use NoccyLabs\React\Protocol\ProtocolInterface;
|
||||
use Closure;
|
||||
use NoccyLabs\React\Protocol\ProtocolException;
|
||||
use React\Stream\DuplexStreamInterface;
|
||||
use RuntimeException;
|
||||
|
||||
class JsonProtocol implements ProtocolInterface
|
||||
{
|
||||
public function __construct(
|
||||
public readonly string $frameSeparator = "\0",
|
||||
public readonly int $prependSizeBytes = 0,
|
||||
public readonly string $prependSizeEndian = 'l',
|
||||
public readonly bool $unescapedSlashes = true,
|
||||
private readonly ?Closure $beforePackCb = null,
|
||||
private readonly ?Closure $afterPackCb = null,
|
||||
private readonly ?Closure $beforeUnpackCb = null,
|
||||
private readonly ?Closure $afterUnpackCb = null,
|
||||
)
|
||||
{
|
||||
}
|
||||
|
||||
public function packFrame(array $frame): string
|
||||
{
|
||||
if (is_callable($this->beforePackCb))
|
||||
$frame = call_user_func($this->beforePackCb, $frame);
|
||||
|
||||
$jsonOpts = ($this->unescapedSlashes?\JSON_UNESCAPED_SLASHES:0);
|
||||
$data = @json_encode($frame, $jsonOpts);
|
||||
if (!$data) {
|
||||
throw new ProtocolException("JsonProtocol: Empty data after serializing");
|
||||
}
|
||||
|
||||
// append separator
|
||||
$data = $data . $this->frameSeparator;
|
||||
|
||||
// prepend size
|
||||
if ($this->prependSizeBytes > 0) {
|
||||
$data = $this->packSizeBytes(strlen($data)) . $data;
|
||||
}
|
||||
|
||||
if (is_callable($this->afterPackCb))
|
||||
$data = call_user_func($this->afterPackCb, $data);
|
||||
|
||||
return $data;
|
||||
}
|
||||
|
||||
public function unpackFrame(string $data): array
|
||||
{
|
||||
if (is_callable($this->beforeUnpackCb))
|
||||
$data = call_user_func($this->beforeUnpackCb, $data);
|
||||
|
||||
if ($this->prependSizeBytes > 0) {
|
||||
if ($this->prependSizeBytes > strlen($data)) {
|
||||
// not enough data to parse size...
|
||||
throw new ProtocolException("JsonProtocol: Not enough data to parse size");
|
||||
}
|
||||
$len = $this->unpackSizeBytes($data);
|
||||
if ($len <= 0) {
|
||||
// unparsable?
|
||||
throw new ProtocolException("JsonProtocol: Invalid size decoded from frame");
|
||||
}
|
||||
if ($len > strlen($data) - $this->prependSizeBytes) {
|
||||
// insufficient data
|
||||
throw new ProtocolException("JsonProtocol: Insufficient data for unpacking");
|
||||
}
|
||||
$data = substr($data, $this->prependSizeBytes);
|
||||
}
|
||||
|
||||
if ($this->frameSeparator) {
|
||||
$data = substr($data, 0, -strlen($this->frameSeparator));
|
||||
}
|
||||
|
||||
$frame = @json_decode($data, true);
|
||||
// echo "[{$data}]"; var_dump($frame);
|
||||
if (!$frame) {
|
||||
// invalid json
|
||||
throw new ProtocolException("Unparsable frame received: {$data}");
|
||||
}
|
||||
|
||||
if (is_callable($this->afterUnpackCb))
|
||||
$frame = call_user_func($this->afterUnpackCb, $frame);
|
||||
|
||||
return $frame;
|
||||
}
|
||||
|
||||
public function consumeFrame(string &$data): ?array
|
||||
{
|
||||
// check for $this->prependSizeBytes
|
||||
if ($this->prependSizeBytes > 0) {
|
||||
$len = $this->unpackSizeBytes($data);
|
||||
// if size is greater than data (i.e. incomplete)
|
||||
if ($len > strlen($data) - $this->prependSizeBytes) return null;
|
||||
$p = $len;
|
||||
// $data = substr($data, $this->prependSizeBytes);
|
||||
} elseif ($this->frameSeparator) {
|
||||
// check for $this->frameSeparator
|
||||
$p = strpos($data, $this->frameSeparator);
|
||||
if ($p === false) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
$frame = substr($data, 0, $p + strlen($this->frameSeparator) + $this->prependSizeBytes);
|
||||
$data = substr($data, $p + strlen($this->frameSeparator) + $this->prependSizeBytes);
|
||||
|
||||
return $this->unpackFrame($frame);
|
||||
}
|
||||
|
||||
public function packSizeBytes(int $size): string
|
||||
{
|
||||
$endian = function($b,$l,$s) {
|
||||
return match ($this->prependSizeEndian) {
|
||||
'b' => $b,
|
||||
'l' => $l,
|
||||
default => $s
|
||||
};
|
||||
};
|
||||
return match ($this->prependSizeBytes) {
|
||||
1 => pack($endian("C","C","C"), $size),
|
||||
2 => pack($endian("n","v","S"), $size),
|
||||
4 => pack($endian("N","V","L"), $size),
|
||||
default => throw new ProtocolException("JsonProtocol: Invalid message size length")
|
||||
};
|
||||
}
|
||||
|
||||
public function unpackSizeBytes(string $data): int
|
||||
{
|
||||
$bytes = substr($data, 0, $this->prependSizeBytes);
|
||||
$endian = function($b,$l,$s) {
|
||||
return match ($this->prependSizeEndian) {
|
||||
'b' => $b,
|
||||
'l' => $l,
|
||||
default => $s
|
||||
};
|
||||
};
|
||||
return match ($this->prependSizeBytes) {
|
||||
1 => unpack($endian("C","C","C")."len", $bytes)['len'],
|
||||
2 => unpack($endian("n","v","S")."len", $bytes)['len'],
|
||||
4 => unpack($endian("N","V","L")."len", $bytes)['len'],
|
||||
default => throw new ProtocolException("JsonProtocol: Invalid message size length")
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
43
src/Json/JsonRpcProtocol.php
Normal file
43
src/Json/JsonRpcProtocol.php
Normal file
@@ -0,0 +1,43 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\React\Protocol\Json;
|
||||
|
||||
use NoccyLabs\React\Protocol\ProtocolException;
|
||||
use React\Stream\DuplexStreamInterface;
|
||||
|
||||
/**
|
||||
* Implementation of the JSON-RPC base protocol.
|
||||
*
|
||||
*
|
||||
*/
|
||||
class JsonRpcProtocol extends JsonProtocol
|
||||
{
|
||||
public function __construct(
|
||||
string $frameSeparator = "\n",
|
||||
)
|
||||
{
|
||||
parent::__construct(
|
||||
frameSeparator: $frameSeparator,
|
||||
prependSizeBytes: 0,
|
||||
unescapedSlashes: true,
|
||||
beforePackCb: $this->beforePack(...),
|
||||
afterPackCb: null,
|
||||
beforeUnpackCb: null,
|
||||
afterUnpackCb: $this->afterUnpack(...)
|
||||
);
|
||||
}
|
||||
|
||||
private function beforePack(array $frame): array
|
||||
{
|
||||
$frame['jsonrpc'] ??= "2.0";
|
||||
if (!(isset($frame['method']) || isset($frame['result']))) {
|
||||
throw new ProtocolException("JsonRpcProtocol: Either method or result key must be present");
|
||||
}
|
||||
return $frame;
|
||||
}
|
||||
|
||||
private function afterUnpack(array $frame): array
|
||||
{
|
||||
return $frame;
|
||||
}
|
||||
}
|
||||
28
src/Json/NativeMessagingProtocol.php
Normal file
28
src/Json/NativeMessagingProtocol.php
Normal file
@@ -0,0 +1,28 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\React\Protocol\Json;
|
||||
|
||||
use React\Stream\DuplexStreamInterface;
|
||||
|
||||
/**
|
||||
* Implementation of the common browser Native Messaging Protocol, used to
|
||||
* communicate with a browser script from local processes.
|
||||
*
|
||||
* For more see:
|
||||
* - https://developer.chrome.com/docs/extensions/develop/concepts/native-messaging
|
||||
* - https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/Native_messaging
|
||||
*/
|
||||
class NativeMessagingProtocol extends JsonProtocol
|
||||
{
|
||||
public function __construct(
|
||||
)
|
||||
{
|
||||
parent::__construct(
|
||||
frameSeparator: '',
|
||||
prependSizeBytes: 4,
|
||||
prependSizeEndian: 's',
|
||||
unescapedSlashes: true,
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
87
src/Line/HttpLikeProtocol.php
Normal file
87
src/Line/HttpLikeProtocol.php
Normal file
@@ -0,0 +1,87 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\React\Protocol\Line;
|
||||
|
||||
use Closure;
|
||||
use NoccyLabs\React\Protocol\ProtocolException;
|
||||
|
||||
class HttpLikeProtocol extends LineProtocol
|
||||
{
|
||||
private ?int $wantedPayloadSize = null;
|
||||
|
||||
public function __construct(
|
||||
private readonly string $lineSeparator = "\n",
|
||||
private readonly bool $capturePayloads = false,
|
||||
private readonly ?string $payloadLengthHeader = null,
|
||||
private readonly ?Closure $beforePackCb = null,
|
||||
private readonly ?Closure $afterPackCb = null,
|
||||
private readonly ?Closure $beforeUnpackCb = null,
|
||||
private readonly ?Closure $afterUnpackCb = null
|
||||
)
|
||||
{
|
||||
return parent::__construct(
|
||||
lineBreak: str_repeat($lineSeparator, 2),
|
||||
);
|
||||
}
|
||||
|
||||
public function packFrame(array $frame): string
|
||||
{
|
||||
if (is_callable($this->beforePackCb))
|
||||
$frame = call_user_func($this->beforePackCb, $frame);
|
||||
|
||||
$headers = [];
|
||||
foreach ($frame['headers']??[] as $header => $values) {
|
||||
foreach ($values as $value) {
|
||||
$headers[] = sprintf("%s: %s", $header, $value);
|
||||
}
|
||||
}
|
||||
|
||||
if (is_callable($this->afterPackCb))
|
||||
$frame = call_user_func($this->afterPackCb, $frame);
|
||||
|
||||
return join($this->lineSeparator, [
|
||||
$frame['query'],
|
||||
...$headers,
|
||||
null,
|
||||
null,
|
||||
]);
|
||||
}
|
||||
|
||||
public function unpackFrame(string $data): array
|
||||
{
|
||||
if (is_callable($this->beforeUnpackCb))
|
||||
$data = call_user_func($this->beforeUnpackCb, $data);
|
||||
|
||||
$lines = explode($this->lineSeparator, $data);
|
||||
$query = array_shift($lines);
|
||||
$headers = [];
|
||||
foreach ($lines as $line) {
|
||||
if (!trim($line) && $line == end($lines)) continue;
|
||||
$line = trim($line);
|
||||
if (!str_contains($line, ":")) {
|
||||
throw new ProtocolException("Invalid header line: {$line}");
|
||||
}
|
||||
[$header, $value] = array_map(trim(...), explode(":", $line, 2));
|
||||
if (!array_key_exists($header, $headers)) $headers[$header] = [];
|
||||
$headers[$header][] = $value;
|
||||
}
|
||||
|
||||
$frame = [
|
||||
'query' => $query,
|
||||
'headers' => $headers,
|
||||
];
|
||||
|
||||
if (is_callable($this->afterUnpackCb))
|
||||
$frame = call_user_func($this->afterUnpackCb, $frame);
|
||||
|
||||
return $frame;
|
||||
}
|
||||
|
||||
public function consumeFrame(string &$data): ?array
|
||||
{
|
||||
if ($this->wantedPayloadSize > 0) {
|
||||
// consume and emit a "payload" event
|
||||
}
|
||||
return parent::consumeFrame($data);
|
||||
}
|
||||
}
|
||||
64
src/Line/LineProtocol.php
Normal file
64
src/Line/LineProtocol.php
Normal file
@@ -0,0 +1,64 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\React\Protocol\Line;
|
||||
|
||||
use Closure;
|
||||
use NoccyLabs\React\Protocol\ProtocolInterface;
|
||||
|
||||
class LineProtocol implements ProtocolInterface
|
||||
{
|
||||
public function __construct(
|
||||
private readonly string $lineBreak = "\n",
|
||||
private readonly bool $quoteStrings = false,
|
||||
private readonly bool $escapeSpecial = false,
|
||||
private readonly ?Closure $beforePackCb = null,
|
||||
private readonly ?Closure $afterPackCb = null,
|
||||
private readonly ?Closure $beforeUnpackCb = null,
|
||||
private readonly ?Closure $afterUnpackCb = null,
|
||||
)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
public function packFrame(array $frame): string
|
||||
{
|
||||
if (is_callable($this->beforePackCb))
|
||||
$frame = call_user_func($this->beforePackCb, $frame);
|
||||
|
||||
if ($this->escapeSpecial) {
|
||||
$frame = array_map(quotemeta(...), $frame);
|
||||
}
|
||||
|
||||
if ($this->quoteStrings) {
|
||||
$frame = array_map(fn($v) => is_string($v) ? ('"'.str_replace('"',"\\\"",$v).'"') : $v, $frame);
|
||||
}
|
||||
|
||||
$data = join(" ", $frame);
|
||||
$data .= $this->lineBreak;
|
||||
|
||||
if (is_callable($this->afterPackCb))
|
||||
$data = call_user_func($this->afterPackCb, $data);
|
||||
|
||||
return $data;
|
||||
}
|
||||
|
||||
public function unpackFrame(string $data): array
|
||||
{
|
||||
return str_getcsv($data, ' ', '"', "\\");
|
||||
}
|
||||
|
||||
public function consumeFrame(string &$data): ?array
|
||||
{
|
||||
// check for $this->lineBreak
|
||||
$p = strpos($data, $this->lineBreak);
|
||||
if ($p === false) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// break on separator
|
||||
$frame = substr($data, 0, $p);
|
||||
$data = substr($data, $p + strlen($this->lineBreak));
|
||||
|
||||
return $this->unpackFrame($frame);
|
||||
}
|
||||
}
|
||||
10
src/ProtocolException.php
Normal file
10
src/ProtocolException.php
Normal file
@@ -0,0 +1,10 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\React\Protocol;
|
||||
|
||||
use Exception;
|
||||
|
||||
class ProtocolException extends Exception
|
||||
{
|
||||
|
||||
}
|
||||
50
src/ProtocolInterface.php
Normal file
50
src/ProtocolInterface.php
Normal file
@@ -0,0 +1,50 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\React\Protocol;
|
||||
|
||||
interface ProtocolInterface
|
||||
{
|
||||
/**
|
||||
* Pack a frame, turning an array of data into the frame format.
|
||||
*
|
||||
* @param array $data
|
||||
* @return string
|
||||
*/
|
||||
public function packFrame(array $data): string;
|
||||
|
||||
/**
|
||||
* Unpack a frame, turning a string or binary blob into an array of data.
|
||||
*
|
||||
* @param string $data
|
||||
* @param array
|
||||
*/
|
||||
public function unpackFrame(string $data): array;
|
||||
|
||||
/**
|
||||
* Unpacks a frame from the buffer if possible. If a frame is unpacked, the
|
||||
* consumed bytes will be removed from the head of $data.
|
||||
*
|
||||
* @param string $data (byref)
|
||||
* @return array|null
|
||||
*/
|
||||
public function consumeFrame(string &$data): ?array;
|
||||
|
||||
/**
|
||||
* Peek at the buffer and determine if we can consume a frame.
|
||||
*
|
||||
* If peekFrame returns true, calling consumeFrame() should return the frame
|
||||
* detected by peekFrame(). The function may therefore prepare a frame to be
|
||||
* returned immediately on the next call to consumeFrame(). This also means
|
||||
* that you should NEVER call peekFrame() without intending to call
|
||||
* consumeFrame() immediately on a true result.
|
||||
*
|
||||
* The reasoning for this is that peekFrame() may need to parse a header to
|
||||
* determine a body length, and this parsing may be as involved as the
|
||||
* actual consumption/parsing.
|
||||
*
|
||||
* @param string $data
|
||||
* @return boolean
|
||||
*/
|
||||
// public function peekFrame(string $data): bool;
|
||||
|
||||
}
|
||||
72
src/ProtocolStream.php
Normal file
72
src/ProtocolStream.php
Normal file
@@ -0,0 +1,72 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\React\Protocol;
|
||||
|
||||
use Evenement\EventEmitterInterface;
|
||||
use Evenement\EventEmitterTrait;
|
||||
use React\Stream\DuplexStreamInterface;
|
||||
|
||||
class ProtocolStream implements EventEmitterInterface
|
||||
{
|
||||
use EventEmitterTrait;
|
||||
|
||||
private string $readBuffer = '';
|
||||
|
||||
private ProtocolInterface $protocol;
|
||||
|
||||
public function __construct(
|
||||
private readonly DuplexStreamInterface $stream,
|
||||
ProtocolInterface $protocol,
|
||||
private readonly int $maxBuffer = 8192,
|
||||
private readonly bool $closeOnOverflow = false,
|
||||
)
|
||||
{
|
||||
$this->protocol = $protocol;
|
||||
$stream->on("data", $this->receive(...));
|
||||
}
|
||||
|
||||
/**
|
||||
* Upgrade the protocol, switching from f.ex. a text based to a JSON
|
||||
* based protocol without having to re-create the ProtocolStream.
|
||||
*
|
||||
* @param ProtocolInterface $protocol
|
||||
* @return void
|
||||
*/
|
||||
public function upgrade(ProtocolInterface $protocol): void
|
||||
{
|
||||
$this->protocol = $protocol;
|
||||
}
|
||||
|
||||
/**
|
||||
* Undocumented function
|
||||
*
|
||||
* @param array $frame
|
||||
* @return void
|
||||
*/
|
||||
public function send(array $frame): void
|
||||
{
|
||||
$data = $this->protocol->packFrame($frame);
|
||||
$this->stream->write($data);
|
||||
}
|
||||
|
||||
private function receive(string $data): void
|
||||
{
|
||||
$this->readBuffer .= $data;
|
||||
|
||||
try {
|
||||
while ($frame = $this->protocol->consumeFrame($this->readBuffer)) {
|
||||
$this->emit("message", [ $frame ]);
|
||||
}
|
||||
} catch (ProtocolException $e) {
|
||||
$this->emit("error", [ $e ]);
|
||||
return;
|
||||
}
|
||||
|
||||
if (strlen($this->readBuffer) > $this->maxBuffer) {
|
||||
$this->emit("overflow");
|
||||
if ($this->closeOnOverflow) {
|
||||
$this->stream->close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user