react-websocket/src/WebSocketConnection.php

239 lines
5.7 KiB
PHP

<?php
namespace NoccyLabs\React\WebSocket;
use Evenement\EventEmitterTrait;
use NoccyLabs\React\WebSocket\Group\ConnectionGroup;
use NoccyLabs\React\WebSocket\Group\GroupManager;
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Message\Response;
use React\Socket\ConnectionInterface;
use React\Stream\CompositeStream;
use React\Stream\DuplexStreamInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\ThroughStream;
use React\Stream\WritableStreamInterface;
class WebSocketConnection implements WebSocketInterface
{
use EventEmitterTrait;
const OP_CONTINUATION = 0x0;
const OP_FRAME_TEXT = 0x1;
const OP_FRAME_BINARY = 0x2;
const OP_CLOSE = 0x8;
const OP_PING = 0x9;
const OP_PONG = 0xA;
private ?string $groupName = null;
private WebSocketCodec $codec;
private ?ConnectionGroup $group = null;
private GroupManager $groupManager;
private ReadableStreamInterface $inStream;
private WritableStreamInterface $outStream;
private ServerRequestInterface $request;
private ?string $buffer = null;
private ?int $bufferedOp = null;
public function __construct(ServerRequestInterface $request, ReadableStreamInterface $inStream, WritableStreamInterface $outStream, GroupManager $groupManager)
{
// The codec is used to encode and decode frames
$this->codec = new WebSocketCodec();
$this->request = $request;
$this->inStream = $inStream;
$this->outStream = $outStream;
$this->groupManager = $groupManager;
$this->inStream->on('data', $this->onWebSocketData(...));
$this->inStream->on('close', function () {
$this->close();
$this->emit('close', []);
});
}
private function onWebSocketData($data)
{
$decoded = $this->codec->decode($data);
$opcode = $decoded['opcode'];
$final = $decoded['final'];
$payload = $decoded['payload'];
if (!$final) {
if ($this->bufferedOp === null) {
$this->buffer = $payload;
$this->bufferedOp = $opcode;
} else {
$this->buffer .= $payload;
}
}
if ($final) {
$payload = $this->buffer . $payload;
$this->buffer = null;
if ($this->bufferedOp !== null) {
$opcode = $this->bufferedOp;
$this->bufferedOp = null;
}
}
switch ($opcode) {
case self::OP_PING:
$this->sendPong($payload);
return;
case self::OP_PONG:
$this->checkPong($payload);
return;
case self::OP_CLOSE:
// TODO implement
return;
case self::OP_CONTINUATION:
$this->buffer .= $payload;
return;
case self::OP_FRAME_TEXT:
$this->emit('text', [ $payload ]);
return;
case self::OP_FRAME_BINARY:
$this->emit('binary', [ $payload ]);
return;
}
}
/**
* Sends a ping, and closes the connection on timeout.
*
*/
public function ping(): void
{
// TODO save the state somehow
$payload = "ping";
$this->send(self::OP_PING, $payload, true);
}
private function sendPong(string $data): void
{
$this->send(self::OP_PONG, $data, true);
}
private function checkPong(string $data): void
{
// TODO reset the state and any ping timers
}
public function setGroup(?string $name): void
{
if ($this->group) {
$this->group->remove($this);
$this->group = null;
}
$group = $this->groupManager->getConnectionGroup($name);
$group->add($this);
$this->group = $group;
$this->groupName = $name;
}
public function getGroupName(): ?string
{
return $this->groupName;
}
/**
* @return WebSocketInterface[]
*/
public function getGroupMembers(): array
{
return $this->group ? $this->group->getAll() : [];
}
public function getGroup(): ?ConnectionGroup
{
return $this->group;
}
public function getRemoteAddress()
{
return $this->request->getServerParams()['REMOTE_ADDR'];
}
public function getLocalAddress()
{
return $this->request->getServerParams()['SERVER_ADDR'];
}
public function isReadable()
{
return $this->inStream->isReadable();
}
public function pause()
{
return $this->inStream->pause();
}
public function resume()
{
return $this->inStream->resume();
}
public function isWritable()
{
return $this->outStream->isWritable();
}
public function pipe(WritableStreamInterface $dest, array $options = array())
{
// TODO implement
return $dest;
}
public function write($data)
{
return $this->send(self::OP_FRAME_TEXT, $data);
}
public function writeBinary($data)
{
return $this->send(self::OP_FRAME_BINARY, $data);
}
/**
*
*/
public function send(int $opcode, string $data, bool $final = true, bool $masked = false)
{
$frame = $this->codec->encode([
'opcode' => $opcode,
'payload' => $data,
'final' => $final,
'masked' => $masked
]);
$this->outStream->write($frame);
return true;
}
public function close()
{
$this->outStream->close();
$this->inStream->close();
}
public function end($data = null)
{
}
}