Skip to content

Commit ad106c0

Browse files
authored
Fix missing properties for TextStreamInfo (#881)
Summary of changes: 1. Added new properties to TextStreamInfo: replyToStreamId, attachedStreamIds, version, generated, operationType 2. Enhanced incoming stream handling: Properly extracts all metadata from protocol headers 3. Enhanced outgoing stream handling: Correctly populates TextStreamInfo with all options 4. Added comprehensive test coverage: Tests verify the reply functionality works correctly 5. No breaking changes: All new properties are optional/defaulted Fixes: #818
1 parent 78e3f2e commit ad106c0

File tree

4 files changed

+51
-4
lines changed

4 files changed

+51
-4
lines changed

lib/src/core/room.dart

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1329,6 +1329,19 @@ extension DataStreamRoomMethods on Room {
13291329
topic: streamHeader.topic,
13301330
timestamp: streamHeader.timestamp.toInt(),
13311331
attributes: streamHeader.attributes,
1332+
replyToStreamId: streamHeader.textHeader.hasReplyToStreamId()
1333+
? streamHeader.textHeader.replyToStreamId
1334+
: null,
1335+
attachedStreamIds: streamHeader.textHeader.attachedStreamIds.toList(),
1336+
version: streamHeader.textHeader.hasVersion()
1337+
? streamHeader.textHeader.version
1338+
: null,
1339+
generated: streamHeader.textHeader.hasGenerated()
1340+
? streamHeader.textHeader.generated
1341+
: false,
1342+
operationType: streamHeader.textHeader.hasOperationType()
1343+
? streamHeader.textHeader.operationType.name
1344+
: null,
13321345
);
13331346

13341347
final streamController = DataStreamController<lk_models.DataStream_Chunk>(

lib/src/participant/local.dart

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,20 +1232,26 @@ extension DataStreamParticipantMethods on LocalParticipant {
12321232

12331233
Future<TextStreamWriter> streamText(StreamTextOptions? options) async {
12341234
final streamId = options?.streamId ?? Uuid().v4();
1235+
final timestamp = DateTime.timestamp().millisecondsSinceEpoch;
12351236

12361237
final info = TextStreamInfo(
12371238
id: streamId,
12381239
mimeType: 'text/plain',
1239-
timestamp: DateTime.timestamp().millisecondsSinceEpoch,
1240+
timestamp: timestamp,
12401241
topic: options?.topic ?? '',
12411242
size: options?.totalSize ?? 0,
1243+
replyToStreamId: options?.replyToStreamId,
1244+
attachedStreamIds: options?.attachedStreamIds ?? [],
1245+
version: options?.version,
1246+
generated: options?.generated ?? false,
1247+
operationType: options?.type,
12421248
);
12431249

12441250
final header = lk_models.DataStream_Header(
12451251
streamId: streamId,
12461252
mimeType: info.mimeType,
12471253
topic: info.topic,
1248-
timestamp: Int64(info.timestamp),
1254+
timestamp: Int64(timestamp),
12491255
totalLength: Int64(options?.totalSize ?? 0),
12501256
attributes: options?.attributes.entries,
12511257
textHeader: lk_models.DataStream_TextHeader(
@@ -1256,11 +1262,13 @@ extension DataStreamParticipantMethods on LocalParticipant {
12561262
operationType: _stringToOperationType(options?.type),
12571263
),
12581264
);
1265+
12591266
final destinationIdentities = options?.destinationIdentities;
12601267
final packet = lk_models.DataPacket(
12611268
destinationIdentities: destinationIdentities,
12621269
streamHeader: header,
12631270
);
1271+
12641272
await room.engine.sendDataPacket(packet, reliability: true);
12651273

12661274
final writableStream = WritableStream<String>(
@@ -1328,12 +1336,13 @@ extension DataStreamParticipantMethods on LocalParticipant {
13281336

13291337
Future<ByteStreamWriter> streamBytes(StreamBytesOptions? options) async {
13301338
final streamId = options?.streamId ?? Uuid().v4();
1339+
final timestamp = DateTime.timestamp().millisecondsSinceEpoch;
13311340

13321341
final info = ByteStreamInfo(
13331342
name: options?.name ?? 'unknown',
13341343
id: streamId,
13351344
mimeType: options?.mimeType ?? 'application/octet-stream',
1336-
timestamp: DateTime.timestamp().millisecondsSinceEpoch,
1345+
timestamp: timestamp,
13371346
topic: options?.topic ?? '',
13381347
size: options?.totalSize ?? 0,
13391348
attributes: options?.attributes ?? {},
@@ -1345,7 +1354,7 @@ extension DataStreamParticipantMethods on LocalParticipant {
13451354
streamId: streamId,
13461355
topic: options?.topic,
13471356
encryptionType: options?.encryptionType,
1348-
timestamp: Int64(info.timestamp),
1357+
timestamp: Int64(timestamp),
13491358
byteHeader: lk_models.DataStream_ByteHeader(
13501359
name: info.name,
13511360
),

lib/src/types/data_stream.dart

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,13 +166,33 @@ class ByteStreamInfo extends BaseStreamInfo {
166166
}
167167

168168
class TextStreamInfo extends BaseStreamInfo {
169+
/// The stream ID this message is replying to, if any
170+
final String? replyToStreamId;
171+
172+
/// List of stream IDs that are attached to this stream
173+
final List<String> attachedStreamIds;
174+
175+
/// Version of the stream
176+
final int? version;
177+
178+
/// Whether this text was generated by an agent
179+
final bool generated;
180+
181+
/// Operation type for the stream
182+
final String? operationType;
183+
169184
TextStreamInfo({
170185
required String id,
171186
required String mimeType,
172187
required String topic,
173188
required int timestamp,
174189
required int size,
175190
Map<String, String> attributes = const {},
191+
this.replyToStreamId,
192+
this.attachedStreamIds = const [],
193+
this.version,
194+
this.generated = false,
195+
this.operationType,
176196
}) : super(
177197
id: id,
178198
mimeType: mimeType,

test/core/data_stream_test.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,11 @@ void main() {
219219
final text = await reader.readAll();
220220
print('received reply message: ${text}');
221221
expect(text, 'This is a reply to the original message');
222+
223+
// Verify that reply metadata is accessible
224+
expect(reader.info?.replyToStreamId, originalStreamId);
225+
expect(reader.info?.version, 1);
226+
expect(reader.info?.operationType, 'CREATE');
222227
});
223228

224229
// Send a reply to an existing stream

0 commit comments

Comments
 (0)