diff --git a/src/CommandBus.php b/src/CommandBus.php index 8dc7f4f..cddfbb4 100644 --- a/src/CommandBus.php +++ b/src/CommandBus.php @@ -51,8 +51,10 @@ class CommandBus implements CommandBusInterface $this->connections->attach($client); $client->on('data', function ($data) use ($client) { try { - $message = Message::fromString($data); - $this->onClientMessage($client, $message); + $messages = Message::fromStringMulti($data); + foreach ($messages as $message) { + $this->onClientMessage($client, $message); + } } catch (MessageException $e) { $client->end('{"msg":"error","data":{"error":"Bad message format"}}'); } @@ -71,6 +73,9 @@ class CommandBus implements CommandBusInterface $this->executeContext($context)->then( function ($result) use ($message, $client) { $client->write($message->asResult($result)->toJson()."\n"); + }, + function (\Throwable $error) use ($message, $client) { + $client->write($message->asError($error->getMessage())->toJson()."\n"); } ); break; diff --git a/src/CommandBusClient.php b/src/CommandBusClient.php index 2e79f9f..262e993 100644 --- a/src/CommandBusClient.php +++ b/src/CommandBusClient.php @@ -74,7 +74,8 @@ class CommandBusClient implements CommandBusInterface $this->connection = $connection; $this->emit(self::EVENT_CONNECTED); $connection->on('error', function ($error) { - $this->emit(self::EVENT_DISCONNECTED, [ $error ]); + $this->emit(self::EVENT_ERROR, [ $error ]); + $this->emit(self::EVENT_DISCONNECTED); if ($this->autoReconnect) { Loop::addTimer(1, $this->reconnect(...)); } @@ -89,8 +90,10 @@ class CommandBusClient implements CommandBusInterface }); $connection->on('data', function ($data) use ($connection) { try { - $message = Message::fromString($data); - $this->onServerMessage($message); + $messages = Message::fromStringMulti($data); + foreach ($messages as $message) { + $this->onServerMessage($message); + } } catch (MessageException $e) { $connection->end('{"msg":"error","data":{"error":"Bad message format"}}'); } diff --git a/src/Message.php b/src/Message.php index ff599df..b02d5f4 100644 --- a/src/Message.php +++ b/src/Message.php @@ -66,6 +66,11 @@ class Message implements JsonSerializable return new Message($json['msg'], $json['data'], $json['uuid']??false); } + public static function fromStringMulti(string $data): array + { + return array_map(fn($v) => Message::fromString($v), array_filter(explode("\n", $data))); + } + public function asResult($result): Message { return new Message(self::MSGTYPE_RESULT, [ @@ -73,6 +78,13 @@ class Message implements JsonSerializable ], $this->uuid); } + public function asError($error): Message + { + return new Message(self::MSGTYPE_ERROR, [ + 'error' => $error + ], $this->uuid); + } + public function toJson(): string { return json_encode($this, JSON_UNESCAPED_SLASHES);