4 Commits
0.1.0 ... 0.1.1

Author SHA1 Message Date
be6955ea48 More inline docs 2024-02-22 00:34:11 +01:00
417c11670a More code cleanup 2024-02-22 00:15:22 +01:00
70e353bd0c Code cleanup, added phpstan 2024-02-21 23:48:13 +01:00
a6f70dbb76 Added events to GroupManager 2024-02-21 22:43:31 +01:00
9 changed files with 179 additions and 69 deletions

View File

@ -3,6 +3,7 @@
"description": "Native ReactPHP WebSocket implementation", "description": "Native ReactPHP WebSocket implementation",
"type": "library", "type": "library",
"license": "GPL-3.0-or-later", "license": "GPL-3.0-or-later",
"keywords": [ "reactphp", "websockets" ],
"autoload": { "autoload": {
"psr-4": { "psr-4": {
"NoccyLabs\\React\\WebSocket\\": "src/" "NoccyLabs\\React\\WebSocket\\": "src/"
@ -18,6 +19,7 @@
"react/http": "^1.9.0" "react/http": "^1.9.0"
}, },
"require-dev": { "require-dev": {
"phpunit/phpunit": "^11.0" "phpunit/phpunit": "^11.0",
"phpstan/phpstan": "^1.10"
} }
} }

View File

@ -15,4 +15,28 @@ foreach ($websocket->getGroup() as $other) {
To remove a group from a connection, pass `null` to `WebSocketConnection::setGroup()`. To remove a group from a connection, pass `null` to `WebSocketConnection::setGroup()`.
The group will emit a `join` event (`WebSocketConnection::EVENT_JOIN`) when another member joins the group, and a `leave` event (`WebSocketConnection::EVENT_LEAVE`) when a member leaves. The events will be sent to the leaving member as well, so consider this in your logic. The group will emit a `join` event (`ConnectionGroup::EVENT_JOIN`) when another member joins the group, and a `leave` event (`ConnectionGroup::EVENT_LEAVE`) when a member leaves. The events will be sent to the leaving member as well, so consider this in your logic.
## Events
The GroupManager emits events when a group is `created` (`GroupManager::EVENT_CREATED`) or `destroyed` (`GroupManager::EVENT_DESTROYED`). You can use these events to hook the join and leave events.
```php
// Create a GroupManager
$groupManager = new GroupManager();
$groupManager->on('created', function (ConnectionGroup $group) {
// Listen for joins
$group->on('join', function (WebSocketConnection $connection) use ($group) {
// Someone joined the group!
$group->write("Someone joined!");
})
});
// The GroupManager is injected into the WebSocketMiddleware
$middleware = new WebSocketMiddleware($groupManager);
```
## Future
* Add a GroupManagerImplementation so custom logic can be provided.
* Make it possible to reject setting a group by GroupManager not returning a group.

12
phpstan.neon Normal file
View File

@ -0,0 +1,12 @@
parameters:
level: 5
excludePaths:
- doc
- vendor
- tests
# Paths to include in the analysis
paths:
- src

View File

@ -0,0 +1,28 @@
<?php
namespace NoccyLabs\React\WebSocket\Debug;
trait HexDumpTrait
{
private function hexdump($data): void
{
printf("_____ *%4d\n", strlen($data));
$rows = str_split($data, 16);
$offs = 0;
foreach ($rows as $row) {
$h = []; $a = [];
for ($n = 0; $n < 16; $n++) {
if ($n < strlen($row)) {
$h[] = sprintf("%02x%s", ord($row[$n]), ($n==7)?" ":" ");
$a[] = sprintf("%s%s", (ctype_print($row[$n])?$row[$n]:"."), ($n==7)?" ":"");
} else {
$h[] = (($n==7)?" ":" ");
$a[] = (($n==7)?" ":" ");
}
}
printf("%04x | %s | %s\n", 16 * $offs++, join("", $h), join("", $a));
}
}
}

View File

@ -60,4 +60,9 @@ class ConnectionGroup implements EventEmitterInterface, IteratorAggregate, Count
{ {
return $this->name; return $this->name;
} }
public function write(string $payload)
{
}
} }

View File

@ -2,12 +2,23 @@
namespace NoccyLabs\React\WebSocket\Group; namespace NoccyLabs\React\WebSocket\Group;
use NoccyLabs\React\WebSocket\WebSocketInterface; use Evenement\EventEmitterInterface;
use Evenement\EventEmitterTrait;
use React\EventLoop\Loop; use React\EventLoop\Loop;
use WeakReference;
class GroupManager class GroupManager implements EventEmitterInterface
{ {
use EventEmitterTrait;
/**
* @var string emitted when a new group is created
*/
const EVENT_CREATED = 'created';
/**
* @var string emitted after the last member leaves, when the group is destroyed
*/
const EVENT_DESTROYED = 'destroyed';
/** @var array<string,ConnectionGroup> */ /** @var array<string,ConnectionGroup> */
private array $groups = []; private array $groups = [];
@ -20,11 +31,13 @@ class GroupManager
$group->on(ConnectionGroup::EVENT_LEAVE, function () use ($group) { $group->on(ConnectionGroup::EVENT_LEAVE, function () use ($group) {
Loop::futureTick(function () use ($group) { Loop::futureTick(function () use ($group) {
if (count($group) === 0) { if (count($group) === 0) {
$this->emit(self::EVENT_DESTROYED, [ $group ]);
$group->removeAllListeners(); $group->removeAllListeners();
unset($this->groups[$group->getName()]); unset($this->groups[$group->getName()]);
} }
}); });
}); });
$this->emit(self::EVENT_CREATED, [ $group ]);
} else { } else {
$group = $this->groups[$name]; $group = $this->groups[$name];
} }

View File

@ -17,19 +17,26 @@ use React\Stream\WritableStreamInterface;
class WebSocketCodec class WebSocketCodec
{ {
const OP_CONTINUATION = 0x0; /**
const OP_FRAME_TEXT = 0x1; * Encode a frame. Required keys are opcode and payload. Keys that can be passed are:
const OP_FRAME_BINARY = 0x2; *
const OP_CLOSE = 0x8; * opcode (int) - the opcode
const OP_PING = 0x9; * final (bool) - final frame
const OP_PONG = 0xA; * rsv1-3 (bool) - reserved bits
* masked (bool) - if the frame was masked
* mask (string) - the mask bytes, if masked
* length (int) - length of payload
* payload (string) - the payload
*
* @param array $frame The frame
* @return string The encoded frame
*/
public function encode(array $frame): string public function encode(array $frame): string
{ {
// Encoded frame // Encoded frame
$encoded = null; $encoded = null;
// Unpack frame with defaults // Re-unpack frame with defaults
$frame = [ $frame = [
...[ ...[
'final' => true, 'final' => true,
@ -56,10 +63,13 @@ class WebSocketCodec
$size1 = ($len >> 8) & 0xFF; $size1 = ($len >> 8) & 0xFF;
$size2 = $len & 0xFF; $size2 = $len & 0xFF;
$size3 = null; $size3 = null;
$size4 = null;
} else { } else {
$size0 = $len; $size0 = $len;
$size1 = null; $size1 = null;
$size2 = null;
$size3 = null; $size3 = null;
$size4 = null;
} }
$encoded .= chr(($frame['final']?0x80:0x00) $encoded .= chr(($frame['final']?0x80:0x00)
@ -86,21 +96,24 @@ class WebSocketCodec
$encoded .= $frame['payload']; $encoded .= $frame['payload'];
} }
//$this->hexdump($encoded);
return $encoded; return $encoded;
} }
/** /**
* Decode a websocket frame and return an array with the keys: * Decode a websocket frame and return an array with the keys:
*
* opcode (int) - the opcode * opcode (int) - the opcode
* fin (bool) - final frame * final (bool) - final frame
* rsv1-3 (bool) - reserved bits * rsv1-3 (bool) - reserved bits
* masked (bool) - if the frame was masked * masked (bool) - if the frame was masked
* mask (string) - the mask bytes, if masked
* length (int) - length of payload * length (int) - length of payload
* payload (string) - the payload * payload (string) - the payload
*
* @param string $frame The frame data to decode
* @return array The decoded frame
*/ */
public function decode($frame): array public function decode(string $frame): array
{ {
// Decoded frame // Decoded frame
$decoded = []; $decoded = [];
@ -152,38 +165,24 @@ class WebSocketCodec
return $decoded; return $decoded;
} }
/**
* Masking is reversible, and simply xors a repeated 4-byte key with the data.
*
* @param string $payload The unmasked (or masked) input
* @param string $mask The mask to use (4 bytes)
* @return string The masked (or unmasked) output
*/
private function mask(string $payload, string $mask): string private function mask(string $payload, string $mask): string
{ {
$payloadData = array_map("ord", str_split($payload,1)); $payloadData = array_map("ord", str_split($payload,1));
$maskData = array_map("ord", str_split($mask,1)); $maskData = array_map("ord", str_split($mask,1));
//printf("Mask: %02x %02x %02x %02x\n", ...$maskData);
$unmasked = []; $unmasked = [];
for ($n = 0; $n < count($payloadData); $n++) { for ($n = 0; $n < count($payloadData); $n++) {
$unmasked[] = $payloadData[$n] ^ $maskData[$n % 4]; $unmasked[] = $payloadData[$n] ^ $maskData[$n % 4];
} }
return join("", array_map("chr", $unmasked)); return join("", array_map("chr", $unmasked));
} }
private function hexdump($data): void
{
printf("%4d .\n", strlen($data));
$rows = str_split($data, 16);
$offs = 0;
foreach ($rows as $row) {
$h = []; $a = [];
for ($n = 0; $n < 16; $n++) {
if ($n < strlen($row)) {
$h[] = sprintf("%02x%s", ord($row[$n]), ($n==7)?" ":" ");
$a[] = sprintf("%s%s", (ctype_print($row[$n])?$row[$n]:"."), ($n==7)?" ":"");
} else {
$h[] = (($n==7)?" ":" ");
$a[] = (($n==7)?" ":" ");
}
}
printf("%04x | %s | %s\n", 16 * $offs++, join("", $h), join("", $a));
}
}
} }

View File

@ -25,8 +25,10 @@ class WebSocketConnection implements WebSocketInterface
const OP_PING = 0x9; const OP_PING = 0x9;
const OP_PONG = 0xA; const OP_PONG = 0xA;
/** @var string|null The name of the group that this connection has joined, or null */
private ?string $groupName = null; private ?string $groupName = null;
/** @var WebSocketCodec The frame encoder/decoder */
private WebSocketCodec $codec; private WebSocketCodec $codec;
private ?ConnectionGroup $group = null; private ?ConnectionGroup $group = null;
@ -54,6 +56,10 @@ class WebSocketConnection implements WebSocketInterface
$this->groupManager = $groupManager; $this->groupManager = $groupManager;
$this->inStream->on('data', $this->onWebSocketData(...)); $this->inStream->on('data', $this->onWebSocketData(...));
$this->inStream->on('close', function () {
$this->close();
$this->emit('close', []);
});
} }
private function onWebSocketData($data) private function onWebSocketData($data)
@ -87,6 +93,7 @@ class WebSocketConnection implements WebSocketInterface
$this->sendPong($payload); $this->sendPong($payload);
return; return;
case self::OP_PONG: case self::OP_PONG:
$this->checkPong($payload);
return; return;
case self::OP_CLOSE: case self::OP_CLOSE:
// TODO implement // TODO implement
@ -103,11 +110,27 @@ class WebSocketConnection implements WebSocketInterface
} }
} }
/**
* Sends a ping, and closes the connection on timeout.
*
*/
public function ping(): void
{
// TODO save the state somehow
$payload = "ping";
$this->send(self::OP_PING, $payload, true);
}
private function sendPong(string $data): void private function sendPong(string $data): void
{ {
$this->send(self::OP_PONG, $data, true); $this->send(self::OP_PONG, $data, true);
} }
private function checkPong(string $data): void
{
// TODO reset the state and any ping timers
}
public function setGroup(?string $name): void public function setGroup(?string $name): void
{ {
if ($this->group) { if ($this->group) {
@ -150,32 +173,50 @@ class WebSocketConnection implements WebSocketInterface
return $this->request->getServerParams()['SERVER_ADDR']; return $this->request->getServerParams()['SERVER_ADDR'];
} }
/**
* {@inheritDoc}
*/
public function isReadable() public function isReadable()
{ {
return $this->inStream->isReadable(); return $this->inStream->isReadable();
} }
/**
* {@inheritDoc}
*/
public function pause() public function pause()
{ {
return $this->inStream->pause(); $this->inStream->pause();
} }
/**
* {@inheritDoc}
*/
public function resume() public function resume()
{ {
return $this->inStream->resume(); $this->inStream->resume();
} }
/**
* {@inheritDoc}
*/
public function isWritable() public function isWritable()
{ {
return $this->outStream->isWritable(); return $this->outStream->isWritable();
} }
/**
* {@inheritDoc}
*/
public function pipe(WritableStreamInterface $dest, array $options = array()) public function pipe(WritableStreamInterface $dest, array $options = array())
{ {
// TODO implement // TODO implement
return $dest; return $dest;
} }
/**
* {@inheritDoc}
*/
public function write($data) public function write($data)
{ {
return $this->send(self::OP_FRAME_TEXT, $data); return $this->send(self::OP_FRAME_TEXT, $data);
@ -204,35 +245,21 @@ class WebSocketConnection implements WebSocketInterface
return true; return true;
} }
/**
* {@inheritDoc}
*/
public function close() public function close()
{ {
$this->outStream->close(); $this->outStream->close();
$this->inStream->close(); $this->inStream->close();
} }
/**
* {@inheritDoc}
*/
public function end($data = null) public function end($data = null)
{ {
} }
// private function hexdump($data): void
// {
// printf("%4d .\n", strlen($data));
// $rows = str_split($data, 16);
// $offs = 0;
// foreach ($rows as $row) {
// $h = []; $a = [];
// for ($n = 0; $n < 16; $n++) {
// if ($n < strlen($row)) {
// $h[] = sprintf("%02x%s", ord($row[$n]), ($n==7)?" ":" ");
// $a[] = sprintf("%s%s", (ctype_print($row[$n])?$row[$n]:"."), ($n==7)?" ":"");
// } else {
// $h[] = (($n==7)?" ":" ");
// $a[] = (($n==7)?" ":" ");
// }
// }
// printf("%04x | %s | %s\n", 16 * $offs++, join("", $h), join("", $a));
// }
// }
} }

View File

@ -13,18 +13,18 @@ class WebSocketCodecTest extends \PHPUnit\Framework\TestCase
$codec = new WebSocketCodec(); $codec = new WebSocketCodec();
$msg = $codec->encode([ $msg = $codec->encode([
'opcode'=>WebSocketCodec::OP_PING, 'opcode'=>WebSocketConnection::OP_PING,
'payload'=>"ping" 'payload'=>"ping"
]); ]);
$this->assertEquals("\x89\x04ping", $msg); $this->assertEquals("\x89\x04ping", $msg);
$msg = $codec->encode([ $msg = $codec->encode([
'opcode'=>WebSocketCodec::OP_FRAME_TEXT, 'opcode'=>WebSocketConnection::OP_FRAME_TEXT,
'payload'=>"abcdefgh"]); 'payload'=>"abcdefgh"]);
$this->assertEquals("\x81\x08abcdefgh", $msg); $this->assertEquals("\x81\x08abcdefgh", $msg);
$msg = $codec->encode([ $msg = $codec->encode([
'opcode'=>WebSocketCodec::OP_FRAME_TEXT, 'opcode'=>WebSocketConnection::OP_FRAME_TEXT,
'payload'=>"abcdefgh", 'payload'=>"abcdefgh",
'masked'=>true, 'masked'=>true,
'mask'=>"\x00\x00\x00\x00" 'mask'=>"\x00\x00\x00\x00"
@ -32,7 +32,7 @@ class WebSocketCodecTest extends \PHPUnit\Framework\TestCase
$this->assertEquals("\x81\x88\x00\x00\x00\x00abcdefgh", $msg); $this->assertEquals("\x81\x88\x00\x00\x00\x00abcdefgh", $msg);
$msg = $codec->encode([ $msg = $codec->encode([
'opcode'=>WebSocketCodec::OP_FRAME_TEXT, 'opcode'=>WebSocketConnection::OP_FRAME_TEXT,
'payload'=>"abcdefgh", 'payload'=>"abcdefgh",
'masked'=>true, 'masked'=>true,
'mask'=>"\x00\xFF\x00\xFF" 'mask'=>"\x00\xFF\x00\xFF"
@ -47,7 +47,7 @@ class WebSocketCodecTest extends \PHPUnit\Framework\TestCase
$msg = $codec->decode("\x89\x04ping"); $msg = $codec->decode("\x89\x04ping");
$this->assertEquals([ $this->assertEquals([
'opcode'=>WebSocketCodec::OP_PING, 'opcode'=>WebSocketConnection::OP_PING,
'payload'=>"ping", 'payload'=>"ping",
'final'=>true, 'final'=>true,
'rsv1'=>false, 'rsv1'=>false,
@ -59,7 +59,7 @@ class WebSocketCodecTest extends \PHPUnit\Framework\TestCase
$msg = $codec->decode("\x81\x08abcdefgh"); $msg = $codec->decode("\x81\x08abcdefgh");
$this->assertEquals([ $this->assertEquals([
'opcode'=>WebSocketCodec::OP_FRAME_TEXT, 'opcode'=>WebSocketConnection::OP_FRAME_TEXT,
'payload'=>"abcdefgh", 'payload'=>"abcdefgh",
'final'=>true, 'final'=>true,
'rsv1'=>false, 'rsv1'=>false,
@ -71,7 +71,7 @@ class WebSocketCodecTest extends \PHPUnit\Framework\TestCase
$msg = $codec->decode("\x81\x88\x00\x00\x00\x00abcdefgh"); $msg = $codec->decode("\x81\x88\x00\x00\x00\x00abcdefgh");
$this->assertEquals([ $this->assertEquals([
'opcode'=>WebSocketCodec::OP_FRAME_TEXT, 'opcode'=>WebSocketConnection::OP_FRAME_TEXT,
'payload'=>"abcdefgh", 'payload'=>"abcdefgh",
'final'=>true, 'final'=>true,
'rsv1'=>false, 'rsv1'=>false,
@ -84,7 +84,7 @@ class WebSocketCodecTest extends \PHPUnit\Framework\TestCase
$msg = $codec->decode("\x81\x88\x00\xFF\x00\xFFa\x9dc\x9be\x99g\x97"); $msg = $codec->decode("\x81\x88\x00\xFF\x00\xFFa\x9dc\x9be\x99g\x97");
$this->assertEquals([ $this->assertEquals([
'opcode'=>WebSocketCodec::OP_FRAME_TEXT, 'opcode'=>WebSocketConnection::OP_FRAME_TEXT,
'payload'=>"abcdefgh", 'payload'=>"abcdefgh",
'final'=>true, 'final'=>true,
'rsv1'=>false, 'rsv1'=>false,