Skip to content

Commit 6a80fea

Browse files
valgabinsoul
authored andcommitted
Cleanup timers on close and reject pending flows on connect
1 parent 091bee2 commit 6a80fea

File tree

1 file changed

+25
-0
lines changed

1 file changed

+25
-0
lines changed

src/ReactMqttClient.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ public function connect($host, $port = 1883, Connection $connection = null, $tim
180180
$connection = new DefaultConnection();
181181
}
182182

183+
if ($connection->isCleanSession()) {
184+
$this->cleanPreviousSession();
185+
}
186+
183187
if ($connection->getClientID() === '') {
184188
$connection = $connection->withClientID($this->identifierGenerator->generateClientID());
185189
}
@@ -561,6 +565,8 @@ private function handleClose()
561565
$this->loop->cancelTimer($timer);
562566
}
563567

568+
$this->timer = [];
569+
564570
$connection = $this->connection;
565571

566572
$this->isConnecting = false;
@@ -673,4 +679,23 @@ private function finishFlow(ReactFlow $flow)
673679
$flow->getDeferred()->reject($result);
674680
}
675681
}
682+
683+
/**
684+
* Cleans previous session by rejecting all pending flows.
685+
*/
686+
private function cleanPreviousSession()
687+
{
688+
$error = new \RuntimeException('Connection has been closed.');
689+
690+
foreach ($this->receivingFlows as $receivingFlow) {
691+
$receivingFlow->getDeferred()->reject($error);
692+
}
693+
694+
foreach ($this->sendingFlows as $sendingFlow) {
695+
$sendingFlow->getDeferred()->reject($error);
696+
}
697+
698+
$this->receivingFlows = [];
699+
$this->sendingFlows = [];
700+
}
676701
}

0 commit comments

Comments
 (0)