2 Commits

View File

@ -71,6 +71,7 @@ class CommandBusClient implements CommandBusInterface
$this->isReconnecting = true; $this->isReconnecting = true;
$this->connector->connect($this->address)->then( $this->connector->connect($this->address)->then(
function (DuplexStreamInterface $connection) { function (DuplexStreamInterface $connection) {
$this->isReconnecting = false;
$this->connection = $connection; $this->connection = $connection;
$this->emit(self::EVENT_CONNECTED); $this->emit(self::EVENT_CONNECTED);
$connection->on('error', function ($error) { $connection->on('error', function ($error) {
@ -79,14 +80,12 @@ class CommandBusClient implements CommandBusInterface
if ($this->autoReconnect) { if ($this->autoReconnect) {
Loop::addTimer(1, $this->reconnect(...)); Loop::addTimer(1, $this->reconnect(...));
} }
$this->isReconnecting = false;
}); });
$connection->on('close', function () { $connection->on('close', function () {
$this->emit(self::EVENT_DISCONNECTED); $this->emit(self::EVENT_DISCONNECTED);
if ($this->autoReconnect) { if ($this->autoReconnect) {
Loop::addTimer(1, $this->reconnect(...)); Loop::addTimer(1, $this->reconnect(...));
} }
$this->isReconnecting = false;
}); });
$connection->on('data', function ($data) use ($connection) { $connection->on('data', function ($data) use ($connection) {
try { try {
@ -100,6 +99,7 @@ class CommandBusClient implements CommandBusInterface
}); });
}, },
function (Throwable $e) { function (Throwable $e) {
$this->isReconnecting = false;
$this->emit(self::EVENT_ERROR, [ $e->getMessage(), $e ]); $this->emit(self::EVENT_ERROR, [ $e->getMessage(), $e ]);
Loop::addTimer(5, $this->reconnect(...)); Loop::addTimer(5, $this->reconnect(...));
} }
@ -127,7 +127,7 @@ class CommandBusClient implements CommandBusInterface
$this->emit(self::EVENT_NOTIFY, [ $event, $data ]); $this->emit(self::EVENT_NOTIFY, [ $event, $data ]);
break; break;
case Message::MSGTYPE_ERROR: // error case Message::MSGTYPE_ERROR: // error
var_dump($message); //var_dump($message);
$uuid = $message->getUuid(); $uuid = $message->getUuid();
$error = $message->getData()['error']; $error = $message->getData()['error'];
if ($uuid && array_key_exists($uuid, $this->pending)) { if ($uuid && array_key_exists($uuid, $this->pending)) {