Skip to content

Commit 01d4983

Browse files
authored
Merge pull request #35 from php-service-bus/connection_monitor
Connection monitor
2 parents d44df87 + 78aa1f9 commit 01d4983

File tree

8 files changed

+63
-25
lines changed

8 files changed

+63
-25
lines changed

composer.json

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020
"require": {
2121
"php": ">=8.0",
2222
"amphp/amp": "v2.6.*",
23-
"amphp/socket": "v1.1.*",
23+
"amphp/socket": "v1.2.*",
2424
"phpinnacle/buffer": "v1.2.*"
2525
},
2626
"require-dev": {
2727
"phpunit/phpunit": "v9.5.*",
28-
"vimeo/psalm": "v4.13.*",
29-
"phpstan/phpstan": "v1.2.*"
28+
"vimeo/psalm": "v4.18.*",
29+
"phpstan/phpstan": "v1.4.*"
3030
},
3131
"prefer-stable": true,
3232
"autoload": {
@@ -52,6 +52,9 @@
5252
},
5353
"config": {
5454
"sort-packages": true,
55-
"optimize-autoloader": true
55+
"optimize-autoloader": true,
56+
"allow-plugins": {
57+
"composer/package-versions-deprecated": false
58+
}
5659
}
5760
}

examples/basic.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
}
2929

3030
yield $channel->consume(function (Message $message, Channel $channel) {
31-
echo $message->content() . \PHP_EOL;
31+
echo $message->content . \PHP_EOL;
3232

3333
yield $channel->ack($message);
3434
}, 'basic_queue');

examples/worker.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
echo '[*] Waiting for messages. To exit press CTRL+C', \PHP_EOL;
3333

3434
$tag = yield $channel->consume(function (Message $message, Channel $channel) {
35-
echo "[x] Received message: {$message->content()}.", \PHP_EOL;
35+
echo "[x] Received message: {$message->content}.", \PHP_EOL;
3636

3737
// Do some work - we generate password hashes with a high cost
3838
// sleep() gets interrupted by Ctrl+C so it's not very good for demos
@@ -43,12 +43,12 @@
4343
password_hash(random_bytes(255), PASSWORD_BCRYPT, ["cost" => 15]);
4444
}
4545

46-
echo "[x] Done ", $message->content(), \PHP_EOL;
46+
echo "[x] Done ", $message->content, \PHP_EOL;
4747

4848
try {
4949
yield $channel->ack($message);
5050

51-
echo "ACK SUCCESS:: {$message->content()}", \PHP_EOL;
51+
echo "ACK SUCCESS:: {$message->content}", \PHP_EOL;
5252
} catch (\Throwable $error) {
5353

5454
echo "ACK FAILED:: {$error->getMessage()}", \PHP_EOL;

phpstan.neon

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
parameters:
22
checkMissingIterableValueType: false
33
checkGenericClassInNonGenericObjectType: false
4-
ignoreErrors:
5-
- '#Cannot cast mixed to int#'
4+

src/Client.php

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
namespace PHPinnacle\Ridge;
1414

15+
use Amp\Loop;
1516
use function Amp\asyncCall;
1617
use function Amp\call;
1718
use Amp\Deferred;
@@ -24,6 +25,8 @@ final class Client
2425
private const STATE_CONNECTED = 2;
2526
private const STATE_DISCONNECTING = 3;
2627

28+
private const CONNECTION_MONITOR_INTERVAL = 5000;
29+
2730
/**
2831
* @var Config
2932
*/
@@ -54,6 +57,11 @@ final class Client
5457
*/
5558
private $properties;
5659

60+
/**
61+
* @var string|null
62+
*/
63+
private $connectionMonitorWatcherId;
64+
5765
public function __construct(Config $config)
5866
{
5967
$this->config = $config;
@@ -91,7 +99,7 @@ function () {
9199

92100
$this->state = self::STATE_CONNECTING;
93101

94-
$this->connection = new Connection($this->config->uri(), fn() => $this->state = self::STATE_NOT_CONNECTED);
102+
$this->connection = new Connection($this->config->uri());
95103

96104
yield $this->connection->open(
97105
$this->config->timeout,
@@ -128,10 +136,22 @@ function () {
128136

129137
$this->connection->write($buffer);
130138
$this->connection->close();
139+
140+
$this->disableConnectionMonitor();
131141
}
132142
);
133143

134144
$this->state = self::STATE_CONNECTED;
145+
146+
$this->connectionMonitorWatcherId = Loop::repeat(
147+
self::CONNECTION_MONITOR_INTERVAL,
148+
function(): void
149+
{
150+
if($this->connection->connected() === false) {
151+
throw Exception\ClientException::disconnected();
152+
}
153+
}
154+
);
135155
}
136156
);
137157
}
@@ -143,6 +163,8 @@ function () {
143163
*/
144164
public function disconnect(int $code = 0, string $reason = ''): Promise
145165
{
166+
$this->disableConnectionMonitor();
167+
146168
return call(
147169
function () use ($code, $reason) {
148170
if (\in_array($this->state, [self::STATE_NOT_CONNECTED, self::STATE_DISCONNECTING])) {
@@ -153,6 +175,12 @@ function () use ($code, $reason) {
153175
throw Exception\ClientException::notConnected();
154176
}
155177

178+
if($this->connectionMonitorWatcherId !== null){
179+
Loop::cancel($this->connectionMonitorWatcherId);
180+
181+
$this->connectionMonitorWatcherId = null;
182+
}
183+
156184
$this->state = self::STATE_DISCONNECTING;
157185

158186
if ($code === 0) {
@@ -231,7 +259,7 @@ function () {
231259

232260
public function isConnected(): bool
233261
{
234-
return $this->state === self::STATE_CONNECTED;
262+
return $this->state === self::STATE_CONNECTED && $this->connection->connected();
235263
}
236264

237265
/**
@@ -422,4 +450,13 @@ static function (Protocol\AbstractFrame $frame) use ($deferred) {
422450

423451
return $deferred->promise();
424452
}
453+
454+
private function disableConnectionMonitor(): void {
455+
if($this->connectionMonitorWatcherId !== null) {
456+
457+
Loop::cancel($this->connectionMonitorWatcherId);
458+
459+
$this->connectionMonitorWatcherId = null;
460+
}
461+
}
425462
}

src/Connection.php

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,15 @@ final class Connection
5858
*/
5959
private $heartbeatWatcherId;
6060

61-
/**
62-
* @var callable|null
63-
*/
64-
private $connectionLost;
65-
66-
public function __construct(string $uri, ?callable $connectionLost = null)
61+
public function __construct(string $uri)
6762
{
6863
$this->uri = $uri;
6964
$this->parser = new Parser;
70-
$this->connectionLost = $connectionLost;
65+
}
66+
67+
public function connected(): bool
68+
{
69+
return $this->socket !== null && $this->socket->isClosed() === false;
7170
}
7271

7372
/**
@@ -195,12 +194,10 @@ function (string $watcherId) use ($interval){
195194
}
196195

197196
if (
198-
null !== $this->connectionLost &&
199197
0 !== $this->lastRead &&
200198
$currentTime > ($this->lastRead + $interval + 1000)
201199
)
202200
{
203-
call_user_func($this->connectionLost);
204201
Loop::cancel($watcherId);
205202
}
206203

@@ -218,10 +215,6 @@ public function close(): void
218215
$this->heartbeatWatcherId = null;
219216
}
220217

221-
if ($this->connectionLost !== null) {
222-
call_user_func($this->connectionLost);
223-
}
224-
225218
if ($this->socket !== null) {
226219
$this->socket->close();
227220
}

src/Exception/ClientException.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ public static function notConnected(): self
2626
return new self('Client is not connected to server.');
2727
}
2828

29+
public static function disconnected(): self {
30+
return new self('The client was unexpectedly disconnected from the server');
31+
}
32+
2933
public static function alreadyConnected(): self
3034
{
3135
return new self('Client is already connected/connecting.');

tests/AsyncTest.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ protected function runTestAsync(...$args)
5353

5454
$return = yield call([$this, $this->realTestName], ...$args);
5555

56+
yield $client->disconnect();
57+
5658
$info = Loop::getInfo();
5759
$count = $info['enabled_watchers']['referenced'];
5860

0 commit comments

Comments
 (0)