Skip to content

Commit 097fd01

Browse files
authored
fix: filter CRDT State response (#5823)
* implemented CRDT State Response filtering + test coverage * fix: nullref on communities breaking the chat
1 parent 39f9129 commit 097fd01

File tree

5 files changed

+243
-68
lines changed

5 files changed

+243
-68
lines changed
Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
#nullable enable
22

33
using CRDT.Protocol;
4+
using DCL.Diagnostics;
45
using System;
56
using Utility;
67

78
namespace CRDT
89
{
910
public class CRDTFilter
1011
{
12+
// “asset-packs::VideoControlState” is 2092194694
1113
private static readonly uint NO_SYNC_COMPONENT_ID = 2092194694;
1214

1315
/// <summary>
@@ -17,18 +19,77 @@ public static void FilterSceneMessageBatch(ReadOnlySpan<byte> memory, Span<byte>
1719
{
1820
totalWrite = 0;
1921

20-
// write first - byte, it's for
22+
// write first byte (message type: CRDT = 1)
2123
const int CRDT_STATE_LENGTH = 1;
2224
totalWrite += CRDT_STATE_LENGTH;
2325
memory.Slice(0, CRDT_STATE_LENGTH).CopyTo(output);
2426
memory = memory.Slice(CRDT_STATE_LENGTH);
2527
output = output.Slice(CRDT_STATE_LENGTH);
2628

27-
// While we have a header to read
28-
while (memory.Length > CRDTConstants.MESSAGE_HEADER_LENGTH)
29+
// Filter the CRDT messages
30+
FilterCRDTMessages(memory, output, out int filteredLength);
31+
totalWrite += filteredLength;
32+
}
33+
34+
/// <summary>
35+
/// Filters CRDT state messages (RES_CRDT_STATE).
36+
/// Encoding happens at the SDK runtime: https://github.com/decentraland/js-sdk-toolchain/blob/f122eaa2acaaed80db7ee0302e8d60ca7d2337bf/packages/@dcl/sdk/src/network/message-bus-sync.ts#L177-L208
37+
/// The format is: [message type byte] + [1 byte: address length] + [address bytes] + [raw CRDT messages]
38+
/// Output must be equal or bigger than memory
39+
/// </summary>
40+
public static void FilterCRDTState(ReadOnlySpan<byte> memory, Span<byte> output, out int totalWrite)
41+
{
42+
totalWrite = 0;
43+
44+
// Need at least 3 -> 1 byte type + 1 byte address length + some data
45+
// Schema at https://github.com/decentraland/js-sdk-toolchain/blob/f122eaa2acaaed80db7ee0302e8d60ca7d2337bf/packages/@dcl/sdk/src/network/message-bus-sync.ts#L197-L208
46+
if (memory.Length < 3)
47+
return;
48+
49+
const int MIN_OFFSET = 2;
50+
51+
// Write the first byte (message type: RES_CRDT_STATE = 3)
52+
output[0] = memory[0];
53+
totalWrite = 1;
54+
55+
// Read the address length (1 byte)
56+
byte addressLength = memory[1];
57+
58+
ReportHub.Log(ReportCategory.CRDT, $"FilterCRDTState - Message type: {memory[0]}, Address length: {addressLength}, Total length: {memory.Length}");
59+
60+
if (memory.Length < MIN_OFFSET + addressLength)
61+
return; // Not enough data
62+
63+
// Copy the address length and address bytes as-is
64+
output[1] = addressLength;
65+
memory.Slice(MIN_OFFSET, addressLength).CopyTo(output.Slice(2));
66+
totalWrite += 1 + addressLength;
67+
68+
// The CRDT messages start after: type byte (1) + address length (1) + address bytes
69+
int crdtStartOffset = MIN_OFFSET + addressLength;
70+
ReadOnlySpan<byte> crdtMessages = memory.Slice(crdtStartOffset);
71+
72+
// Filter the CRDT messages into the output after the address
73+
int outputCrdtOffset = MIN_OFFSET + addressLength;
74+
FilterCRDTMessages(crdtMessages, output.Slice(outputCrdtOffset), out int filteredLength);
75+
totalWrite += filteredLength; // Already counted: type byte + address length + address bytes, now add filtered data
76+
}
77+
78+
private static uint ComponentIdOfPutNetworkComponentType(ReadOnlySpan<byte> memory) =>
79+
memory.Slice(4).ReadConst<uint>(); // offset entityId
80+
81+
/// <summary>
82+
/// Filters CRDT messages from a span, removing PUT_COMPONENT_NETWORK messages with NO_SYNC_COMPONENT_ID.
83+
/// This is the core filtering logic shared by both FilterSceneMessageBatch and FilterCRDTState.
84+
/// </summary>
85+
private static void FilterCRDTMessages(ReadOnlySpan<byte> crdtMessages, Span<byte> output, out int totalWrite)
86+
{
87+
totalWrite = 0;
88+
89+
while (crdtMessages.Length > CRDTConstants.MESSAGE_HEADER_LENGTH)
2990
{
30-
uint messageLength = memory.Read<uint>();
31-
CRDTMessageType messageType = memory.ReadEnumAs<CRDTMessageType, uint>();
91+
uint messageLength = crdtMessages.Read<uint>();
92+
CRDTMessageType messageType = crdtMessages.ReadEnumAs<CRDTMessageType, uint>();
3293

3394
// Message length lower than minimal, it's an invalid message
3495
if (messageLength <= CRDTConstants.MESSAGE_HEADER_LENGTH)
@@ -37,27 +98,25 @@ public static void FilterSceneMessageBatch(ReadOnlySpan<byte> memory, Span<byte>
3798
// Do we have the bytes computed in the header?
3899
uint remainingBytesToRead = messageLength - CRDTConstants.MESSAGE_HEADER_LENGTH;
39100

40-
if (remainingBytesToRead > memory.Length)
101+
if (remainingBytesToRead > crdtMessages.Length)
41102
break;
42103

43-
uint bodyLength = CRDTMessageTypeUtils.TypeLengthBytes(messageType, memory);
104+
uint bodyLength = CRDTMessageTypeUtils.TypeLengthBytes(messageType, crdtMessages);
44105

45-
if (messageType is not CRDTMessageType.PUT_COMPONENT_NETWORK || ComponentIdOfPutNetworkComponentType(memory) != NO_SYNC_COMPONENT_ID)
106+
// Filter out PUT_COMPONENT_NETWORK messages with NO_SYNC_COMPONENT_ID
107+
if (messageType is not CRDTMessageType.PUT_COMPONENT_NETWORK || ComponentIdOfPutNetworkComponentType(crdtMessages) != NO_SYNC_COMPONENT_ID)
46108
{
47109
output.Write(messageLength);
48110
output.Write((uint)messageType);
49111
totalWrite += 8;
50112

51-
memory.Slice(0, (int)bodyLength).CopyTo(output);
113+
crdtMessages.Slice(0, (int)bodyLength).CopyTo(output);
52114
output = output.Slice((int)bodyLength);
53115
totalWrite += (int)bodyLength;
54116
}
55117

56-
memory = memory.Slice((int)bodyLength);
118+
crdtMessages = crdtMessages.Slice((int)bodyLength);
57119
}
58120
}
59-
60-
private static uint ComponentIdOfPutNetworkComponentType(ReadOnlySpan<byte> memory) =>
61-
memory.Slice(4).ReadConst<uint>(); // offset entityId
62121
}
63122
}

Explorer/Assets/DCL/Infrastructure/CrdtEcsBridge/JsModulesImplementation/Communications/CommunicationsControllerAPIImplementationBase.cs

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,30 @@ public void SendBinary(IReadOnlyList<PoolableByteArray> broadcastData, string? r
6161
if (poolable.Length > 0)
6262
{
6363
byte firstByte = poolable.Span[0];
64+
6465
ISceneCommunicationPipe.ConnectivityAssertiveness assertiveness = firstByte == (int)CommsMessageType.REQ_CRDT_STATE
6566
? ISceneCommunicationPipe.ConnectivityAssertiveness.DELIVERY_ASSERTED
6667
: ISceneCommunicationPipe.ConnectivityAssertiveness.DROP_IF_NOT_CONNECTED;
6768

68-
EncodeAndSendMessage(ISceneCommunicationPipe.MsgType.Uint8Array, poolable.Memory.Span, assertiveness, recipient, useFilter: firstByte == (int)CommsMessageType.CRDT);
69+
// Filter CRDT messages before sending
70+
if (firstByte == (int)CommsMessageType.CRDT)
71+
{
72+
Span<byte> filtered = stackalloc byte[poolable.Memory.Span.Length];
73+
int filteredLength = FilterCRDTMessage(poolable.Memory.Span, filtered);
74+
EncodeAndSendMessage(ISceneCommunicationPipe.MsgType.Uint8Array, filtered.Slice(0, filteredLength), assertiveness, recipient);
75+
continue;
76+
}
77+
78+
// Filter RES_CRDT_STATE messages before sending
79+
if (firstByte == (int)CommsMessageType.RES_CRDT_STATE)
80+
{
81+
Span<byte> filtered = stackalloc byte[poolable.Memory.Span.Length];
82+
int filteredLength = FilterCRDTStateMessage(poolable.Memory.Span, filtered);
83+
EncodeAndSendMessage(ISceneCommunicationPipe.MsgType.Uint8Array, filtered.Slice(0, filteredLength), assertiveness, recipient);
84+
continue;
85+
}
86+
87+
EncodeAndSendMessage(ISceneCommunicationPipe.MsgType.Uint8Array, poolable.Memory.Span, assertiveness, recipient);
6988
}
7089
}
7190

@@ -83,23 +102,23 @@ public ScriptObject GetResult()
83102
}
84103
}
85104

86-
protected void EncodeAndSendMessage(ISceneCommunicationPipe.MsgType msgType, ReadOnlySpan<byte> message, ISceneCommunicationPipe.ConnectivityAssertiveness assertivenes, string? specialRecipient, bool useFilter)
105+
private static int FilterCRDTMessage(ReadOnlySpan<byte> message, Span<byte> output)
87106
{
88-
Span<byte> filtered = stackalloc byte[message.Length];
107+
CRDTFilter.FilterSceneMessageBatch(message, output, out int totalWrite);
108+
return totalWrite;
109+
}
89110

90-
if (useFilter)
91-
{
92-
CRDTFilter.FilterSceneMessageBatch(message, filtered, out int totalWrite);
93-
filtered = filtered.Slice(0, totalWrite);
94-
}
95-
else
96-
{
97-
message.CopyTo(filtered);
98-
}
111+
private static int FilterCRDTStateMessage(ReadOnlySpan<byte> message, Span<byte> output)
112+
{
113+
CRDTFilter.FilterCRDTState(message, output, out int totalWrite);
114+
return totalWrite;
115+
}
99116

100-
Span<byte> encodedMessage = stackalloc byte[filtered.Length + 1];
117+
protected void EncodeAndSendMessage(ISceneCommunicationPipe.MsgType msgType, ReadOnlySpan<byte> message, ISceneCommunicationPipe.ConnectivityAssertiveness assertivenes, string? specialRecipient)
118+
{
119+
Span<byte> encodedMessage = stackalloc byte[message.Length + 1];
101120
encodedMessage[0] = (byte)msgType;
102-
filtered.CopyTo(encodedMessage[1..]);
121+
message.CopyTo(encodedMessage[1..]);
103122

104123
sceneCommunicationPipe.SendMessage(encodedMessage, sceneId, assertivenes, cancellationTokenSource.Token, specialRecipient);
105124
}

Explorer/Assets/DCL/Infrastructure/CrdtEcsBridge/JsModulesImplementation/Communications/SDKMessageBus/SDKMessageBusCommsAPIImplementation.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public void ClearMessages()
2525
public void Send(string data)
2626
{
2727
byte[] dataBytes = Encoding.UTF8.GetBytes(data);
28-
EncodeAndSendMessage(ISceneCommunicationPipe.MsgType.String, dataBytes, ISceneCommunicationPipe.ConnectivityAssertiveness.DROP_IF_NOT_CONNECTED, null, false);
28+
EncodeAndSendMessage(ISceneCommunicationPipe.MsgType.String, dataBytes, ISceneCommunicationPipe.ConnectivityAssertiveness.DROP_IF_NOT_CONNECTED, null);
2929
}
3030

3131
protected override void OnMessageReceived(ISceneCommunicationPipe.DecodedMessage message)

0 commit comments

Comments
 (0)