codec = new WebSocketProtocol(); $this->request = $request; $this->inStream = $inStream; $this->outStream = $outStream; $this->groupManager = $groupManager; $this->inStream->on('data', $this->onWebSocketData(...)); $this->inStream->on('close', $this->close(...)); } private function onWebSocketData($data) { $decoded = $this->codec->decode($data); $opcode = $decoded['opcode']; $final = $decoded['final']; $payload = $decoded['payload']; if (!$final) { if ($this->bufferedOp === null) { $this->buffer = $payload; $this->bufferedOp = $opcode; } else { $this->buffer .= $payload; } // Break out to avoid processing partial messages return; } if ($this->bufferedOp !== null) { $payload = $this->buffer . $payload; $this->buffer = null; $opcode = $this->bufferedOp; $this->bufferedOp = null; } switch ($opcode) { case self::OP_PING: $this->sendPong($payload); return; case self::OP_PONG: $this->checkPong($payload); return; case self::OP_CLOSE: // TODO implement return; case self::OP_CONTINUATION: $this->buffer .= $payload; return; case self::OP_FRAME_TEXT: $this->emit('text', [ $payload ]); return; case self::OP_FRAME_BINARY: $this->emit('binary', [ $payload ]); return; } } /** * Sends a ping, and closes the connection on timeout. * */ public function ping(): void { // TODO save the state somehow $payload = "ping"; $this->send(self::OP_PING, $payload, true); } private function sendPong(string $data): void { $this->send(self::OP_PONG, $data, true); } private function checkPong(string $data): void { // TODO reset the state and any ping timers } public function setGroup(?string $name): void { if ($this->group) { $this->group->remove($this); $this->group = null; } $group = $this->groupManager->getConnectionGroup($name); $group->add($this); $this->group = $group; $this->groupName = $name; } public function getGroupName(): ?string { return $this->groupName; } /** * @return WebSocketInterface[] */ public function getGroupMembers(): array { return $this->group ? $this->group->getAll() : []; } public function getGroup(): ?ConnectionGroup { return $this->group; } public function getServerRequest(): ServerRequestInterface { return $this->request; } public function getRemoteAddress() { return $this->request->getServerParams()['REMOTE_ADDR']; } public function getLocalAddress() { return $this->request->getServerParams()['SERVER_ADDR']; } /** * {@inheritDoc} */ public function isReadable() { return $this->inStream->isReadable(); } /** * {@inheritDoc} */ public function pause() { $this->inStream->pause(); } /** * {@inheritDoc} */ public function resume() { $this->inStream->resume(); } /** * {@inheritDoc} */ public function isWritable() { return $this->outStream->isWritable(); } /** * {@inheritDoc} */ public function pipe(WritableStreamInterface $dest, array $options = array()) { // TODO implement return $dest; } /** * {@inheritDoc} * * @see writeBinary() to write binary frames as opposed to text frames. */ public function write($data) { return $this->send(self::OP_FRAME_TEXT, $data); } /** * Write binary frames. * * @param string $data * @return bool */ public function writeBinary($data) { return $this->send(self::OP_FRAME_BINARY, $data); } /** * Encode and send a frame. * * @param int $opcode * @param string $data * @param bool $final * @param bool $masked * @return bool */ public function send(int $opcode, string $data, bool $final = true, bool $masked = false) { $frame = $this->codec->encode([ 'opcode' => $opcode, 'payload' => $data, 'final' => $final, 'masked' => $masked ]); return $this->outStream->write($frame); } /** * {@inheritDoc} */ public function close() { $this->outStream->close(); $this->inStream->close(); $this->emit('close', []); } /** * {@inheritDoc} */ public function closeWithReason(string $reason, int $code=1000): void { $payload = chr(($code >> 8) & 0xFF) . chr($code & 0xFF) . $reason; $this->send(self::OP_CLOSE, $payload); } /** * {@inheritDoc} */ public function end($data = null) { // TODO implement me } }