Skip to content

Commit 5a789d8

Browse files
committed
CSHARP-5619: Replace IConnection.SendMessages with the method to send a single message
1 parent 0cbd39b commit 5a789d8

21 files changed

+155
-437
lines changed

src/MongoDB.Driver/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -461,11 +461,11 @@ public async Task<ResponseMessage> ReceiveMessageAsync(int responseTo, IMessageE
461461
}
462462
}
463463

464-
public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
464+
public void SendMessage(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
465465
{
466466
try
467467
{
468-
_connection.SendMessages(messages, messageEncoderSettings, cancellationToken);
468+
_connection.SendMessage(message, messageEncoderSettings, cancellationToken);
469469
}
470470
catch (MongoConnectionException ex)
471471
{
@@ -474,11 +474,11 @@ public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSet
474474
}
475475
}
476476

477-
public async Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
477+
public async Task SendMessageAsync(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
478478
{
479479
try
480480
{
481-
await _connection.SendMessagesAsync(messages, messageEncoderSettings, cancellationToken).ConfigureAwait(false);
481+
await _connection.SendMessageAsync(message, messageEncoderSettings, cancellationToken).ConfigureAwait(false);
482482
}
483483
catch (MongoConnectionException ex)
484484
{
@@ -623,16 +623,16 @@ public ResponseMessage ReceiveMessage(int responseTo, IMessageEncoderSelector en
623623
return _reference.Instance.ReceiveMessage(responseTo, encoderSelector, messageEncoderSettings, cancellationToken);
624624
}
625625

626-
public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
626+
public void SendMessage(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
627627
{
628628
ThrowIfDisposed();
629-
_reference.Instance.SendMessages(messages, messageEncoderSettings, cancellationToken);
629+
_reference.Instance.SendMessage(message, messageEncoderSettings, cancellationToken);
630630
}
631631

632-
public Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
632+
public Task SendMessageAsync(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
633633
{
634634
ThrowIfDisposed();
635-
return _reference.Instance.SendMessagesAsync(messages, messageEncoderSettings, cancellationToken);
635+
return _reference.Instance.SendMessageAsync(message, messageEncoderSettings, cancellationToken);
636636
}
637637

638638
public void SetCheckOutReasonIfNotAlreadySet(CheckOutReason reason)

src/MongoDB.Driver/Core/Connections/BinaryConnection.cs

Lines changed: 54 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -582,22 +582,22 @@ private async Task SendBufferAsync(IByteBuffer buffer, CancellationToken cancell
582582
}
583583
}
584584

585-
public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
585+
public void SendMessage(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
586586
{
587-
Ensure.IsNotNull(messages, nameof(messages));
587+
Ensure.IsNotNull(message, nameof(message));
588588
ThrowIfCancelledOrDisposedOrNotOpen(cancellationToken);
589589

590-
var helper = new SendMessagesHelper(this, messages, messageEncoderSettings);
590+
var helper = new SendMessagesHelper(this, message, messageEncoderSettings);
591591
try
592592
{
593-
helper.EncodingMessages();
594-
using (var uncompressedBuffer = helper.EncodeMessages(cancellationToken, out var sentMessages))
593+
helper.EncodingMessage();
594+
using (var uncompressedBuffer = helper.EncodeMessage(cancellationToken, out var sentMessages))
595595
{
596-
helper.SendingMessages(uncompressedBuffer);
596+
helper.SendingMessage(uncompressedBuffer);
597597
int sentLength;
598-
if (AnyMessageNeedsToBeCompressed(sentMessages))
598+
if (ShouldBeCompressed(sentMessages))
599599
{
600-
using (var compressedBuffer = CompressMessages(sentMessages, uncompressedBuffer, messageEncoderSettings))
600+
using (var compressedBuffer = CompressMessage(sentMessages, uncompressedBuffer, messageEncoderSettings))
601601
{
602602
SendBuffer(compressedBuffer, cancellationToken);
603603
sentLength = compressedBuffer.Length;
@@ -619,22 +619,22 @@ public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSet
619619
}
620620
}
621621

622-
public async Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
622+
public async Task SendMessageAsync(RequestMessage message, MessageEncoderSettings messageEncoderSettings, CancellationToken cancellationToken)
623623
{
624-
Ensure.IsNotNull(messages, nameof(messages));
624+
Ensure.IsNotNull(message, nameof(message));
625625
ThrowIfCancelledOrDisposedOrNotOpen(cancellationToken);
626626

627-
var helper = new SendMessagesHelper(this, messages, messageEncoderSettings);
627+
var helper = new SendMessagesHelper(this, message, messageEncoderSettings);
628628
try
629629
{
630-
helper.EncodingMessages();
631-
using (var uncompressedBuffer = helper.EncodeMessages(cancellationToken, out var sentMessages))
630+
helper.EncodingMessage();
631+
using (var uncompressedBuffer = helper.EncodeMessage(cancellationToken, out var sentMessages))
632632
{
633-
helper.SendingMessages(uncompressedBuffer);
633+
helper.SendingMessage(uncompressedBuffer);
634634
int sentLength;
635-
if (AnyMessageNeedsToBeCompressed(sentMessages))
635+
if (ShouldBeCompressed(sentMessages))
636636
{
637-
using (var compressedBuffer = CompressMessages(sentMessages, uncompressedBuffer, messageEncoderSettings))
637+
using (var compressedBuffer = CompressMessage(sentMessages, uncompressedBuffer, messageEncoderSettings))
638638
{
639639
await SendBufferAsync(compressedBuffer, cancellationToken).ConfigureAwait(false);
640640
sentLength = compressedBuffer.Length;
@@ -663,9 +663,9 @@ public void SetReadTimeout(TimeSpan timeout)
663663
}
664664

665665
// private methods
666-
private bool AnyMessageNeedsToBeCompressed(IEnumerable<RequestMessage> messages)
666+
private bool ShouldBeCompressed(RequestMessage message)
667667
{
668-
return _sendCompressorType.HasValue && messages.Any(m => m.MayBeCompressed);
668+
return _sendCompressorType.HasValue && message.MayBeCompressed;
669669
}
670670

671671
private CompressorType? ChooseSendCompressorTypeIfAny(ConnectionDescription connectionDescription)
@@ -674,8 +674,8 @@ private bool AnyMessageNeedsToBeCompressed(IEnumerable<RequestMessage> messages)
674674
return availableCompressors.Count > 0 ? (CompressorType?)availableCompressors[0] : null;
675675
}
676676

677-
private IByteBuffer CompressMessages(
678-
IEnumerable<RequestMessage> messages,
677+
private IByteBuffer CompressMessage(
678+
RequestMessage message,
679679
IByteBuffer uncompressedBuffer,
680680
MessageEncoderSettings messageEncoderSettings)
681681
{
@@ -685,24 +685,22 @@ private IByteBuffer CompressMessages(
685685
using (var uncompressedStream = new ByteBufferStream(uncompressedBuffer, ownsBuffer: false))
686686
using (var compressedStream = new ByteBufferStream(compressedBuffer, ownsBuffer: false))
687687
{
688-
foreach (var message in messages)
689-
{
690-
var uncompressedMessageLength = uncompressedStream.ReadInt32();
691-
uncompressedStream.Position -= 4;
688+
var uncompressedMessageLength = uncompressedStream.ReadInt32();
689+
uncompressedStream.Position -= 4;
692690

693-
using (var uncompressedMessageSlice = uncompressedBuffer.GetSlice((int)uncompressedStream.Position, uncompressedMessageLength))
694-
using (var uncompressedMessageStream = new ByteBufferStream(uncompressedMessageSlice, ownsBuffer: false))
691+
using (var uncompressedMessageSlice = uncompressedBuffer.GetSlice((int)uncompressedStream.Position, uncompressedMessageLength))
692+
using (var uncompressedMessageStream = new ByteBufferStream(uncompressedMessageSlice, ownsBuffer: false))
693+
{
694+
if (message.MayBeCompressed)
695695
{
696-
if (message.MayBeCompressed)
697-
{
698-
CompressMessage(message, uncompressedMessageStream, compressedStream, messageEncoderSettings);
699-
}
700-
else
701-
{
702-
uncompressedMessageStream.EfficientCopyTo(compressedStream);
703-
}
696+
CompressMessage(message, uncompressedMessageStream, compressedStream, messageEncoderSettings);
697+
}
698+
else
699+
{
700+
uncompressedMessageStream.EfficientCopyTo(compressedStream);
704701
}
705702
}
703+
706704
compressedBuffer.Length = (int)compressedStream.Length;
707705
}
708706

@@ -983,24 +981,24 @@ private class SendMessagesHelper
983981
private readonly Stopwatch _commandStopwatch;
984982
private readonly BinaryConnection _connection;
985983
private readonly MessageEncoderSettings _messageEncoderSettings;
986-
private readonly List<RequestMessage> _messages;
987-
private Lazy<List<int>> _requestIds;
984+
private readonly RequestMessage _message;
985+
private Lazy<IReadOnlyList<int>> _requestIds;
988986
private TimeSpan _serializationDuration;
989987
private Stopwatch _networkStopwatch;
990988

991-
public SendMessagesHelper(BinaryConnection connection, IEnumerable<RequestMessage> messages, MessageEncoderSettings messageEncoderSettings)
989+
public SendMessagesHelper(BinaryConnection connection, RequestMessage message, MessageEncoderSettings messageEncoderSettings)
992990
{
993991
_connection = connection;
994-
_messages = messages.ToList();
992+
_message = message;
995993
_messageEncoderSettings = messageEncoderSettings;
996994

997995
_commandStopwatch = Stopwatch.StartNew();
998-
_requestIds = new Lazy<List<int>>(() => _messages.Select(m => m.RequestId).ToList());
996+
_requestIds = new Lazy<IReadOnlyList<int>>(() => [_message.RequestId]);
999997
}
1000998

1001-
public IByteBuffer EncodeMessages(CancellationToken cancellationToken, out List<RequestMessage> sentMessages)
999+
public IByteBuffer EncodeMessage(CancellationToken cancellationToken, out RequestMessage sentMessage)
10021000
{
1003-
sentMessages = new List<RequestMessage>();
1001+
sentMessage = null;
10041002
cancellationToken.ThrowIfCancellationRequested();
10051003

10061004
var serializationStopwatch = Stopwatch.StartNew();
@@ -1009,21 +1007,17 @@ public IByteBuffer EncodeMessages(CancellationToken cancellationToken, out List<
10091007
using (var stream = new ByteBufferStream(buffer, ownsBuffer: false))
10101008
{
10111009
var encoderFactory = new BinaryMessageEncoderFactory(stream, _messageEncoderSettings, compressorSource: null);
1012-
foreach (var message in _messages)
1013-
{
1014-
if (message.ShouldBeSent == null || message.ShouldBeSent())
1015-
{
1016-
var encoder = message.GetEncoder(encoderFactory);
1017-
encoder.WriteMessage(message);
1018-
message.WasSent = true;
1019-
sentMessages.Add(message);
1020-
}
10211010

1022-
// Encoding messages includes serializing the
1023-
// documents, so encoding message could be expensive
1024-
// and worthy of us honoring cancellation here.
1025-
cancellationToken.ThrowIfCancellationRequested();
1026-
}
1011+
var encoder = _message.GetEncoder(encoderFactory);
1012+
encoder.WriteMessage(_message);
1013+
_message.WasSent = true;
1014+
sentMessage = _message;
1015+
1016+
// Encoding messages includes serializing the
1017+
// documents, so encoding message could be expensive
1018+
// and worthy of us honoring cancellation here.
1019+
cancellationToken.ThrowIfCancellationRequested();
1020+
10271021
buffer.Length = (int)stream.Length;
10281022
buffer.MakeReadOnly();
10291023
}
@@ -1033,7 +1027,7 @@ public IByteBuffer EncodeMessages(CancellationToken cancellationToken, out List<
10331027
return buffer;
10341028
}
10351029

1036-
public void EncodingMessages()
1030+
public void EncodingMessage()
10371031
{
10381032
_connection._eventLogger.LogAndPublish(new ConnectionSendingMessagesEvent(_connection.ConnectionId, _requestIds.Value, EventContext.OperationId));
10391033
}
@@ -1042,17 +1036,17 @@ public void FailedSendingMessages(Exception ex)
10421036
{
10431037
if (_connection._commandEventHelper.ShouldCallErrorSending)
10441038
{
1045-
_connection._commandEventHelper.ErrorSending(_messages, _connection._connectionId, _connection._description?.ServiceId, ex, _connection.IsInitializing);
1039+
_connection._commandEventHelper.ErrorSending(_message, _connection._connectionId, _connection._description?.ServiceId, ex, _connection.IsInitializing);
10461040
}
10471041

10481042
_connection._eventLogger.LogAndPublish(new ConnectionSendingMessagesFailedEvent(_connection.ConnectionId, _requestIds.Value, ex, EventContext.OperationId));
10491043
}
10501044

1051-
public void SendingMessages(IByteBuffer buffer)
1045+
public void SendingMessage(IByteBuffer buffer)
10521046
{
10531047
if (_connection._commandEventHelper.ShouldCallBeforeSending)
10541048
{
1055-
_connection._commandEventHelper.BeforeSending(_messages, _connection.ConnectionId, _connection.Description?.ServiceId, buffer, _messageEncoderSettings, _commandStopwatch, _connection.IsInitializing);
1049+
_connection._commandEventHelper.BeforeSending(_message, _connection.ConnectionId, _connection.Description?.ServiceId, buffer, _messageEncoderSettings, _commandStopwatch, _connection.IsInitializing);
10561050
}
10571051

10581052
_networkStopwatch = Stopwatch.StartNew();
@@ -1065,7 +1059,7 @@ public void SentMessages(int bufferLength)
10651059

10661060
if (_connection._commandEventHelper.ShouldCallAfterSending)
10671061
{
1068-
_connection._commandEventHelper.AfterSending(_messages, _connection._connectionId, _connection.Description?.ServiceId, _connection.IsInitializing);
1062+
_connection._commandEventHelper.AfterSending(_message, _connection._connectionId, _connection.Description?.ServiceId, _connection.IsInitializing);
10691063
}
10701064

10711065
_connection._eventLogger.LogAndPublish(new ConnectionSentMessagesEvent(_connection.ConnectionId, _requestIds.Value, bufferLength, networkDuration, _serializationDuration, EventContext.OperationId));

0 commit comments

Comments
 (0)