From 984faba39eb644863b9ab6cda6aba6f29d0816ff Mon Sep 17 00:00:00 2001 From: anurag-dubey_bveng Date: Sat, 26 Oct 2024 08:54:07 +0000 Subject: [PATCH] added datadog metrics changes --- queue/pom.xml | 6 ++++++ .../bazaarvoice/emodb/queue/core/AbstractQueueService.java | 7 ++++++- .../emodb/queue/core/DefaultDedupQueueService.java | 5 +++-- .../bazaarvoice/emodb/queue/core/DefaultQueueService.java | 5 +++-- .../bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java | 3 ++- 5 files changed, 20 insertions(+), 6 deletions(-) diff --git a/queue/pom.xml b/queue/pom.xml index 4c3de7c7e..b188d69e5 100644 --- a/queue/pom.xml +++ b/queue/pom.xml @@ -135,5 +135,11 @@ com.amazonaws aws-java-sdk-ssm + + com.datadoghq + java-dogstatsd-client + 2.3 + compile + diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java index 9ee5c92d4..c9faee53f 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java @@ -45,6 +45,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import com.timgroup.statsd.StatsDClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +64,7 @@ abstract class AbstractQueueService implements BaseQueueService { public static final int MAX_MESSAGE_SIZE_IN_BYTES = 30 * 1024; private final StepFunctionService stepFunctionService; private final ParameterStoreUtil parameterStoreUtil; + private final StatsDClient _statsDClient; // Cache for the isExperiment value with a TTL of 5 minutes private final Cache experimentCache = CacheBuilder.newBuilder() @@ -80,7 +82,7 @@ abstract class AbstractQueueService implements BaseQueueService { protected AbstractQueueService(BaseEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, JobType moveQueueJobType, - Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) { + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) { _eventStore = eventStore; _jobService = jobService; _moveQueueJobType = moveQueueJobType; @@ -88,6 +90,7 @@ protected AbstractQueueService(BaseEventStore eventStore, JobService jobService, this.producerService = producerService; this.stepFunctionService = stepFunctionService; this.parameterStoreUtil = new ParameterStoreUtil(); + _statsDClient = statsDClient; registerMoveQueueJobHandler(jobHandlerRegistry); @@ -232,6 +235,7 @@ public void sendAll(Map> messagesByQueue) { validateMessage(message); events.add(message.toString()); } + _statsDClient.recordGaugeValue("queue.messages.size", events.size(), "queue:" + queue); builder.putAll(queue, events); } @@ -268,6 +272,7 @@ public void sendAll(String queue, Collection messages, boolean fromKafka) { "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size"); events.add(messageByteBuffer); } + _statsDClient.recordGaugeValue("queue.messages.size", events.size(), "queue:" + queue); builder.putAll(queue, events); Multimap eventsByChannel = builder.build(); diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java index 9ec8d3660..62fb8ff9c 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java @@ -8,13 +8,14 @@ import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.google.inject.Inject; +import com.timgroup.statsd.StatsDClient; import java.time.Clock; public class DefaultDedupQueueService extends AbstractQueueService implements DedupQueueService { @Inject public DefaultDedupQueueService(DedupEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, - Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) { - super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService ); + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) { + super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService, statsDClient); } } diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java index 524ca5003..8a8a72ad8 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java @@ -8,13 +8,14 @@ import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.google.inject.Inject; +import com.timgroup.statsd.StatsDClient; import java.time.Clock; public class DefaultQueueService extends AbstractQueueService implements QueueService { @Inject public DefaultQueueService(EventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, - Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) { - super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService); + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) { + super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService, statsDClient); } } diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java index 0ac9a0ed8..36d4232d4 100644 --- a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java +++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java @@ -7,6 +7,7 @@ import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; +import com.timgroup.statsd.StatsDClient; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.testng.annotations.Test; @@ -41,7 +42,7 @@ public void testSizeCache() { BaseEventStore mockEventStore = mock(BaseEventStore.class); AbstractQueueService queueService = new AbstractQueueService(mockEventStore, mock(JobService.class), - mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class)){}; + mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class), mock(StatsDClient.class)){}; // At limit=500, size estimate should be at 4800 // At limit=50, size estimate should be at 5000