codec = new WebSocketCodec();
        $this->request = $request;
        $this->inStream = $inStream;
        $this->outStream = $outStream;
        $this->groupManager = $groupManager;

        $this->inStream->on('data', $this->onWebSocketData(...));
    }

    /**
     * Handle one chunk of data from the input stream.
     *
     * The chunk is decoded by the codec into opcode / final flag / payload and
     * then processed as follows:
     *
     *  - Control frames (ping, pong, close) are handled immediately and never
     *    touch the fragment buffer, so they can arrive between the fragments
     *    of a data message without corrupting it.
     *  - Non-final data frames are only buffered; nothing is emitted until the
     *    final frame of the message arrives.
     *  - A final frame is joined with any buffered fragments, takes the opcode
     *    of the first fragment, and is emitted as a 'text' or 'binary' event.
     *
     * @param mixed $data Raw data as delivered by the input stream's 'data' event.
     */
    private function onWebSocketData($data): void
    {
        $decoded = $this->codec->decode($data);

        $opcode = $decoded['opcode'];
        $final = $decoded['final'];
        $payload = $decoded['payload'];

        // Control frames first: they must not be merged into, or reset, a
        // partially received data message.
        switch ($opcode) {
            case self::OP_PING:
                $this->sendPong($payload);
                return;
            case self::OP_PONG:
                return;
            case self::OP_CLOSE:
                // TODO implement
                return;
        }

        // A continuation frame with no message in progress has nothing to
        // continue; drop it instead of leaving stale data in the buffer.
        if ($opcode == self::OP_CONTINUATION && $this->bufferedOp === null) {
            return;
        }

        if (!$final) {
            if ($this->bufferedOp === null) {
                // First fragment: remember which kind of message is being assembled.
                $this->buffer = $payload;
                $this->bufferedOp = $opcode;
            } else {
                $this->buffer .= $payload;
            }

            // Wait for the final fragment before emitting anything.
            return;
        }

        if ($this->bufferedOp !== null) {
            // Final fragment: complete the message and restore its real opcode.
            $payload = $this->buffer . $payload;
            $opcode = $this->bufferedOp;
            $this->buffer = null;
            $this->bufferedOp = null;
        }

        switch ($opcode) {
            case self::OP_FRAME_TEXT:
                $this->emit('text', [$payload]);
                return;
            case self::OP_FRAME_BINARY:
                $this->emit('binary', [$payload]);
                return;
        }
    }

    /**
     * Send a pong frame carrying the given payload.
     *
     * @param string $data Payload to send back (the payload of the received ping).
     */
    private function sendPong(string $data): void
    {
        $this->send(self::OP_PONG, $data, true);
    }

    /**
     * Move this connection into the group with the given name.
     *
     * The connection is first removed from its current group, if any, and is
     * then added to the group returned by the group manager.
     *
     * NOTE(review): a null $name is passed to the group manager unchanged, so
     * null does not mean "leave all groups" here — confirm that is intended.
     *
     * @param string|null $name Name of the group to join.
     */
    public function setGroup(?string $name): void
    {
        if ($this->group !== null) {
            $this->group->remove($this);
            $this->group = null;
        }

        $group = $this->groupManager->getConnectionGroup($name);
        $group->add($this);

        $this->group = $group;
        $this->groupName = $name;
    }

    /**
     * Get the group name last set through setGroup().
     */
    public function getGroupName(): ?string
    {
        return $this->groupName;
    }

    /**
     * Get everything the current group returns from getAll(), or an empty
     * array when the connection is not in a group.
     *
     * @return WebSocketInterface[]
     */
    public function getGroupMembers(): array
    {
        return $this->group?->getAll() ?? [];
    }

    /**
     * Get the group this connection currently belongs to, if any.
     */
    public function getGroup(): ?ConnectionGroup
    {
        return $this->group;
    }

    /**
     * Get the REMOTE_ADDR server parameter of the originating request.
     *
     * @return mixed The parameter value, or null when it is not set.
     */
    public function getRemoteAddress()
    {
        return $this->request->getServerParams()['REMOTE_ADDR'] ?? null;
    }

    /**
     * Get the SERVER_ADDR server parameter of the originating request.
     *
     * @return mixed The parameter value, or null when it is not set.
     */
    public function getLocalAddress()
    {
        return $this->request->getServerParams()['SERVER_ADDR'] ?? null;
    }

    /**
     * Whether the underlying input stream is readable.
     */
    public function isReadable()
    {
        return $this->inStream->isReadable();
    }

    /**
     * Pause the underlying input stream.
     */
    public function pause()
    {
        return $this->inStream->pause();
    }

    /**
     * Resume the underlying input stream.
     */
    public function resume()
    {
        return $this->inStream->resume();
    }

    /**
     * Whether the underlying output stream is writable.
     */
    public function isWritable()
    {
        return $this->outStream->isWritable();
    }

    /**
     * Not implemented yet: no data is forwarded to $dest.
     *
     * @param WritableStreamInterface $dest    Destination stream.
     * @param array                   $options Currently ignored.
     *
     * @return WritableStreamInterface The destination, unchanged.
     */
    public function pipe(WritableStreamInterface $dest, array $options = [])
    {
        // TODO implement
        return $dest;
    }

    /**
     * Send the data as a single, final text frame.
     *
     * @param string $data Payload of the text frame.
     *
     * @return bool Always true (see send()).
     */
    public function write($data)
    {
        return $this->send(self::OP_FRAME_TEXT, $data);
    }

    /**
     * Encode a frame with the codec and write it to the output stream.
     *
     * @param int    $opcode One of the OP_* constants.
     * @param string $data   Frame payload.
     * @param bool   $final  Value of the frame's 'final' field.
     * @param bool   $masked Value of the frame's 'masked' field.
     *
     * @return bool Always true; the result of the underlying write is not inspected.
     */
    public function send(int $opcode, string $data, bool $final = true, bool $masked = false)
    {
        $frame = $this->codec->encode([
            'opcode' => $opcode,
            'payload' => $data,
            'final' => $final,
            'masked' => $masked,
        ]);

        $this->outStream->write($frame);

        return true;
    }

    /**
     * Close both underlying streams, output first.
     */
    public function close()
    {
        $this->outStream->close();
        $this->inStream->close();
    }

    /**
     * Currently a no-op: $data is not written and nothing is closed.
     *
     * NOTE(review): looks like an unimplemented stub — confirm whether this
     * should send $data and then close the connection.
     *
     * @param mixed $data Ignored.
     */
    public function end($data = null)
    {
    }
}