Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
74960345ba | |||
39f2ea4e11 |
@ -71,6 +71,7 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
$this->isReconnecting = true;
|
$this->isReconnecting = true;
|
||||||
$this->connector->connect($this->address)->then(
|
$this->connector->connect($this->address)->then(
|
||||||
function (DuplexStreamInterface $connection) {
|
function (DuplexStreamInterface $connection) {
|
||||||
|
$this->isReconnecting = false;
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
$this->emit(self::EVENT_CONNECTED);
|
$this->emit(self::EVENT_CONNECTED);
|
||||||
$connection->on('error', function ($error) {
|
$connection->on('error', function ($error) {
|
||||||
@ -79,14 +80,12 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
if ($this->autoReconnect) {
|
if ($this->autoReconnect) {
|
||||||
Loop::addTimer(1, $this->reconnect(...));
|
Loop::addTimer(1, $this->reconnect(...));
|
||||||
}
|
}
|
||||||
$this->isReconnecting = false;
|
|
||||||
});
|
});
|
||||||
$connection->on('close', function () {
|
$connection->on('close', function () {
|
||||||
$this->emit(self::EVENT_DISCONNECTED);
|
$this->emit(self::EVENT_DISCONNECTED);
|
||||||
if ($this->autoReconnect) {
|
if ($this->autoReconnect) {
|
||||||
Loop::addTimer(1, $this->reconnect(...));
|
Loop::addTimer(1, $this->reconnect(...));
|
||||||
}
|
}
|
||||||
$this->isReconnecting = false;
|
|
||||||
});
|
});
|
||||||
$connection->on('data', function ($data) use ($connection) {
|
$connection->on('data', function ($data) use ($connection) {
|
||||||
try {
|
try {
|
||||||
@ -100,6 +99,7 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
});
|
});
|
||||||
},
|
},
|
||||||
function (Throwable $e) {
|
function (Throwable $e) {
|
||||||
|
$this->isReconnecting = false;
|
||||||
$this->emit(self::EVENT_ERROR, [ $e->getMessage(), $e ]);
|
$this->emit(self::EVENT_ERROR, [ $e->getMessage(), $e ]);
|
||||||
Loop::addTimer(5, $this->reconnect(...));
|
Loop::addTimer(5, $this->reconnect(...));
|
||||||
}
|
}
|
||||||
@ -127,7 +127,7 @@ class CommandBusClient implements CommandBusInterface
|
|||||||
$this->emit(self::EVENT_NOTIFY, [ $event, $data ]);
|
$this->emit(self::EVENT_NOTIFY, [ $event, $data ]);
|
||||||
break;
|
break;
|
||||||
case Message::MSGTYPE_ERROR: // error
|
case Message::MSGTYPE_ERROR: // error
|
||||||
var_dump($message);
|
//var_dump($message);
|
||||||
$uuid = $message->getUuid();
|
$uuid = $message->getUuid();
|
||||||
$error = $message->getData()['error'];
|
$error = $message->getData()['error'];
|
||||||
if ($uuid && array_key_exists($uuid, $this->pending)) {
|
if ($uuid && array_key_exists($uuid, $this->pending)) {
|
||||||
|
Reference in New Issue
Block a user