Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
1c9d8a4a05 | |||
054e052da7 | |||
3abdced846 | |||
ca84671f33 | |||
b0aede55b9 |
66
README.md
66
README.md
@ -14,6 +14,14 @@ Ratchet is great! I've used Ratchet in the past, and it is a fantastic piece of
|
||||
|
||||
TL;DR - If you need to build an application with neatly wrapped classes without caring to much about the internals, go with Ratchet. If you want to work with websockets in the same way you work with sockets in ReactPHP, go with this library.
|
||||
|
||||
## Missing Features
|
||||
|
||||
The following features are missing, or work in progress:
|
||||
|
||||
* Idle timeout and ping timeout
|
||||
* Protocol errors should close with error codes
|
||||
* Exceptions
|
||||
|
||||
## Server
|
||||
|
||||
The WebSocket handler is built as a HttpServer middleware. This makes sense as WebSocket as a protocol is running over HTTP. Connections are set up by the middleware and exposed via the `connect` event.
|
||||
@ -55,30 +63,86 @@ $http->listen($socket);
|
||||
|
||||
```
|
||||
|
||||
### Server Events
|
||||
### WebSocketMiddleware Events
|
||||
|
||||
#### connection
|
||||
|
||||
```php
|
||||
function (WebSocketInterface $member)
|
||||
```
|
||||
|
||||
This event is emitted when a new WebSocket request has been accepted. The `WebSocketConnection` is passed as the first argument.
|
||||
|
||||
### WebSocketConnection events
|
||||
|
||||
#### ping
|
||||
|
||||
```php
|
||||
function (string $payload)
|
||||
```
|
||||
|
||||
This event will be emitted upon receiving a frame with a ping opcode. The pong response has already been sent automatically, unless 'no_auto_pong' is set in the context.
|
||||
|
||||
#### pong
|
||||
|
||||
```php
|
||||
function (string $payload)
|
||||
```
|
||||
|
||||
This event will be emitted upon receiving a frame with a pong opcode.
|
||||
|
||||
#### text
|
||||
|
||||
```php
|
||||
function (string $payload)
|
||||
```
|
||||
|
||||
This event will be emitted when a text data frame have been received and decoded.
|
||||
|
||||
#### binary
|
||||
|
||||
```php
|
||||
function (string $payload)
|
||||
```
|
||||
|
||||
This event will be emitted when a binary data frame have been received and decoded.
|
||||
|
||||
#### close
|
||||
|
||||
```php
|
||||
function ()
|
||||
```
|
||||
|
||||
#### error
|
||||
|
||||
```php
|
||||
function (?string $reason, ?int $code)
|
||||
```
|
||||
|
||||
### GroupManager events
|
||||
|
||||
#### create
|
||||
|
||||
```php
|
||||
function (ConnectionGroup $group)
|
||||
```
|
||||
|
||||
#### destroy
|
||||
|
||||
```php
|
||||
function (ConnectionGroup $group)
|
||||
```
|
||||
|
||||
### ConnectionGroup events
|
||||
|
||||
#### join
|
||||
|
||||
```php
|
||||
function (WebSocketInterface $member)
|
||||
```
|
||||
|
||||
#### leave
|
||||
|
||||
```php
|
||||
function (WebSocketInterface $member)
|
||||
```
|
||||
|
@ -36,6 +36,14 @@ $groupManager->on('created', function (ConnectionGroup $group) {
|
||||
$middleware = new WebSocketMiddleware($groupManager);
|
||||
```
|
||||
|
||||
## Sending messages
|
||||
|
||||
You can use the `ConnectionGoup::writeAll(string $payload)` method to send the payload to all members of the group.
|
||||
|
||||
## Disconnecting clients
|
||||
|
||||
You can disconnect clients cleanly on shutdown by using the `GroupManager::closeAll(string $reason, int $code)` method. You can also call on `ConnectionGrroup::closeAll` manually do disconnect a whole group.
|
||||
|
||||
## Future
|
||||
|
||||
* Add a GroupManagerImplementation so custom logic can be provided.
|
||||
|
@ -61,8 +61,20 @@ class ConnectionGroup implements EventEmitterInterface, IteratorAggregate, Count
|
||||
return $this->name;
|
||||
}
|
||||
|
||||
public function write(string $payload)
|
||||
public function writeAll(string $payload)
|
||||
{
|
||||
foreach ($this->connections as $connection) {
|
||||
if ($connection->isWritable()) {
|
||||
$connection->write($payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function closeAll(string $reason, int $code=1001)
|
||||
{
|
||||
foreach ($this->connections as $connection) {
|
||||
$connection->closeWithReason($reason, $code);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -13,11 +13,11 @@ class GroupManager implements EventEmitterInterface
|
||||
/**
|
||||
* @var string emitted when a new group is created
|
||||
*/
|
||||
const EVENT_CREATED = 'created';
|
||||
const EVENT_CREATE = 'create';
|
||||
/**
|
||||
* @var string emitted after the last member leaves, when the group is destroyed
|
||||
*/
|
||||
const EVENT_DESTROYED = 'destroyed';
|
||||
const EVENT_DESTROY = 'destroy';
|
||||
|
||||
/** @var array<string,ConnectionGroup> */
|
||||
private array $groups = [];
|
||||
@ -31,18 +31,24 @@ class GroupManager implements EventEmitterInterface
|
||||
$group->on(ConnectionGroup::EVENT_LEAVE, function () use ($group) {
|
||||
Loop::futureTick(function () use ($group) {
|
||||
if (count($group) === 0) {
|
||||
$this->emit(self::EVENT_DESTROYED, [ $group ]);
|
||||
$this->emit(self::EVENT_DESTROY, [ $group ]);
|
||||
$group->removeAllListeners();
|
||||
unset($this->groups[$group->getName()]);
|
||||
}
|
||||
});
|
||||
});
|
||||
$this->emit(self::EVENT_CREATED, [ $group ]);
|
||||
$this->emit(self::EVENT_CREATE, [ $group ]);
|
||||
} else {
|
||||
$group = $this->groups[$name];
|
||||
}
|
||||
return $group;
|
||||
}
|
||||
|
||||
public function closeAll(string $reason, int $code=1001)
|
||||
{
|
||||
foreach ($this->groups as $group) {
|
||||
$group->closeAll($reason, $code);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -28,8 +28,8 @@ class WebSocketConnection implements WebSocketInterface
|
||||
/** @var string|null The name of the group that this connection has joined, or null */
|
||||
private ?string $groupName = null;
|
||||
|
||||
/** @var WebSocketCodec The frame encoder/decoder */
|
||||
private WebSocketCodec $codec;
|
||||
/** @var WebSocketProtocol The frame encoder/decoder */
|
||||
private WebSocketProtocol $codec;
|
||||
|
||||
private ?ConnectionGroup $group = null;
|
||||
|
||||
@ -41,14 +41,16 @@ class WebSocketConnection implements WebSocketInterface
|
||||
|
||||
private ServerRequestInterface $request;
|
||||
|
||||
/** @var string|null Buffer for fragmented messages */
|
||||
private ?string $buffer = null;
|
||||
|
||||
/** @var int|null The opcode of a fragmented message */
|
||||
private ?int $bufferedOp = null;
|
||||
|
||||
public function __construct(ServerRequestInterface $request, ReadableStreamInterface $inStream, WritableStreamInterface $outStream, GroupManager $groupManager)
|
||||
{
|
||||
// The codec is used to encode and decode frames
|
||||
$this->codec = new WebSocketCodec();
|
||||
$this->codec = new WebSocketProtocol();
|
||||
|
||||
$this->request = $request;
|
||||
$this->inStream = $inStream;
|
||||
@ -250,8 +252,17 @@ class WebSocketConnection implements WebSocketInterface
|
||||
*/
|
||||
public function close()
|
||||
{
|
||||
// TODO send close
|
||||
$this->outStream->close();
|
||||
$this->inStream->close();
|
||||
// TODO emit close event
|
||||
}
|
||||
|
||||
public function closeWithReason(string $reason, int $code=1000)
|
||||
{
|
||||
// TODO send close
|
||||
$payload = chr(($code >> 8) & 0xFF) . chr($code & 0xFF) . $reason;
|
||||
$this->send(self::OP_CLOSE, $payload);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -23,4 +23,6 @@ interface WebSocketInterface extends ConnectionInterface
|
||||
|
||||
public function getGroup(): ?ConnectionGroup;
|
||||
|
||||
public function closeWithReason(string $reason, int $code=1000);
|
||||
|
||||
}
|
@ -14,7 +14,7 @@ use React\Stream\ReadableStreamInterface;
|
||||
use React\Stream\ThroughStream;
|
||||
use React\Stream\WritableStreamInterface;
|
||||
|
||||
class WebSocketCodec
|
||||
class WebSocketProtocol
|
||||
{
|
||||
|
||||
/**
|
52
tests/Group/ConnectionGroupTest.php
Normal file
52
tests/Group/ConnectionGroupTest.php
Normal file
@ -0,0 +1,52 @@
|
||||
<?php
|
||||
|
||||
namespace NoccyLabs\React\WebSocket\Group;
|
||||
|
||||
use NoccyLabs\React\WebSocket\WebSocketConnection;
|
||||
use PHPUnit\Framework\Attributes\CoversClass;
|
||||
use React\Http\Message\ServerRequest;
|
||||
use React\Stream\ThroughStream;
|
||||
|
||||
#[CoversClass(ConnectionGroup::class)]
|
||||
class ConnectionGroupTest extends \PHPUnit\Framework\TestCase
|
||||
{
|
||||
|
||||
public function testThatGroupsAreCountable()
|
||||
{
|
||||
$groupManager = new GroupManager();
|
||||
|
||||
$group = new ConnectionGroup();
|
||||
$this->assertEquals(0, count($group));
|
||||
$group->add(new WebSocketConnection(new ServerRequest('GET','/'),new ThroughStream(), new ThroughStream(), $groupManager));
|
||||
$this->assertEquals(1, count($group));
|
||||
$group->add(new WebSocketConnection(new ServerRequest('GET','/'),new ThroughStream(), new ThroughStream(), $groupManager));
|
||||
$this->assertEquals(2, count($group));
|
||||
$group->add(new WebSocketConnection(new ServerRequest('GET','/'),new ThroughStream(), new ThroughStream(), $groupManager));
|
||||
$this->assertEquals(3, count($group));
|
||||
}
|
||||
|
||||
public function testThatGroupNamesAreAssignedUniquely()
|
||||
{
|
||||
$group = new ConnectionGroup();
|
||||
$this->assertNotEmpty($group->getName());
|
||||
|
||||
$group2 = new ConnectionGroup();
|
||||
$this->assertNotEmpty($group2->getName());
|
||||
|
||||
$this->assertNotEquals($group->getName(), $group2->getName());
|
||||
}
|
||||
|
||||
public function testWritingToAll()
|
||||
{
|
||||
$groupManager = new GroupManager();
|
||||
|
||||
$r = [];
|
||||
$group = new ConnectionGroup();
|
||||
$group->add(new WebSocketConnection(new ServerRequest('GET','/'),new ThroughStream(), new ThroughStream(function ($data) use (&$r) { $r[]=$data; }), $groupManager));
|
||||
$group->add(new WebSocketConnection(new ServerRequest('GET','/'),new ThroughStream(), new ThroughStream(function ($data) use (&$r) { $r[]=$data; }), $groupManager));
|
||||
$group->add(new WebSocketConnection(new ServerRequest('GET','/'),new ThroughStream(), new ThroughStream(function ($data) use (&$r) { $r[]=$data; }), $groupManager));
|
||||
$group->writeAll("test");
|
||||
$this->assertEquals(3, count($r));
|
||||
}
|
||||
|
||||
}
|
@ -4,13 +4,13 @@ namespace NoccyLabs\React\WebSocket;
|
||||
|
||||
use PHPUnit\Framework\Attributes\CoversClass;
|
||||
|
||||
#[CoversClass(WebSocketCodec::class)]
|
||||
class WebSocketCodecTest extends \PHPUnit\Framework\TestCase
|
||||
#[CoversClass(WebSocketProtocol::class)]
|
||||
class WebSocketProtocolTest extends \PHPUnit\Framework\TestCase
|
||||
{
|
||||
|
||||
public function testEncodingFrames()
|
||||
{
|
||||
$codec = new WebSocketCodec();
|
||||
$codec = new WebSocketProtocol();
|
||||
|
||||
$msg = $codec->encode([
|
||||
'opcode'=>WebSocketConnection::OP_PING,
|
||||
@ -43,7 +43,7 @@ class WebSocketCodecTest extends \PHPUnit\Framework\TestCase
|
||||
|
||||
public function testDecodingFrames()
|
||||
{
|
||||
$codec = new WebSocketCodec();
|
||||
$codec = new WebSocketProtocol();
|
||||
|
||||
$msg = $codec->decode("\x89\x04ping");
|
||||
$this->assertEquals([
|
Reference in New Issue
Block a user