Skip to content

Commit 1a88d47

Browse files
committed
Rewrite client to use packet flows and interfaces instead of simple parameters
1 parent 1643830 commit 1a88d47

File tree

5 files changed

+784
-639
lines changed

5 files changed

+784
-639
lines changed

README.md

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,80 +22,103 @@ Connect to a public broker and run forever.
2222
<?php
2323

2424
use BinSoul\Net\Mqtt\Client\React\ReactMqttClient;
25-
use React\SocketClient\Connector;
25+
use BinSoul\Net\Mqtt\Connection;
26+
use BinSoul\Net\Mqtt\DefaultMessage;
27+
use BinSoul\Net\Mqtt\DefaultSubscription;
28+
use BinSoul\Net\Mqtt\Message;
29+
use BinSoul\Net\Mqtt\Subscription;
30+
use React\SocketClient\DnsConnector;
31+
use React\SocketClient\TcpConnector;
2632

2733
include 'vendor/autoload.php';
2834

2935
// Setup client
3036
$loop = React\EventLoop\Factory::create();
3137
$dnsResolverFactory = new React\Dns\Resolver\Factory();
32-
$connector = new Connector($loop, $dnsResolverFactory->createCached('8.8.8.8', $loop));
38+
$connector = new DnsConnector(new TcpConnector($loop), $dnsResolverFactory->createCached('8.8.8.8', $loop));
3339
$client = new ReactMqttClient($connector, $loop);
3440

3541
// Bind to events
36-
$client->on('connect', function () {
37-
echo "Connected.\n";
42+
$client->on('open', function () use ($client) {
43+
// Network connection established
44+
echo sprintf("Open: %s:%s\n", $client->getHost(), $client->getPort());
3845
});
3946

40-
$client->on('disconnect', function () {
41-
echo "Disconnected.\n";
47+
$client->on('close', function () use ($client, $loop) {
48+
// Network connection closed
49+
echo sprintf("Close: %s:%s\n", $client->getHost(), $client->getPort());
50+
51+
$loop->stop();
52+
});
53+
54+
$client->on('connect', function (Connection $connection) {
55+
// Broker connected
56+
echo sprintf("Connect: client=%s\n", $connection->getClientID());
4257
});
4358

44-
$client->on('message', function ($topic, $message, $isDuplicate, $isRetained) {
45-
echo 'Incoming: '.$topic.' => '.mb_strimwidth($message, 0, 50, '...');
46-
47-
if ($isDuplicate) {
59+
$client->on('disconnect', function (Connection $connection) {
60+
// Broker disconnected
61+
echo sprintf("Disconnect: client=%s\n", $connection->getClientID());
62+
});
63+
64+
$client->on('message', function (Message $message) {
65+
// Incoming message
66+
echo 'Message';
67+
68+
if ($message->isDuplicate()) {
4869
echo ' (duplicate)';
4970
}
5071

51-
if ($isRetained) {
72+
if ($message->isRetained()) {
5273
echo ' (retained)';
5374
}
5475

76+
echo ': '.$message->getTopic().' => '.mb_strimwidth($message->getPayload(), 0, 50, '...');
5577
echo "\n";
5678
});
5779

5880
$client->on('warning', function (\Exception $e) {
59-
echo $e->getMessage();
81+
echo sprintf("Warning: %s\n", $e->getMessage());
6082
});
6183

62-
$client->on('error', function (\Exception $e) {
63-
echo $e->getMessage();
64-
die();
84+
$client->on('error', function (\Exception $e) use ($loop) {
85+
echo sprintf("Error: %s\n", $e->getMessage());
86+
87+
$loop->stop();
6588
});
6689

6790
// Connect to broker
6891
$client->connect('test.mosquitto.org')->then(
69-
function (ReactMqttClient $client) {
70-
// Subscribe to all topics below "sensors"
71-
$client->subscribe('sensors/#')
72-
->then(function ($topic) {
73-
echo sprintf("Subscribed to topic '%s'.\n", $topic);
92+
function () use ($client) {
93+
// Subscribe to all topics
94+
$client->subscribe(new DefaultSubscription('#'))
95+
->then(function (Subscription $subscription) {
96+
echo sprintf("Subscribe: %s\n", $subscription->getFilter());
7497
})
7598
->otherwise(function (\Exception $e) {
76-
echo $e->getMessage();
99+
echo sprintf("Error: %s\n", $e->getMessage());
77100
});
78101

79102
// Publish humidity once
80-
$client->publish('sensors/humidity', '55 %', 1)
81-
->then(function ($value) {
82-
echo sprintf("Published message '%s'.\n", $value);
103+
$client->publish(new DefaultMessage('sensors/humidity', '55%'))
104+
->then(function (Message $message) {
105+
echo sprintf("Publish: %s => %s\n", $message->getTopic(), $message->getPayload());
83106
})
84107
->otherwise(function (\Exception $e) {
85-
echo $e->getMessage();
108+
echo sprintf("Error: %s\n", $e->getMessage());
86109
});
87110

88111
// Publish a random temperature every 10 seconds
89112
$generator = function () {
90113
return mt_rand(-20, 30);
91114
};
92115

93-
$client->publishPeriodically(10, 'sensors/temperature', $generator, 1)
94-
->progress(function ($value) {
95-
echo sprintf("Published message '%s'.\n", $value);
116+
$client->publishPeriodically(10, new DefaultMessage('sensors/temperature'), $generator)
117+
->progress(function (Message $message) {
118+
echo sprintf("Publish: %s => %s\n", $message->getTopic(), $message->getPayload());
96119
})
97120
->otherwise(function (\Exception $e) {
98-
echo $e->getMessage();
121+
echo sprintf("Error: %s\n", $e->getMessage());
99122
});
100123
}
101124
);

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
],
1919
"require": {
2020
"php": "~5.6|~7.0",
21-
"binsoul/net-mqtt": "^0.1",
21+
"binsoul/net-mqtt": "~0.2",
2222
"react/promise": "~2.0",
2323
"react/socket-client": "~0.5"
2424
},

src/ReactFlow.php

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
<?php
2+
3+
namespace BinSoul\Net\Mqtt\Client\React;
4+
5+
use BinSoul\Net\Mqtt\Flow;
6+
use BinSoul\Net\Mqtt\Packet;
7+
use React\Promise\Deferred;
8+
9+
/**
10+
* Decorates flows with data required for the {@see ReactMqttClient} class.
11+
*/
12+
class ReactFlow implements Flow
13+
{
14+
/** @var Flow */
15+
private $decorated;
16+
/** @var Deferred */
17+
private $deferred;
18+
/** @var Packet */
19+
private $packet;
20+
21+
/**
22+
* Constructs an instance of this class.
23+
*
24+
* @param Flow $decorated
25+
* @param Deferred $deferred
26+
* @param Packet $packet
27+
*/
28+
public function __construct(Flow $decorated, Deferred $deferred, Packet $packet = null)
29+
{
30+
$this->decorated = $decorated;
31+
$this->deferred = $deferred;
32+
$this->packet = $packet;
33+
}
34+
35+
public function getCode()
36+
{
37+
return $this->decorated->getCode();
38+
}
39+
40+
public function start()
41+
{
42+
$this->packet = $this->decorated->start();
43+
44+
return $this->packet;
45+
}
46+
47+
public function accept(Packet $packet)
48+
{
49+
return $this->decorated->accept($packet);
50+
}
51+
52+
public function next(Packet $packet)
53+
{
54+
$this->packet = $this->decorated->next($packet);
55+
56+
return $this->packet;
57+
}
58+
59+
public function isFinished()
60+
{
61+
return $this->decorated->isFinished();
62+
}
63+
64+
public function isSuccess()
65+
{
66+
return $this->decorated->isSuccess();
67+
}
68+
69+
public function getResult()
70+
{
71+
return $this->decorated->getResult();
72+
}
73+
74+
public function getErrorMessage()
75+
{
76+
return $this->decorated->getErrorMessage();
77+
}
78+
79+
/**
80+
* Returns the associated deferred.
81+
*
82+
* @return Deferred
83+
*/
84+
public function getDeferred()
85+
{
86+
return $this->deferred;
87+
}
88+
89+
/**
90+
* Returns the current packet.
91+
*
92+
* @return Packet
93+
*/
94+
public function getPacket()
95+
{
96+
return $this->packet;
97+
}
98+
}

0 commit comments

Comments
 (0)