@@ -101,7 +101,7 @@ await base.OpenAsync()
101101 /// <exception cref="InvalidOperationException"></exception>
102102 /// <exception cref="NotSupportedException"></exception>
103103 /// <exception cref="PublisherException"></exception>
104- public async Task < PublishResult > PublishAsync ( IMessage message , CancellationToken cancellationToken = default )
104+ public Task < PublishResult > PublishAsync ( IMessage message , CancellationToken cancellationToken = default )
105105 {
106106 ThrowIfClosed ( ) ;
107107
@@ -119,17 +119,17 @@ public async Task<PublishResult> PublishAsync(IMessage message, CancellationToke
119119 stopwatch . Start ( ) ;
120120 }
121121
122+ TaskCompletionSource < PublishResult > publishResultTcs =
123+ Utils . CreateTaskCompletionSource < PublishResult > ( ) ;
124+
122125 try
123126 {
124- TaskCompletionSource < PublishOutcome > messagePublishedTcs =
125- Utils . CreateTaskCompletionSource < PublishOutcome > ( ) ;
126-
127127 Message nativeMessage = ( ( AmqpMessage ) message ) . NativeMessage ;
128128
129129 void OutcomeCallback ( ILink sender , Message inMessage , Outcome outcome , object state )
130130 {
131131 // Note: sometimes `message` is null 🤔
132- System . Diagnostics . Debug . Assert ( Object . ReferenceEquals ( this , state ) ) ;
132+ Debug . Assert ( Object . ReferenceEquals ( this , state ) ) ;
133133
134134 if ( false == Object . ReferenceEquals ( _senderLink , sender ) )
135135 {
@@ -167,7 +167,15 @@ void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object st
167167 }
168168 }
169169
170- messagePublishedTcs . SetResult ( publishOutcome ) ;
170+ // TODO cancellation token
171+ if ( _metricsReporter is not null && stopwatch is not null )
172+ {
173+ stopwatch . Stop ( ) ;
174+ _metricsReporter . Published ( stopwatch . Elapsed ) ;
175+ }
176+
177+ var publishResult = new PublishResult ( message , publishOutcome ) ;
178+ publishResultTcs . SetResult ( publishResult ) ;
171179 }
172180
173181 /*
@@ -176,25 +184,16 @@ void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object st
176184 */
177185 _senderLink . Send ( nativeMessage , OutcomeCallback , this ) ;
178186
179- // TODO cancellation token
180- // PublishOutcome publishOutcome = await messagePublishedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken)
181- PublishOutcome publishOutcome = await messagePublishedTcs . Task . WaitAsync ( TimeSpan . FromSeconds ( 5 ) )
182- . ConfigureAwait ( false ) ;
183-
184- if ( _metricsReporter is not null && stopwatch is not null )
185- {
186- stopwatch . Stop ( ) ;
187- _metricsReporter . Published ( stopwatch . Elapsed ) ;
188- }
189-
190- return new PublishResult ( message , publishOutcome ) ;
187+ return publishResultTcs . Task ;
191188 }
192189 catch ( AmqpException ex )
193190 {
194191 stopwatch ? . Stop ( ) ;
195192 _metricsReporter ? . PublishDisposition ( IMetricsReporter . PublishDispositionValue . REJECTED ) ;
196193 var publishOutcome = new PublishOutcome ( OutcomeState . Rejected , Utils . ConvertError ( ex . Error ) ) ;
197- return new PublishResult ( message , publishOutcome ) ;
194+ var publishResult = new PublishResult ( message , publishOutcome ) ;
195+ publishResultTcs . SetResult ( publishResult ) ;
196+ return publishResultTcs . Task ;
198197 }
199198 catch ( Exception e )
200199 {
0 commit comments