@@ -232,17 +232,16 @@ public void configure(Map<String, ?> configs) {
232
232
MessageListenerContainer listenerContainer1 = rler .getListenerContainer ("obs1" );
233
233
listenerContainer1 .stop ();
234
234
235
- template .send (OBSERVATION_TEST_1 , "test" )
236
- .thenAccept ((sendResult ) -> spanFromCallback .set (tracer .currentSpan ()))
237
- .get ( 10 , TimeUnit . SECONDS );
235
+ assertThat ( template .send (OBSERVATION_TEST_1 , "test" )
236
+ .thenAccept ((sendResult ) -> spanFromCallback .set (tracer .currentSpan ())))
237
+ .succeedsWithin ( Duration . ofSeconds ( 20 ) );
238
238
239
239
Deque <SimpleSpan > spans = tracer .getSpans ();
240
240
assertThat (spans ).hasSize (1 );
241
241
242
242
SimpleSpan templateSpan = spans .peek ();
243
243
assertThat (templateSpan ).isNotNull ();
244
- assertThat (templateSpan .getTags ()).containsAllEntriesOf (Map .of (
245
- "key" , "value" ));
244
+ assertThat (templateSpan .getTags ()).containsAllEntriesOf (Map .of ("key" , "value" ));
246
245
247
246
assertThat (spanFromCallback .get ()).isNotNull ();
248
247
listenerContainer1 .start ();
@@ -352,7 +351,7 @@ private void assertThatTemplateSpanTags(Deque<SimpleSpan> spans, int tagSize, St
352
351
"messaging.system" , "kafka" ,
353
352
"messaging.destination.kind" , "topic" ,
354
353
"messaging.destination.name" , destName ));
355
- if (keyValues != null && keyValues .length > 0 ) {
354
+ if (keyValues .length > 0 ) {
356
355
Arrays .stream (keyValues ).forEach (entry -> assertThat (span .getTags ()).contains (entry ));
357
356
}
358
357
assertThat (span .getName ()).isEqualTo (destName + " send" );
@@ -382,7 +381,7 @@ private SimpleSpan assertThatListenerSpanTags(Deque<SimpleSpan> spans, int tagSi
382
381
Map .entry ("messaging.source.kind" , "topic" ),
383
382
Map .entry ("messaging.source.name" , sourceName ),
384
383
Map .entry ("messaging.system" , "kafka" )));
385
- if (keyValues != null && keyValues .length > 0 ) {
384
+ if (keyValues .length > 0 ) {
386
385
Arrays .stream (keyValues ).forEach (entry -> assertThat (span .getTags ()).contains (entry ));
387
386
}
388
387
assertThat (span .getName ()).isEqualTo (sourceName + " receive" );
@@ -479,7 +478,8 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired
479
478
}
480
479
481
480
@ Test
482
- void observationErrorExceptionWhenCompletableFutureReturned (@ Autowired ExceptionListener listener , @ Autowired SimpleTracer tracer ,
481
+ void observationErrorExceptionWhenCompletableFutureReturned (@ Autowired ExceptionListener listener ,
482
+ @ Autowired SimpleTracer tracer ,
483
483
@ Autowired @ Qualifier ("throwableTemplate" ) KafkaTemplate <Integer , String > errorTemplate ,
484
484
@ Autowired KafkaListenerEndpointRegistry endpointRegistry )
485
485
throws ExecutionException , InterruptedException , TimeoutException {
@@ -665,8 +665,11 @@ KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
665
665
}
666
666
667
667
@ Bean
668
- ReplyingKafkaTemplate <Integer , String , String > replyingKafkaTemplate (ProducerFactory <Integer , String > pf , ConcurrentKafkaListenerContainerFactory <Integer , String > containerFactory ) {
669
- ReplyingKafkaTemplate <Integer , String , String > kafkaTemplate = new ReplyingKafkaTemplate <>(pf , containerFactory .createContainer (OBSERVATION_REPLY ));
668
+ ReplyingKafkaTemplate <Integer , String , String > replyingKafkaTemplate (
669
+ ProducerFactory <Integer , String > pf ,
670
+ ConcurrentKafkaListenerContainerFactory <Integer , String > containerFactory ) {
671
+
672
+ var kafkaTemplate = new ReplyingKafkaTemplate <>(pf , containerFactory .createContainer (OBSERVATION_REPLY ));
670
673
kafkaTemplate .setObservationEnabled (true );
671
674
return kafkaTemplate ;
672
675
}
@@ -734,7 +737,8 @@ ObservationRegistry observationRegistry(Tracer tracer, Propagator propagator, Me
734
737
new PropagatingSenderTracingObservationHandler <>(tracer , propagator ),
735
738
// This is responsible for creating a default span
736
739
new DefaultTracingObservationHandler (tracer )))
737
- .observationHandler (new TracingAwareMeterObservationHandler <>(new DefaultMeterObservationHandler (meterRegistry ), tracer ));
740
+ .observationHandler (new TracingAwareMeterObservationHandler <>(
741
+ new DefaultMeterObservationHandler (meterRegistry ), tracer ));
738
742
return observationRegistry ;
739
743
}
740
744
@@ -803,6 +807,7 @@ AsyncFailureListener asyncFailureListener(SimpleTracer tracer) {
803
807
public TaskScheduler taskExecutor () {
804
808
return new ThreadPoolTaskScheduler ();
805
809
}
810
+
806
811
}
807
812
808
813
public static class Listener {
@@ -932,6 +937,7 @@ void handleDlt(ConsumerRecord<Integer, String> record, Exception exception) {
932
937
this .capturedSpanInDlt = this .tracer .currentSpan ();
933
938
this .asyncFailureLatch .countDown ();
934
939
}
940
+
935
941
}
936
942
937
943
}
0 commit comments