145 lines
3.7 KiB
PHP
145 lines
3.7 KiB
PHP
<?php
|
|
|
|
namespace NoccyLabs\React\Serial;
|
|
|
|
use Evenement\EventEmitterTrait;
|
|
use React\EventLoop\Loop;
|
|
use React\Promise\Deferred;
|
|
use React\Promise\PromiseInterface;
|
|
use React\Stream\DuplexStreamInterface;
|
|
use React\Stream\ReadableStreamInterface;
|
|
use React\Stream\WritableStreamInterface;
|
|
|
|
class LineBufferedDuplexStream implements ReadableStreamInterface, WritableStreamInterface
|
|
{
|
|
use EventEmitterTrait;
|
|
|
|
const EVT_DATA = 'data';
|
|
const EVT_END = 'end';
|
|
const EVT_ERROR = 'error';
|
|
const EVT_CLOSE = 'close';
|
|
const EVT_DRAIN = 'drain';
|
|
const EVT_PIPE = 'pipe';
|
|
const EVT_LINE = 'line'; // when a full line has been received
|
|
const EVT_PROMPT = 'prompt'; // when a matching prompt has been received
|
|
const EVT_OUTPUT = 'output'; // every line event until a prompt event, if $bufferOutput
|
|
|
|
private string $buffer = '';
|
|
|
|
private array $output = [];
|
|
|
|
public function __construct(
|
|
private readonly DuplexStreamInterface $stream,
|
|
private readonly ?string $promptPattern = null,
|
|
private readonly string $eol = "\n",
|
|
private bool $bufferOutput = false,
|
|
)
|
|
{
|
|
$stream->on('data', $this->onData(...));
|
|
$stream->on('end', $this->onEnd(...));
|
|
$stream->on('error', $this->onError(...));
|
|
$stream->on('close', $this->onClose(...));
|
|
$stream->on('drain', $this->onDrain(...));
|
|
$stream->on('pipe', $this->onPipe(...));
|
|
}
|
|
|
|
private function onData($data): void
|
|
{
|
|
// Pass on the data event as is
|
|
$this->emit(self::EVT_DATA, [ $data ]);
|
|
|
|
$this->buffer .= $data;
|
|
|
|
// Parse out any lines and emit events
|
|
while (($pos = strpos($this->buffer, $this->eol)) !== false) {
|
|
$line = substr($this->buffer, 0, $pos);
|
|
$this->buffer = substr($this->buffer, $pos + 1);
|
|
$this->emit(self::EVT_LINE, [ $line ]);
|
|
if ($this->bufferOutput) {
|
|
$this->output[] = $line;
|
|
}
|
|
}
|
|
|
|
// Check if the buffer matches the prompt pattern
|
|
if ($this->promptPattern !== null) {
|
|
if (preg_match($this->promptPattern, $this->buffer, $matches)) {
|
|
$this->buffer = preg_replace($this->promptPattern, '', $this->buffer);
|
|
if ($this->bufferOutput) {
|
|
$this->emit(self::EVT_OUTPUT, [$this->output]);
|
|
$this->output = [];
|
|
}
|
|
$this->emit(self::EVT_PROMPT, [ $matches ]);
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
private function onEnd(): void
|
|
{
|
|
$this->emit(self::EVT_END, [ ]);
|
|
}
|
|
|
|
private function onError($error): void
|
|
{
|
|
$this->emit(self::EVT_ERROR, [ $error ]);
|
|
}
|
|
|
|
private function onClose(): void
|
|
{
|
|
$this->emit(self::EVT_CLOSE, []);
|
|
}
|
|
|
|
private function onDrain(): void
|
|
{
|
|
// FIXME check parameters
|
|
$this->emit(self::EVT_DRAIN, []);
|
|
}
|
|
|
|
private function onPipe(): void
|
|
{
|
|
// FIXME check parameters
|
|
$this->emit(self::EVT_PIPE, []);
|
|
}
|
|
|
|
public function isReadable(): bool
|
|
{
|
|
return $this->stream->isReadable();
|
|
}
|
|
|
|
public function pause(): void
|
|
{
|
|
$this->stream->pause();
|
|
}
|
|
|
|
public function resume(): void
|
|
{
|
|
$this->stream->resume();
|
|
}
|
|
|
|
public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface
|
|
{
|
|
// TODO
|
|
}
|
|
|
|
public function close(): void
|
|
{
|
|
$this->stream->close();
|
|
}
|
|
|
|
public function isWritable(): bool
|
|
{
|
|
return $this->stream->isWritable();
|
|
}
|
|
|
|
public function write($data): bool
|
|
{
|
|
return $this->stream->write($data);
|
|
}
|
|
|
|
public function end($data = null): bool
|
|
{
|
|
return $this->stream->end($data);
|
|
}
|
|
|
|
}
|