Files
php-react-serial/src/LineBufferedDuplexStream.php

145 lines
3.7 KiB
PHP

<?php
namespace NoccyLabs\React\Serial;
use Evenement\EventEmitterTrait;
use React\EventLoop\Loop;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use React\Stream\DuplexStreamInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\WritableStreamInterface;
/**
 * Decorates a duplex stream with line and prompt detection.
 *
 * Every event of the wrapped stream is re-emitted on this object. In addition,
 * incoming data is accumulated in an internal buffer and split on the
 * configured EOL sequence:
 *
 *  - "line" is emitted once per complete line, with the terminator stripped;
 *  - "prompt" is emitted with the preg_match() matches when the unterminated
 *    remainder of the buffer matches $promptPattern;
 *  - "output" is emitted right before "prompt" with all lines collected since
 *    the previous prompt, but only when $bufferOutput is enabled.
 *
 * Data that contains neither an EOL sequence nor a prompt match stays in the
 * buffer until more data arrives.
 */
class LineBufferedDuplexStream implements DuplexStreamInterface
{
    use EventEmitterTrait;

    public const EVT_DATA = 'data';
    public const EVT_END = 'end';
    public const EVT_ERROR = 'error';
    public const EVT_CLOSE = 'close';
    public const EVT_DRAIN = 'drain';
    public const EVT_PIPE = 'pipe';
    public const EVT_LINE = 'line'; // when a full line has been received
    public const EVT_PROMPT = 'prompt'; // when a matching prompt has been received
    public const EVT_OUTPUT = 'output'; // lines collected since the last prompt, if $bufferOutput

    /** Received bytes that have not yet been consumed as a line or prompt. */
    private string $buffer = '';

    /** @var list<string> Lines collected since the last prompt (only filled when $bufferOutput is set). */
    private array $output = [];

    /**
     * @param DuplexStreamInterface $stream        The stream to decorate.
     * @param string|null           $promptPattern PCRE pattern (with delimiters) tested against the
     *                                             unterminated buffer remainder; null disables prompt detection.
     * @param string                $eol           Line terminator sequence; may be longer than one byte.
     * @param bool                  $bufferOutput  Collect lines and emit them as "output" before each prompt.
     *
     * @throws \InvalidArgumentException When $eol is empty.
     */
    public function __construct(
        private readonly DuplexStreamInterface $stream,
        private readonly ?string $promptPattern = null,
        private readonly string $eol = "\n",
        private bool $bufferOutput = false,
    ) {
        // An empty needle always "matches" at offset 0, which would make the
        // line-splitting loop in onData() spin forever.
        if ($eol === '') {
            throw new \InvalidArgumentException('The EOL sequence must not be empty.');
        }

        $stream->on('data', $this->onData(...));
        $stream->on('end', $this->onEnd(...));
        $stream->on('error', $this->onError(...));
        $stream->on('close', $this->onClose(...));
        $stream->on('drain', $this->onDrain(...));
        $stream->on('pipe', $this->onPipe(...));
    }

    /**
     * Handles a chunk from the wrapped stream: re-emits it, then extracts
     * complete lines and, if configured, a trailing prompt.
     *
     * @param mixed $data Chunk as delivered by the wrapped stream's "data" event.
     */
    private function onData($data): void
    {
        // Pass on the data event as is
        $this->emit(self::EVT_DATA, [$data]);
        $this->buffer .= $data;

        // Parse out any lines and emit events. The whole terminator is skipped,
        // not just its first byte, so multi-byte sequences such as "\r\n" work.
        $eolLength = strlen($this->eol);
        while (($pos = strpos($this->buffer, $this->eol)) !== false) {
            $line = substr($this->buffer, 0, $pos);
            $this->buffer = substr($this->buffer, $pos + $eolLength);
            $this->emit(self::EVT_LINE, [$line]);
            if ($this->bufferOutput) {
                $this->output[] = $line;
            }
        }

        // Check if the remaining (unterminated) buffer matches the prompt pattern
        if ($this->promptPattern === null) {
            return;
        }
        if (preg_match($this->promptPattern, $this->buffer, $matches) !== 1) {
            return;
        }

        // Strip the prompt from the buffer. preg_replace() yields null on a
        // PCRE error, which the string-typed buffer cannot hold; in that case
        // the buffer is dropped, since it was just recognised as a prompt.
        $this->buffer = preg_replace($this->promptPattern, '', $this->buffer) ?? '';

        if ($this->bufferOutput) {
            $this->emit(self::EVT_OUTPUT, [$this->output]);
            $this->output = [];
        }
        $this->emit(self::EVT_PROMPT, [$matches]);
    }

    /** Re-emits the wrapped stream's "end" event. */
    private function onEnd(): void
    {
        $this->emit(self::EVT_END, []);
    }

    /**
     * Re-emits the wrapped stream's "error" event.
     *
     * @param mixed $error Error payload as delivered by the wrapped stream.
     */
    private function onError($error): void
    {
        $this->emit(self::EVT_ERROR, [$error]);
    }

    /** Re-emits the wrapped stream's "close" event. */
    private function onClose(): void
    {
        $this->emit(self::EVT_CLOSE, []);
    }

    /** Re-emits the wrapped stream's "drain" event (forwarded without arguments). */
    private function onDrain(): void
    {
        $this->emit(self::EVT_DRAIN, []);
    }

    /**
     * Re-emits the wrapped stream's "pipe" event, forwarding whatever
     * arguments it was emitted with (ReactPHP passes the source stream).
     *
     * @param mixed ...$args Arguments of the original event.
     */
    private function onPipe(...$args): void
    {
        $this->emit(self::EVT_PIPE, $args);
    }

    /** Reports whether the wrapped stream is readable. */
    public function isReadable(): bool
    {
        return $this->stream->isReadable();
    }

    /** Pauses reading on the wrapped stream. */
    public function pause(): void
    {
        $this->stream->pause();
    }

    /** Resumes reading on the wrapped stream. */
    public function resume(): void
    {
        $this->stream->resume();
    }

    /**
     * Pipes the data re-emitted by this object into $dest.
     *
     * @param WritableStreamInterface $dest    Destination stream.
     * @param array                   $options Options passed through to React\Stream\Util::pipe().
     *
     * @return WritableStreamInterface The destination stream, to allow chaining.
     */
    public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface
    {
        // Fully qualified so this block does not depend on an extra `use` line.
        return \React\Stream\Util::pipe($this, $dest, $options);
    }

    /** Closes the wrapped stream. */
    public function close(): void
    {
        $this->stream->close();
    }

    /** Reports whether the wrapped stream is writable. */
    public function isWritable(): bool
    {
        return $this->stream->isWritable();
    }

    /**
     * Writes data to the wrapped stream.
     *
     * @param mixed $data Data to write.
     *
     * @return bool Result of the wrapped stream's write().
     */
    public function write($data): bool
    {
        return $this->stream->write($data);
    }

    /**
     * Ends the wrapped stream, optionally writing a final chunk first.
     *
     * @param mixed $data Optional final data to write.
     *
     * @return bool The wrapped stream's result if it is a boolean; true otherwise.
     */
    public function end($data = null): bool
    {
        // ReactPHP's end() is documented as returning nothing; handing that
        // null straight back would violate the declared bool return type.
        $result = $this->stream->end($data);

        return is_bool($result) ? $result : true;
    }
}