@@ -49,7 +49,12 @@ final class Connection
49
49
private $ lastWrite = 0 ;
50
50
51
51
/**
52
- * @var string|null
52
+ * @var int
53
+ */
54
+ private $ lastRead = 0 ;
55
+
56
+ /**
57
+ * @var string
53
58
*/
54
59
private $ heartbeatWatcherId ;
55
60
@@ -126,6 +131,7 @@ function () use ($timeout, $maxAttempts, $noDelay) {
126
131
}
127
132
128
133
$ this ->socket = yield connect ($ this ->uri , $ context );
134
+ $ this ->lastRead = Loop::now ();
129
135
130
136
asyncCall (
131
137
function () {
@@ -138,6 +144,7 @@ function () {
138
144
139
145
while ($ frame = $ this ->parser ->parse ()) {
140
146
$ class = \get_class ($ frame );
147
+ $ this ->lastRead = Loop::now ();
141
148
142
149
/**
143
150
* @psalm-var callable(AbstractFrame):Promise<bool> $callback
@@ -157,18 +164,14 @@ function () {
157
164
);
158
165
}
159
166
160
- public function heartbeat (int $ interval ): void
167
+ public function heartbeat (int $ interval, ? callable $ connectionLost = null ): void
161
168
{
162
169
$ this ->heartbeatWatcherId = Loop::repeat (
163
170
$ interval ,
164
- function (string $ watcherId ) use ($ interval ) {
165
- if ($ this ->socket === null ) {
166
- Loop::cancel ($ watcherId );
167
-
168
- return ;
169
- }
171
+ function ($ watcherId ) use ($ interval , $ connectionLost ){
172
+ $ currentTime = Loop::now ();
170
173
171
- $ currentTime = Loop:: now ();
174
+ if ( null !== $ this -> socket ) {
172
175
$ lastWrite = $ this ->lastWrite ?: $ currentTime ;
173
176
174
177
$ nextHeartbeat = $ lastWrite + $ interval ;
@@ -182,9 +185,18 @@ function (string $watcherId) use ($interval) {
182
185
);
183
186
}
184
187
185
- unset($ currentTime , $ lastWrite , $ nextHeartbeat );
188
+ unset($ lastWrite , $ nextHeartbeat );
186
189
}
187
- );
190
+
191
+ if (null !== $ connectionLost && 0 !== $ this ->lastRead ) {
192
+ if ($ currentTime > ($ this ->lastRead + $ interval + 1000 )) {
193
+ $ connectionLost ();
194
+ Loop::cancel ($ watcherId );
195
+ }
196
+ }
197
+
198
+ unset($ currentTime );
199
+ });
188
200
}
189
201
190
202
public function close (): void
0 commit comments