Skip to content

Commit 7fd84fd

Browse files
authored
Merge pull request #174 from ExpediaDotCom/traceId-krastogi
Adding metric to track missing trace result from cassandra
2 parents 8b2650e + 0d294a1 commit 7fd84fd

File tree

5 files changed

+30
-10
lines changed

5 files changed

+30
-10
lines changed

reader/build/docker/jmxtrans-agent.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
<query objectName="metrics:name=cassandra.read.time" attributes="99thPercentile,50thPercentile,OneMinuteRate" resultAlias="cassandra.read.time.#attribute#"/>
66
<query objectName="metrics:name=cassandra.read.failures" attributes="OneMinuteRate" resultAlias="cassandra.read.failures.#attribute#"/>
77
<query objectName="metrics:name=search.trace.rejected" attributes="OneMinuteRate" resultAlias="trace-search.rejected.#attribute#"/>
8+
<query objectName="metrics:name=cassandra.traces.failures" attributes="OneMinuteRate" resultAlias="cassandra.traces.failures.#attribute#"/>
89

910
<!-- reader endpoint metrics -->
1011
<query objectName="metrics:name=TraceReader.getFieldNames" attributes="50thPercentile,99thPercentile,OneMinuteRate" resultAlias="trace.reader.getFieldNames.#attribute#"/>

reader/src/main/scala/com/expedia/www/haystack/trace/reader/metrics/AppMetricNames.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ object AppMetricNames {
2424

2525
val CASSANDRA_READ_TIME = "cassandra.read.time"
2626
val CASSANDRA_READ_FAILURES = "cassandra.read.failures"
27+
val CASSANDRA_TRACES_FAILURE = "cassandra.traces.failures"
2728

2829
val SEARCH_TRACE_REJECTED = "search.trace.rejected"
2930
val COUNT_BUCKET_REJECTED = "count.bucket.rejected"

reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/readers/cassandra/CassandraReadRawTracesResultListener.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ object CassandraReadRawTracesResultListener {
3636
}
3737

3838
class CassandraReadRawTracesResultListener(asyncResult: ResultSetFuture,
39+
promise: Promise[Seq[Trace]],
3940
timer: Timer.Context,
4041
failure: Meter,
41-
promise: Promise[Seq[Trace]]) extends Runnable {
42+
tracesFailure: Meter,
43+
traceIdCount: Int) extends Runnable {
4244
override def run(): Unit = {
4345
timer.close()
4446

@@ -47,6 +49,7 @@ class CassandraReadRawTracesResultListener(asyncResult: ResultSetFuture,
4749
.flatMap(tryDeserialize)
4850
match {
4951
case Success(traces) =>
52+
tracesFailure.mark(traceIdCount - traces.length)
5053
promise.success(traces)
5154
case Failure(ex) =>
5255
if (fatalError(ex)) {
@@ -56,6 +59,7 @@ class CassandraReadRawTracesResultListener(asyncResult: ResultSetFuture,
5659
LOGGER.error("Failed in reading the record from cassandra", ex)
5760
}
5861
failure.mark()
62+
tracesFailure.mark(traceIdCount)
5963
promise.failure(ex)
6064
}
6165
}

reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/readers/cassandra/CassandraTraceReader.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class CassandraTraceReader(cassandra: CassandraSession, config: CassandraConfigu
3030

3131
private val readTimer = metricRegistry.timer(AppMetricNames.CASSANDRA_READ_TIME)
3232
private val readFailures = metricRegistry.meter(AppMetricNames.CASSANDRA_READ_FAILURES)
33+
private val tracesFailures = metricRegistry.meter(AppMetricNames.CASSANDRA_TRACES_FAILURE)
3334

3435
def readTrace(traceId: String): Future[Trace] = {
3536
val timer = readTimer.time()
@@ -56,7 +57,7 @@ class CassandraTraceReader(cassandra: CassandraSession, config: CassandraConfigu
5657
try {
5758
val statement = cassandra.newSelectRawTracesBoundStatement(traceIds)
5859
val asyncResult = cassandra.executeAsync(statement)
59-
asyncResult.addListener(new CassandraReadRawTracesResultListener(asyncResult, timer, readFailures, promise), dispatcher)
60+
asyncResult.addListener(new CassandraReadRawTracesResultListener(asyncResult, promise, timer, readFailures, tracesFailures, traceIds.size), dispatcher)
6061
promise.future
6162
} catch {
6263
case ex: Exception =>

reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/stores/readers/cassandra/CassandraReadRawTracesResultListenerSpec.scala

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import com.expedia.open.tracing.Span
2424
import com.expedia.open.tracing.api.Trace
2525
import com.expedia.open.tracing.buffer.SpanBuffer
2626
import com.expedia.www.haystack.trace.commons.clients.cassandra.CassandraTableSchema
27-
import com.expedia.www.haystack.trace.reader.stores.readers.cassandra.{CassandraReadRawTracesResultListener, CassandraReadTraceResultListener}
27+
import com.expedia.www.haystack.trace.reader.stores.readers.cassandra.CassandraReadRawTracesResultListener
2828
import com.expedia.www.haystack.trace.reader.unit.BaseUnitTestSpec
2929
import io.grpc.{Status, StatusException}
3030
import org.easymock.EasyMock
@@ -40,6 +40,7 @@ class CassandraReadRawTracesResultListenerSpec extends BaseUnitTestSpec {
4040
val resultSet = mock[ResultSet]
4141
val promise = mock[Promise[Seq[Trace]]]
4242
val failureMeter = mock[Meter]
43+
val tracesFailures = mock[Meter]
4344
val timer = mock[Timer.Context]
4445

4546
val mockSpanBufferRow_1 = mock[Row]
@@ -56,8 +57,10 @@ class CassandraReadRawTracesResultListenerSpec extends BaseUnitTestSpec {
5657

5758

5859
val capturedTraces = EasyMock.newCapture[Seq[Trace]]()
60+
val capturedMeter = EasyMock.newCapture[Int]()
5961
expecting {
6062
timer.close()
63+
tracesFailures.mark(EasyMock.capture(capturedMeter))
6164
mockReadResult.get().andReturn(resultSet)
6265
resultSet.all().andReturn(List(mockSpanBufferRow_1, mockSpanBufferRow_2, mockSpanBufferRow_3).asJava)
6366
mockSpanBufferRow_1.getBytes(CassandraTableSchema.SPANS_COLUMN_NAME).andReturn(ByteBuffer.wrap(spanBuffer_1.toByteArray))
@@ -66,14 +69,16 @@ class CassandraReadRawTracesResultListenerSpec extends BaseUnitTestSpec {
6669
promise.success(EasyMock.capture(capturedTraces)).andReturn(promise)
6770
}
6871

69-
whenExecuting(mockReadResult, promise, failureMeter, timer, resultSet, mockSpanBufferRow_1, mockSpanBufferRow_2, mockSpanBufferRow_3) {
70-
val listener = new CassandraReadRawTracesResultListener(mockReadResult, timer, failureMeter, promise)
72+
whenExecuting(mockReadResult, promise, tracesFailures, failureMeter, timer, resultSet, mockSpanBufferRow_1, mockSpanBufferRow_2, mockSpanBufferRow_3) {
73+
val listener = new CassandraReadRawTracesResultListener(mockReadResult, promise, timer, failureMeter, tracesFailures, 2)
7174
listener.run()
7275
val traceIdSpansMap: Map[String, Set[String]] = capturedTraces.getValue.map(capturedTrace =>
7376
capturedTrace.getTraceId -> capturedTrace.getChildSpansList.asScala.map(_.getSpanId).toSet).toMap
7477

7578
traceIdSpansMap("TRACE_ID1") shouldEqual Set("SPAN_ID_1", "SPAN_ID_2")
7679
traceIdSpansMap("TRACE_ID3") shouldEqual Set("SPAN_ID_3")
80+
81+
capturedMeter.getValue shouldEqual 0
7782
}
7883
}
7984

@@ -82,6 +87,7 @@ class CassandraReadRawTracesResultListenerSpec extends BaseUnitTestSpec {
8287
val resultSet = mock[ResultSet]
8388
val promise = mock[Promise[Seq[Trace]]]
8489
val failureMeter = mock[Meter]
90+
val tracesFailures = mock[Meter]
8591
val timer = mock[Timer.Context]
8692

8793
val mockSpanBufferRow_1 = mock[Row]
@@ -90,42 +96,49 @@ class CassandraReadRawTracesResultListenerSpec extends BaseUnitTestSpec {
9096
val span_1 = Span.newBuilder().setTraceId("TRACE_ID").setSpanId("SPAN_ID_1")
9197
val spanBuffer_1 = SpanBuffer.newBuilder().setTraceId("TRACE_ID").addChildSpans(span_1).build()
9298

99+
val capturedMeter = EasyMock.newCapture[Int]()
93100
expecting {
94101
timer.close()
95102
failureMeter.mark()
103+
tracesFailures.mark(EasyMock.capture(capturedMeter))
96104
mockReadResult.get().andReturn(resultSet)
97105
resultSet.all().andReturn(List(mockSpanBufferRow_1, mockSpanBufferRow_2).asJava)
98106
mockSpanBufferRow_1.getBytes(CassandraTableSchema.SPANS_COLUMN_NAME).andReturn(ByteBuffer.wrap(spanBuffer_1.toByteArray))
99107
mockSpanBufferRow_2.getBytes(CassandraTableSchema.SPANS_COLUMN_NAME).andReturn(ByteBuffer.wrap("illegal bytes".getBytes))
100108
promise.failure(EasyMock.anyObject()).andReturn(promise)
101109
}
102110

103-
whenExecuting(mockReadResult, promise, failureMeter, timer, resultSet, mockSpanBufferRow_1, mockSpanBufferRow_2) {
104-
val listener = new CassandraReadRawTracesResultListener(mockReadResult, timer, failureMeter, promise)
111+
whenExecuting(mockReadResult, promise, failureMeter, tracesFailures, timer, resultSet, mockSpanBufferRow_1, mockSpanBufferRow_2) {
112+
val listener = new CassandraReadRawTracesResultListener(mockReadResult, promise, timer, failureMeter, tracesFailures, 1)
105113
listener.run()
114+
capturedMeter.getValue shouldEqual 1
106115
}
107116
}
108117

109118
it("should return an exception for empty traceId") {
110119
val mockReadResult = mock[ResultSetFuture]
111120
val resultSet = mock[ResultSet]
112-
val promise = mock[Promise[Trace]]
121+
val promise = mock[Promise[Seq[Trace]]]
113122
val failureMeter = mock[Meter]
123+
val tracesFailures = mock[Meter]
114124
val timer = mock[Timer.Context]
115125

116126
val capturedException = EasyMock.newCapture[StatusException]()
127+
val capturedMeter = EasyMock.newCapture[Int]()
117128
expecting {
118129
timer.close()
119130
failureMeter.mark()
131+
tracesFailures.mark(EasyMock.capture(capturedMeter))
120132
mockReadResult.get().andReturn(resultSet)
121133
resultSet.all().andReturn(List[Row]().asJava)
122134
promise.failure(EasyMock.capture(capturedException)).andReturn(promise)
123135
}
124136

125-
whenExecuting(mockReadResult, promise, failureMeter, timer, resultSet) {
126-
val listener = new CassandraReadTraceResultListener(mockReadResult, timer, failureMeter, promise)
137+
whenExecuting(mockReadResult, promise, failureMeter, tracesFailures, timer, resultSet) {
138+
val listener = new CassandraReadRawTracesResultListener(mockReadResult, promise, timer, failureMeter, tracesFailures, 0)
127139
listener.run()
128140
capturedException.getValue.getStatus.getCode shouldEqual Status.NOT_FOUND.getCode
141+
capturedMeter.getValue shouldEqual 0
129142
}
130143
}
131144
}

0 commit comments

Comments
 (0)