Skip to content

Commit 9f790be

Browse files
authored
Data channel reliability (#871)
1 parent 8a1900b commit 9f790be

File tree

12 files changed

+1987
-26
lines changed

12 files changed

+1987
-26
lines changed

.changes/dc-reliability

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
patch type="changed" "Improve reliable data channel buffering, sequencing, and add integration tests"

lib/src/core/engine.dart

Lines changed: 159 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ import '../track/local/local.dart';
4444
import '../track/local/video.dart';
4545
import '../types/internal.dart';
4646
import '../types/other.dart';
47+
import '../utils/data_packet_buffer.dart';
48+
import '../utils/ttl_map.dart';
4749
import 'signal_client.dart';
4850
import 'transport.dart';
4951

@@ -154,6 +156,24 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
154156
_e2eeManager = e2eeManager;
155157
}
156158

159+
// E2E reliability for data channels
160+
int _reliableDataSequence = 1;
161+
final DataPacketBuffer _reliableMessageBuffer = DataPacketBuffer(
162+
maxBufferSize: 64 * 1024 * 1024, // 64MB
163+
maxPacketCount: 1000, // max 1000 packets
164+
);
165+
final TTLMap<String, int> _reliableReceivedState = TTLMap<String, int>(30000);
166+
bool _isReconnecting = false;
167+
168+
String? _reliableParticipantKey(lk_models.DataPacket packet) {
169+
if (packet.hasParticipantSid() && packet.participantSid.isNotEmpty) {
170+
return packet.participantSid;
171+
}
172+
logger.fine(
173+
'Reliable packet missing participant SID (identity: ${packet.participantIdentity}), skipping dedupe handling');
174+
return null;
175+
}
176+
157177
void clearReconnectTimeout() {
158178
if (reconnectTimeout != null) {
159179
reconnectTimeout?.cancel();
@@ -188,6 +208,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
188208
await cleanUp();
189209
await events.dispose();
190210
await _signalListener.dispose();
211+
_reliableReceivedState.dispose();
191212
});
192213
}
193214

@@ -265,6 +286,12 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
265286
fullReconnectOnNext = false;
266287
attemptingReconnect = false;
267288

289+
// Reset reliability state
290+
_reliableDataSequence = 1;
291+
_reliableMessageBuffer.clear();
292+
_reliableReceivedState.clear();
293+
_isReconnecting = false;
294+
268295
clearPendingReconnect();
269296
}
270297

@@ -333,34 +360,67 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
333360
return completer.future;
334361
}
335362

363+
Future<void> _resendReliableMessagesForResume(int lastMessageSeq) async {
364+
logger.fine('Resending reliable messages from sequence $lastMessageSeq');
365+
366+
final channel = _publisherDataChannel(Reliability.reliable);
367+
if (channel == null) {
368+
logger.warning('Reliable data channel is null, cannot resend messages');
369+
return;
370+
}
371+
372+
// Remove acknowledged messages from buffer
373+
_reliableMessageBuffer.popToSequence(lastMessageSeq);
374+
375+
// Get remaining messages to resend
376+
final messagesToResend = _reliableMessageBuffer.getAll();
377+
378+
if (messagesToResend.isEmpty) {
379+
logger.fine('No reliable messages to resend');
380+
return;
381+
}
382+
383+
logger.fine('Resending ${messagesToResend.length} reliable messages');
384+
385+
for (final item in messagesToResend) {
386+
try {
387+
await channel.send(item.message);
388+
logger.fine('Resent reliable message with sequence ${item.sequence}');
389+
} catch (e) {
390+
logger.warning('Failed to resend reliable message ${item.sequence}: $e');
391+
}
392+
}
393+
}
394+
336395
@internal
337396
Future<void> sendDataPacket(
338397
lk_models.DataPacket packet, {
339-
bool? reliability = true,
398+
Reliability reliability = Reliability.lossy,
340399
}) async {
400+
// Add sequence number for reliable packets
401+
if (reliability == Reliability.reliable) {
402+
packet.sequence = _reliableDataSequence++;
403+
}
404+
341405
// construct the data channel message
342406
var message = rtc.RTCDataChannelMessage.fromBinary(packet.writeToBuffer());
343407

344-
final reliabilityType = reliability == true ? Reliability.reliable : Reliability.lossy;
345-
346408
if (_subscriberPrimary) {
347409
// make sure publisher transport is connected
348-
349410
await _publisherEnsureConnected();
350411

351412
// wait for data channel to open (if not already)
352-
if (_publisherDataChannelState(reliabilityType) != rtc.RTCDataChannelState.RTCDataChannelOpen) {
353-
logger.fine('Waiting for data channel ${reliabilityType} to open...');
413+
if (_publisherDataChannelState(reliability) != rtc.RTCDataChannelState.RTCDataChannelOpen) {
414+
logger.fine('Waiting for data channel ${reliability} to open...');
354415
await events.waitFor<PublisherDataChannelStateUpdatedEvent>(
355-
filter: (event) => event.type == reliabilityType,
416+
filter: (event) => event.type == reliability,
356417
duration: connectOptions.timeouts.connection,
357418
);
358419
}
359420
}
360421

361422
// chose data channel
362-
final rtc.RTCDataChannel? channel =
363-
_publisherDataChannel(reliability == true ? Reliability.reliable : Reliability.lossy);
423+
final rtc.RTCDataChannel? channel = _publisherDataChannel(reliability);
364424

365425
if (channel == null) {
366426
throw UnexpectedStateException('Data channel for ${packet.kind.toSDKType()} is null');
@@ -388,16 +448,38 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
388448
kind: packet.kind,
389449
encryptedPacket: encryptedPacket,
390450
destinationIdentities: packet.destinationIdentities,
451+
sequence: packet.hasSequence() ? packet.sequence : null,
452+
participantSid: packet.hasParticipantSid() ? packet.participantSid : null,
391453
);
392454

393455
message = rtc.RTCDataChannelMessage.fromBinary(dataToSend.writeToBuffer());
394456
}
395457
}
396458

397-
logger.fine('sendDataPacket(label:${channel.label})');
459+
// Buffer reliable packets for potential resending
460+
if (reliability == Reliability.reliable) {
461+
_reliableMessageBuffer.push(BufferedDataPacket(
462+
packet: packet,
463+
message: message,
464+
sequence: packet.sequence,
465+
));
466+
}
467+
468+
// Don't send during reconnection, but keep message buffered for resending
469+
if (_isReconnecting) {
470+
logger.fine('Deferring data packet send during reconnection (will resend when resumed)');
471+
return;
472+
}
473+
474+
logger.fine('sendDataPacket(label:${channel.label}, sequence:${packet.sequence})');
398475
await channel.send(message);
399476

400-
_dcBufferStatus[reliabilityType] = await channel.getBufferedAmount() <= channel.bufferedAmountLowThreshold!;
477+
_dcBufferStatus[reliability] = await channel.getBufferedAmount() <= channel.bufferedAmountLowThreshold!;
478+
479+
// Align buffer with WebRTC buffer for reliable packets
480+
if (reliability == Reliability.reliable) {
481+
_reliableMessageBuffer.alignBufferedAmount(await channel.getBufferedAmount());
482+
}
401483
}
402484

403485
Future<void> _publisherEnsureConnected() async {
@@ -629,7 +711,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
629711
type: Reliability.lossy,
630712
)));
631713
// _onDCStateUpdated(Reliability.lossy, state)
632-
_lossyDCPub?.bufferedAmountLowThreshold = 65535;
714+
_lossyDCPub?.bufferedAmountLowThreshold = 2 * 1024 * 1024;
633715
_lossyDCPub?.onBufferedAmountLow = (_) {
634716
_dcBufferStatus[Reliability.lossy] = (_lossyDCPub!.bufferedAmount! <= _lossyDCPub!.bufferedAmountLowThreshold!);
635717
};
@@ -648,7 +730,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
648730
state: state,
649731
type: Reliability.reliable,
650732
)));
651-
_reliableDCPub?.bufferedAmountLowThreshold = 65535;
733+
_reliableDCPub?.bufferedAmountLowThreshold = 2 * 1024 * 1024;
652734
_reliableDCPub?.onBufferedAmountLow = (_) {
653735
_dcBufferStatus[Reliability.reliable] =
654736
(_reliableDCPub!.bufferedAmount! <= _reliableDCPub!.bufferedAmountLowThreshold!);
@@ -722,12 +804,51 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
722804
logger.warning('Failed to decrypt data packet');
723805
return;
724806
}
725-
726807
final decryptedPacketPayload = lk_models.EncryptedPacketPayload.fromBuffer(decryptedData);
727808
final newDp = asDataPacket(decryptedPacketPayload);
728809

810+
// Handle sequence numbers for reliable packets (encrypted outer packet)
811+
if (dp.kind == lk_models.DataPacket_Kind.RELIABLE && dp.hasSequence()) {
812+
final participantKey = _reliableParticipantKey(dp);
813+
if (participantKey != null) {
814+
final sequence = dp.sequence;
815+
final lastReceived = _reliableReceivedState.get(participantKey) ?? 0;
816+
if (sequence <= lastReceived) {
817+
logger.fine('Ignoring duplicate or out-of-order packet: '
818+
'sequence=$sequence, lastReceived=$lastReceived, participantSid=$participantKey');
819+
return;
820+
}
821+
_reliableReceivedState.set(participantKey, sequence);
822+
}
823+
}
824+
825+
// Preserve metadata from the outer packet on the decrypted packet
826+
newDp
827+
..kind = dp.kind
828+
..sequence = dp.sequence
829+
..participantIdentity = dp.participantIdentity
830+
..participantSid = dp.participantSid
831+
..destinationIdentities.addAll(dp.destinationIdentities);
832+
729833
_emitDataPacket(newDp, encryptionType: dp.encryptedPacket.encryptionType.toLkType());
730834
} else {
835+
// Handle sequence numbers for reliable packets (plaintext)
836+
if (dp.kind == lk_models.DataPacket_Kind.RELIABLE && dp.hasSequence()) {
837+
final participantKey = _reliableParticipantKey(dp);
838+
if (participantKey != null) {
839+
final sequence = dp.sequence;
840+
final lastReceived = _reliableReceivedState.get(participantKey) ?? 0;
841+
if (sequence <= lastReceived) {
842+
logger.fine('Ignoring duplicate or out-of-order packet: '
843+
'sequence=$sequence, lastReceived=$lastReceived, participantSid=$participantKey');
844+
return;
845+
}
846+
_reliableReceivedState.set(participantKey, sequence);
847+
} else {
848+
logger.fine('Reliable packet without participant SID, skipping dedupe');
849+
}
850+
}
851+
731852
_emitDataPacket(dp);
732853
}
733854
}
@@ -815,6 +936,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
815936

816937
logger.info('onDisconnected state:${connectionState} reason:${reason.name}');
817938

939+
_isReconnecting = true;
940+
818941
if (reconnectAttempts == 0) {
819942
reconnectStart = DateTime.timestamp();
820943
}
@@ -891,6 +1014,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
8911014
}
8921015
clearPendingReconnect();
8931016
attemptingReconnect = false;
1017+
_isReconnecting = false;
8941018
} catch (e) {
8951019
reconnectAttempts = reconnectAttempts! + 1;
8961020
bool recoverable = true;
@@ -964,6 +1088,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
9641088
logger.fine('resumeConnection: primary connected');
9651089
}
9661090

1091+
_isReconnecting = false;
9671092
events.emit(const EngineResumedEvent());
9681093
}
9691094

@@ -1030,12 +1155,25 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
10301155
required List<String> trackSidsDisabled,
10311156
}) async {
10321157
final previousAnswer = (await subscriber?.pc.getLocalDescription())?.toPBType();
1158+
1159+
// Build data channel receive states for reliability
1160+
final dataChannelReceiveStates = <lk_rtc.DataChannelReceiveState>[];
1161+
for (final participantId in List.of(_reliableReceivedState.keys)) {
1162+
final lastSequence = _reliableReceivedState.get(participantId);
1163+
if (lastSequence != null) {
1164+
final receiveState = lk_rtc.DataChannelReceiveState();
1165+
receiveState.publisherSid = participantId;
1166+
receiveState.lastSeq = lastSequence;
1167+
dataChannelReceiveStates.add(receiveState);
1168+
}
1169+
}
10331170
signalClient.sendSyncState(
10341171
answer: previousAnswer,
10351172
subscription: subscription,
10361173
publishTracks: publishTracks,
10371174
dataChannelInfo: dataChannelInfo(),
10381175
trackSidsDisabled: trackSidsDisabled,
1176+
dataChannelReceiveStates: dataChannelReceiveStates,
10391177
);
10401178
}
10411179

@@ -1092,7 +1230,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
10921230

10931231
logger.fine('Handle ReconnectResponse: '
10941232
'iceServers: ${event.response.iceServers}, '
1095-
'forceRelay: $event.response.clientConfiguration.forceRelay');
1233+
'forceRelay: $event.response.clientConfiguration.forceRelay, '
1234+
'lastMessageSeq: ${event.response.lastMessageSeq}');
10961235

10971236
final rtcConfiguration = await _buildRtcConfiguration(
10981237
serverResponseForceRelay: event.response.clientConfiguration.forceRelay,
@@ -1105,6 +1244,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
11051244
await negotiate();
11061245
}
11071246

1247+
// Handle reliable message resending
1248+
if (event.response.hasLastMessageSeq()) {
1249+
await _resendReliableMessagesForResume(event.response.lastMessageSeq);
1250+
}
1251+
11081252
events.emit(const SignalReconnectedEvent());
11091253
})
11101254
..on<SignalConnectedEvent>((event) async {

lib/src/core/signal_client.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ extension SignalClientRequests on SignalClient {
466466
required Iterable<lk_rtc.TrackPublishedResponse>? publishTracks,
467467
required Iterable<lk_rtc.DataChannelInfo>? dataChannelInfo,
468468
required List<String> trackSidsDisabled,
469+
List<lk_rtc.DataChannelReceiveState>? dataChannelReceiveStates,
469470
}) =>
470471
_sendRequest(lk_rtc.SignalRequest(
471472
syncState: lk_rtc.SyncState(
@@ -474,6 +475,7 @@ extension SignalClientRequests on SignalClient {
474475
publishTracks: publishTracks,
475476
dataChannels: dataChannelInfo,
476477
trackSidsDisabled: trackSidsDisabled,
478+
datachannelReceiveStates: dataChannelReceiveStates,
477479
),
478480
));
479481

lib/src/data_stream/stream_writer.dart

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,11 @@ class WritableStream<T> implements StreamWriter<T> {
5757
streamId: streamId,
5858
);
5959
final trailerPacket = lk_models.DataPacket(
60+
kind: lk_models.DataPacket_Kind.RELIABLE,
6061
destinationIdentities: destinationIdentities,
6162
streamTrailer: trailer,
6263
);
63-
await engine.sendDataPacket(trailerPacket, reliability: true);
64+
await engine.sendDataPacket(trailerPacket, reliability: Reliability.reliable);
6465
}
6566

6667
@override
@@ -73,10 +74,11 @@ class WritableStream<T> implements StreamWriter<T> {
7374
chunkIndex: Int64(chunkId),
7475
);
7576
final chunkPacket = lk_models.DataPacket(
77+
kind: lk_models.DataPacket_Kind.RELIABLE,
7678
destinationIdentities: destinationIdentities,
7779
streamChunk: chunk,
7880
);
79-
await engine.sendDataPacket(chunkPacket, reliability: true);
81+
await engine.sendDataPacket(chunkPacket, reliability: Reliability.reliable);
8082
chunkId += 1;
8183
}
8284
}

0 commit comments

Comments
 (0)