26
26
use React \EventLoop \LoopInterface ;
27
27
use React \Promise \ExtendedPromiseInterface ;
28
28
use React \Promise \RejectedPromise ;
29
- use React \SocketClient \ConnectorInterface ;
30
- use React \Stream \Stream ;
29
+ use React \Socket \ConnectorInterface ;
30
+ use React \Stream \DuplexStreamInterface ;
31
31
32
32
/**
33
33
* Connects to a MQTT broker and subscribes to topics or publishes messages.
@@ -52,7 +52,7 @@ class ReactMqttClient extends EventEmitter
52
52
private $ connector ;
53
53
/** @var LoopInterface */
54
54
private $ loop ;
55
- /** @var Stream */
55
+ /** @var DuplexStreamInterface */
56
56
private $ stream ;
57
57
/** @var StreamParser */
58
58
private $ parser ;
@@ -147,7 +147,7 @@ public function isConnected()
147
147
/**
148
148
* Returns the underlying stream or null if the client is not connected.
149
149
*
150
- * @return Stream |null
150
+ * @return DuplexStreamInterface |null
151
151
*/
152
152
public function getStream ()
153
153
{
@@ -187,7 +187,7 @@ public function connect($host, $port = 1883, Connection $connection = null, $tim
187
187
$ deferred = new Deferred ();
188
188
189
189
$ this ->establishConnection ($ this ->host , $ this ->port , $ timeout )
190
- ->then (function (Stream $ stream ) use ($ connection , $ deferred , $ timeout ) {
190
+ ->then (function (DuplexStreamInterface $ stream ) use ($ connection , $ deferred , $ timeout ) {
191
191
$ this ->stream = $ stream ;
192
192
193
193
$ this ->emit ('open ' , [$ connection , $ this ]);
@@ -377,19 +377,15 @@ function () use ($deferred, $timeout) {
377
377
}
378
378
);
379
379
380
- $ this ->connector ->create ($ host, $ port )
380
+ $ this ->connector ->connect ($ host. ' : ' . $ port )
381
381
->always (function () use ($ timer ) {
382
382
$ this ->loop ->cancelTimer ($ timer );
383
383
})
384
- ->then (function (Stream $ stream ) use ($ deferred, $ timeout ) {
384
+ ->then (function (DuplexStreamInterface $ stream ) use ($ deferred ) {
385
385
$ stream ->on ('data ' , function ($ data ) {
386
386
$ this ->handleReceive ($ data );
387
387
});
388
388
389
- $ stream ->getBuffer ()->on ('full-drain ' , function () {
390
- $ this ->handleSend ();
391
- });
392
-
393
389
$ stream ->on ('close ' , function () {
394
390
$ this ->handleClose ();
395
391
});
@@ -467,6 +463,8 @@ private function handleReceive($data)
467
463
if ($ flowCount > count ($ this ->receivingFlows )) {
468
464
$ this ->receivingFlows = array_values ($ this ->receivingFlows );
469
465
}
466
+
467
+ $ this ->handleSend ();
470
468
}
471
469
472
470
/**
@@ -603,11 +601,12 @@ private function startFlow(Flow $flow, $isSilent = false)
603
601
$ internalFlow = new ReactFlow ($ flow , $ deferred , $ packet , $ isSilent );
604
602
605
603
if ($ packet !== null ) {
606
- if ($ this ->stream -> getBuffer ()-> listening ) {
604
+ if ($ this ->writtenFlow !== null ) {
607
605
$ this ->sendingFlows [] = $ internalFlow ;
608
606
} else {
609
607
$ this ->stream ->write ($ packet );
610
608
$ this ->writtenFlow = $ internalFlow ;
609
+ $ this ->handleSend ();
611
610
}
612
611
} else {
613
612
$ this ->loop ->nextTick (function () use ($ internalFlow ) {
@@ -635,11 +634,12 @@ private function continueFlow(ReactFlow $flow, Packet $packet)
635
634
}
636
635
637
636
if ($ response !== null ) {
638
- if ($ this ->stream -> getBuffer ()-> listening ) {
637
+ if ($ this ->writtenFlow !== null ) {
639
638
$ this ->sendingFlows [] = $ flow ;
640
639
} else {
641
640
$ this ->stream ->write ($ response );
642
641
$ this ->writtenFlow = $ flow ;
642
+ $ this ->handleSend ();
643
643
}
644
644
} elseif ($ flow ->isFinished ()) {
645
645
$ this ->loop ->nextTick (function () use ($ flow ) {
0 commit comments