react-websocket/src/WebSocketConnection.php

249 lines
6.4 KiB
PHP

<?php
namespace NoccyLabs\React\WebSocket;
use Evenement\EventEmitterTrait;
use NoccyLabs\React\WebSocket\Group\ConnectionGroup;
use NoccyLabs\React\WebSocket\Group\GroupManager;
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Message\Response;
use React\Socket\ConnectionInterface;
use React\Stream\CompositeStream;
use React\Stream\DuplexStreamInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\ThroughStream;
use React\Stream\WritableStreamInterface;
class WebSocketConnection implements WebSocketInterface
{
use EventEmitterTrait;
const OP_CONTINUATION = 0x0;
const OP_FRAME_TEXT = 0x1;
const OP_FRAME_BINARY = 0x2;
const OP_CLOSE = 0x8;
const OP_PING = 0x9;
const OP_PONG = 0xA;
private ?string $groupName = null;
private ?ConnectionGroup $group = null;
private GroupManager $groupManager;
private ReadableStreamInterface $inStream;
private WritableStreamInterface $outStream;
private ServerRequestInterface $request;
private ?string $buffer = null;
private ?int $bufferedOp = null;
public function __construct(ServerRequestInterface $request, ReadableStreamInterface $inStream, WritableStreamInterface $outStream, GroupManager $groupManager)
{
$this->request = $request;
$this->inStream = $inStream;
$this->outStream = $outStream;
$this->groupManager = $groupManager;
$this->inStream->on('data', $this->onWebSocketData(...));
}
private function onWebSocketData($data)
{
// Keep track of the number of bytes in the header
$header = 2;
// Peek at the first byte, holding flags and opcode
$byte0 = ord($data[0]);
$final = !!($byte0 & 0x80);
$opcode = $byte0 & 0x0F;
// Peek at the second byte, holding mask bit and len
$byte1 = ord($data[1]);
$masked = !!($byte1 & 0x80);
$len = $byte1 & 0x7F;
// Read extended length if present
if ($len == 126) {
$len = (ord($data[$header+0]) << 8)
| (ord($data[$header+1]));
$header += 2;
} elseif ($len == 127) {
$len = (ord($data[$header+0]) << 24)
| (ord($data[$header+1]) << 16)
| (ord($data[$header+2]) << 8)
| (ord($data[$header+3]));
$header += 4;
}
// Now for the masking
if ($masked) {
$mask = substr($data, $header, 4);
$header += 4;
}
// Extract and unmask payload
$payload = substr($data, $header, $len);
if ($masked) {
$payload = $this->unmask($payload, $mask);
}
if (!$final) {
if ($this->bufferedOp === null) {
$this->buffer = $payload;
$this->bufferedOp = $opcode;
} else {
$this->buffer .= $payload;
}
}
if ($final) {
$payload = $this->buffer . $payload;
$this->buffer = null;
if ($this->bufferedOp !== null) {
$opcode = $this->bufferedOp;
$this->bufferedOp = null;
}
}
switch ($opcode) {
case self::OP_PING:
$this->sendPong();
return;
case self::OP_CONTINUATION:
$this->buffer .= $payload;
return;
case self::OP_FRAME_TEXT:
$this->emit('text', [ $payload ]);
return;
case self::OP_FRAME_BINARY:
$this->emit('binary', [ $payload ]);
return;
}
}
private function unmask(string $payload, string $mask): string
{
$payloadData = array_map("ord", str_split($payload,1));
$maskData = array_map("ord", str_split($mask,1));
//printf("Mask: %02x %02x %02x %02x\n", ...$maskData);
$unmasked = [];
for ($n = 0; $n < count($payloadData); $n++) {
$unmasked[] = $payloadData[$n] ^ $maskData[$n % 4];
//printf("%02x ^ %02x = %02x %s\n", $payloadData[$n], $maskData[$n%4], $payloadData[$n]^$maskData[$n%4], chr($payloadData[$n]^$maskData[$n%4]));
}
return join("", array_map("chr", $unmasked));
}
private function sendPong(): void
{
}
public function setGroup(?string $name): void
{
if ($this->group) {
$this->group->remove($this);
$this->group = null;
}
$group = $this->groupManager->getConnectionGroup($name);
$group->add($this);
$this->group = $group;
$this->groupName = $name;
}
public function getGroupName(): ?string
{
return $this->groupName;
}
/**
* @return WebSocketInterface[]
*/
public function getGroupMembers(): array
{
return $this->group ? $this->group->getAll() : [];
}
public function getGroup(): ?ConnectionGroup
{
return $this->group;
}
public function getRemoteAddress()
{
return $this->request->getServerParams()['REMOTE_ADDR'];
}
public function getLocalAddress()
{
return $this->request->getServerParams()['SERVER_ADDR'];
}
public function isReadable()
{
return $this->inStream->isReadable();
}
public function pause()
{
return $this->inStream->pause();
}
public function resume()
{
return $this->inStream->resume();
}
public function isWritable()
{
return $this->outStream->isWritable();
}
public function pipe(WritableStreamInterface $dest, array $options = array())
{
// TODO implement
return $dest;
}
public function write($data)
{
// Header; final, text frame, unmasked
$frame = chr(0x81) . chr(strlen($data)) . $data;
$this->outStream->write($frame);
}
public function send(int $opcode, bool $final, string $data)
{
$frame = chr(($final?0x80:0x00) | ($opcode & 0xF));
$len = strlen($data);
if ($len > 126) {
$frame .= chr(0x7E) . chr(($len >> 8) & 0xFF) . chr($len & 0xFF);
}
$frame .= $data;
$this->outStream->write($frame);
}
public function close()
{
$this->outStream->close();
$this->inStream->close();
}
public function end($data = null)
{
}
}