Skip to content

Commit 253b4e5

Browse files
Merge pull request #250 from ExpediaDotCom/multi-backend-reads
adding support to read from multiple backend stores
2 parents c0cfbb5 + c6ecf64 commit 253b4e5

File tree

13 files changed

+213
-33
lines changed

13 files changed

+213
-33
lines changed
Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@
1717

1818
package com.expedia.www.haystack.trace.commons.config.entities
1919

20+
2021
/**
2122
* defines the configuration parameters for trace-backend *
2223
* @param host : trace backend grpc hostname
2324
* @param port : trace backend grpc port
2425
*/
25-
case class TraceBackendClientConfiguration(host: String, port:Int)
26+
case class GrpcClientConfig(host: String, port: Int)
27+
28+
/**
29+
* multiple store backends
30+
* @param backends configuration of all trace store backends
31+
*/
32+
case class TraceStoreBackends(backends: Seq[GrpcClientConfig])

indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
3333
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer, StringSerializer}
3434

3535
import scala.collection.JavaConverters._
36+
import scala.collection.mutable
3637
import scala.util.Try
3738

3839
class ProjectConfiguration extends AutoCloseable {
@@ -126,13 +127,19 @@ class ProjectConfiguration extends AutoCloseable {
126127
* trace backend configuration object
127128
*/
128129
val backendConfig: TraceBackendConfiguration = {
130+
val traceBackendConfig = config.getConfig("backend")
129131

132+
val grpcClients = traceBackendConfig.entrySet().asScala
133+
.map(k => StringUtils.split(k.getKey, '.')(0)).toSeq
134+
.map(cl => traceBackendConfig.getConfig(cl))
135+
.filter(cl => cl.hasPath("host") && cl.hasPath("port"))
136+
.map(cl => GrpcClientConfig(cl.getString("host"), cl.getInt("port")))
130137

131-
val traceBackendConfig = config.getConfig("backend")
132-
val clientConfig = traceBackendConfig.getConfig("client")
138+
// we dont support multiple backends for write operations
139+
require(grpcClients.size == 1)
133140

134141
TraceBackendConfiguration(
135-
TraceBackendClientConfiguration(clientConfig.getString("host"), clientConfig.getInt("port")),
142+
TraceStoreBackends(grpcClients),
136143
maxInFlightRequests = traceBackendConfig.getInt("max.inflight.requests"))
137144

138145
}

indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/TraceBackendConfiguration.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717

1818
package com.expedia.www.haystack.trace.indexer.config.entities
1919

20-
import com.expedia.www.haystack.trace.commons.config.entities.TraceBackendClientConfiguration
20+
import com.expedia.www.haystack.trace.commons.config.entities.TraceStoreBackends
2121

2222
/**
2323
* @param clientConfig defines the grpc client configuration for connecting to the trace backend
2424
* @param maxInFlightRequests defines the max parallel writes to trace-backend
2525
*/
26-
case class TraceBackendConfiguration(clientConfig: TraceBackendClientConfiguration,
26+
case class TraceBackendConfiguration(clientConfig: TraceStoreBackends,
2727
maxInFlightRequests: Int)

indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/grpc/GrpcTraceWriter.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@ class GrpcTraceWriter(config: TraceBackendConfiguration)(implicit val dispatcher
4141
private val writeTimer = metricRegistry.timer(AppMetricNames.BACKEND_WRITE_TIME)
4242
private val writeFailures = metricRegistry.meter(AppMetricNames.BACKEND_WRITE_FAILURE)
4343

44-
private val channel = ManagedChannelBuilder.forAddress(config.clientConfig.host,config.clientConfig.port)
45-
.usePlaintext(true)
46-
.build()
44+
private val channel = {
45+
val grpcConfig = config.clientConfig.backends.head
46+
ManagedChannelBuilder.forAddress(grpcConfig.host, grpcConfig.port)
47+
.usePlaintext(true)
48+
.build()
49+
}
4750
private val client = StorageBackendGrpc.newStub(channel)
4851

4952
// this semaphore controls the parallel writes to trace-backend

indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/GrpcTestClient.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package com.expedia.www.haystack.trace.indexer.integration.clients
2020
import java.util.concurrent.Executors
2121

2222
import com.expedia.open.tracing.backend.{ReadSpansRequest, StorageBackendGrpc, TraceRecord}
23-
import com.expedia.www.haystack.trace.commons.config.entities.TraceBackendClientConfiguration
23+
import com.expedia.www.haystack.trace.commons.config.entities.{GrpcClientConfig, TraceStoreBackends}
2424
import com.expedia.www.haystack.trace.indexer.config.entities.TraceBackendConfiguration
2525
import com.expedia.www.haystack.trace.indexer.integration.TraceDescription
2626
import com.expedia.www.haystack.trace.storage.backends.memory.Service
@@ -44,7 +44,7 @@ class GrpcTestClient {
4444

4545

4646
def buildConfig = TraceBackendConfiguration(
47-
TraceBackendClientConfiguration("localhost", port), 10)
47+
TraceStoreBackends(Seq(GrpcClientConfig("localhost", port))), 10)
4848

4949
def queryTraces(traceDescriptions: Seq[TraceDescription]): Seq[TraceRecord] = {
5050
val traceIds = traceDescriptions.map(traceDescription => traceDescription.traceId).toList

reader/src/main/resources/config/base.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ backend {
1515
host = "localhost"
1616
port = 8090
1717
}
18+
19+
# we support multiple grpc based backends, to provide another one use something like following.
20+
# you are required to provide host and port
21+
22+
# another_client {
23+
# host = "localhost"
24+
# port = 8092
25+
# }
1826
}
1927

2028
elasticsearch {

reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/ProviderConfiguration.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import com.expedia.www.haystack.trace.reader.config.entities._
2525
import com.expedia.www.haystack.trace.reader.readers.transformers.{PartialSpanTransformer, SpanTreeTransformer, TraceTransformer}
2626
import com.expedia.www.haystack.trace.reader.readers.validators.TraceValidator
2727
import com.typesafe.config.Config
28+
import org.apache.commons.lang3.StringUtils
2829

2930
import scala.collection.JavaConverters._
3031
import scala.reflect.ClassTag
@@ -44,9 +45,18 @@ class ProviderConfiguration {
4445
/**
4546
* trace backend configuration object
4647
*/
47-
val traceBackendConfiguration: TraceBackendClientConfiguration = {
48-
val clientConfig = config.getConfig("backend.client")
49-
TraceBackendClientConfiguration(clientConfig.getString("host"), clientConfig.getInt("port"))
48+
val traceBackendConfiguration: TraceStoreBackends = {
49+
val traceBackendConfig = config.getConfig("backend")
50+
51+
val grpcClients = traceBackendConfig.entrySet().asScala
52+
.map(k => StringUtils.split(k.getKey, '.')(0)).toSeq
53+
.map(cl => traceBackendConfig.getConfig(cl))
54+
.filter(cl => cl.hasPath("host") && cl.hasPath("port"))
55+
.map(cl => GrpcClientConfig(cl.getString("host"), cl.getInt("port")))
56+
57+
require(grpcClients.nonEmpty)
58+
59+
TraceStoreBackends(grpcClients)
5060
}
5161

5262

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2019 Expedia, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expedia.www.haystack.trace.reader.readers.utils
18+
19+
import com.expedia.open.tracing.api.Trace
20+
21+
object TraceMerger {
22+
def merge(traces: Seq[Trace]): Seq[Trace] = {
23+
traces.groupBy(_.getTraceId).mapValues {
24+
seq => {
25+
if (seq.size == 1) {
26+
seq.head
27+
} else {
28+
val head = seq.head.toBuilder
29+
seq.tail.foreach(t => head.addAllChildSpans(t.getChildSpansList))
30+
head.build()
31+
}
32+
}
33+
}.values.toSeq
34+
}
35+
}

reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,25 @@ package com.expedia.www.haystack.trace.reader.stores
1919
import com.expedia.open.tracing.api._
2020
import com.expedia.www.haystack.commons.metrics.MetricsSupport
2121
import com.expedia.www.haystack.trace.commons.clients.es.document.TraceIndexDoc
22-
import com.expedia.www.haystack.trace.commons.config.entities.{TraceBackendClientConfiguration, WhitelistIndexField, WhitelistIndexFieldConfiguration}
22+
import com.expedia.www.haystack.trace.commons.config.entities.{TraceStoreBackends, WhitelistIndexFieldConfiguration}
2323
import com.expedia.www.haystack.trace.reader.config.entities.ElasticSearchConfiguration
2424
import com.expedia.www.haystack.trace.reader.stores.readers.es.ElasticSearchReader
2525
import com.expedia.www.haystack.trace.reader.stores.readers.es.query.{FieldValuesQueryGenerator, ServiceMetadataQueryGenerator, TraceCountsQueryGenerator, TraceSearchQueryGenerator}
26-
import com.expedia.www.haystack.trace.reader.stores.readers.grpc.GrpcTraceReader
26+
import com.expedia.www.haystack.trace.reader.stores.readers.grpc.GrpcTraceReaders
2727
import io.searchbox.core.SearchResult
2828
import org.elasticsearch.index.IndexNotFoundException
2929
import org.slf4j.LoggerFactory
3030

3131
import scala.collection.JavaConverters._
3232
import scala.concurrent.{ExecutionContextExecutor, Future}
3333

34-
class EsIndexedTraceStore(traceBackendConfig: TraceBackendClientConfiguration,
34+
class EsIndexedTraceStore(traceStoreBackendConfig: TraceStoreBackends,
3535
elasticSearchConfiguration: ElasticSearchConfiguration,
3636
whitelistedFieldsConfiguration: WhitelistIndexFieldConfiguration)(implicit val executor: ExecutionContextExecutor)
3737
extends TraceStore with MetricsSupport with ResponseParser {
3838
private val LOGGER = LoggerFactory.getLogger(classOf[ElasticSearchReader])
3939

40-
private val traceReader: GrpcTraceReader = new GrpcTraceReader(traceBackendConfig)
40+
private val traceReader: GrpcTraceReaders = new GrpcTraceReaders(traceStoreBackendConfig)
4141
private val esReader: ElasticSearchReader = new ElasticSearchReader(elasticSearchConfiguration.clientConfiguration)
4242
private val traceSearchQueryGenerator = new TraceSearchQueryGenerator(elasticSearchConfiguration.spansIndexConfiguration, ES_NESTED_DOC_NAME, whitelistedFieldsConfiguration)
4343
private val traceCountsQueryGenerator = new TraceCountsQueryGenerator(elasticSearchConfiguration.spansIndexConfiguration, ES_NESTED_DOC_NAME, whitelistedFieldsConfiguration)
Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,37 +19,68 @@ package com.expedia.www.haystack.trace.reader.stores.readers.grpc
1919
import com.expedia.open.tracing.api.Trace
2020
import com.expedia.open.tracing.backend.{ReadSpansRequest, StorageBackendGrpc}
2121
import com.expedia.www.haystack.commons.metrics.MetricsSupport
22-
import com.expedia.www.haystack.trace.commons.config.entities.TraceBackendClientConfiguration
22+
import com.expedia.www.haystack.trace.commons.config.entities.TraceStoreBackends
23+
import com.expedia.www.haystack.trace.reader.exceptions.TraceNotFoundException
2324
import com.expedia.www.haystack.trace.reader.metrics.AppMetricNames
24-
import io.grpc.ManagedChannelBuilder
25+
import com.expedia.www.haystack.trace.reader.readers.utils.TraceMerger
26+
import io.grpc.{ManagedChannel, ManagedChannelBuilder}
2527
import org.slf4j.LoggerFactory
2628

2729
import scala.collection.JavaConverters._
2830
import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
2931

30-
class GrpcTraceReader(config: TraceBackendClientConfiguration)
31-
(implicit val dispatcher: ExecutionContextExecutor) extends MetricsSupport with AutoCloseable {
32-
private val LOGGER = LoggerFactory.getLogger(classOf[GrpcTraceReader])
32+
class GrpcTraceReaders(config: TraceStoreBackends)
33+
(implicit val dispatcher: ExecutionContextExecutor) extends MetricsSupport with AutoCloseable {
34+
private val LOGGER = LoggerFactory.getLogger(classOf[GrpcTraceReaders])
3335

3436
private val readTimer = metricRegistry.timer(AppMetricNames.BACKEND_READ_TIME)
3537
private val readFailures = metricRegistry.meter(AppMetricNames.BACKEND_READ_FAILURES)
3638
private val tracesFailures = metricRegistry.meter(AppMetricNames.BACKEND_TRACES_FAILURE)
37-
private val channel = ManagedChannelBuilder
38-
.forAddress(config.host, config.port)
39-
.usePlaintext(true)
40-
.build()
4139

42-
val client: StorageBackendGrpc.StorageBackendFutureStub = StorageBackendGrpc.newFutureStub(channel)
40+
private val clients: Seq[GrpcChannelClient] = config.backends.map {
41+
backend => {
42+
val channel = ManagedChannelBuilder
43+
.forAddress(backend.host, backend.port)
44+
.usePlaintext(true)
45+
.build()
4346

47+
val client = StorageBackendGrpc.newFutureStub(channel)
48+
GrpcChannelClient(channel, client)
49+
}
50+
}
4451

4552
def readTraces(traceIds: List[String]): Future[Seq[Trace]] = {
53+
val allFutures = clients.map {
54+
client =>
55+
readTraces(traceIds, client.stub) recoverWith {
56+
case _: Exception => Future.successful(Seq.empty[Trace])
57+
}
58+
}
59+
60+
Future.sequence(allFutures)
61+
.map(traceSeq => traceSeq.flatten)
62+
.map {
63+
traces =>
64+
if (traces.isEmpty) throw new TraceNotFoundException() else TraceMerger.merge(traces)
65+
}
66+
}
67+
68+
private def readTraces(traceIds: List[String], client: StorageBackendGrpc.StorageBackendFutureStub): Future[Seq[Trace]] = {
4669
val timer = readTimer.time()
4770
val promise = Promise[Seq[Trace]]
4871

4972
try {
50-
val readSpansRequest = ReadSpansRequest.newBuilder().addAllTraceIds(traceIds.asJava).build()
73+
val readSpansRequest = ReadSpansRequest.newBuilder().addAllTraceIds(traceIds.asJavaCollection).build()
5174
val futureResponse = client.readSpans(readSpansRequest)
52-
futureResponse.addListener(new ReadSpansResponseListener(futureResponse, promise, timer, readFailures, tracesFailures, traceIds.size), dispatcher)
75+
futureResponse.addListener(new ReadSpansResponseListener(
76+
futureResponse,
77+
promise,
78+
timer,
79+
readFailures,
80+
tracesFailures,
81+
traceIds.size), dispatcher)
82+
83+
// return the future with the results for the given client
5384
promise.future
5485
} catch {
5586
case ex: Exception =>
@@ -61,6 +92,8 @@ class GrpcTraceReader(config: TraceBackendClientConfiguration)
6192
}
6293

6394
override def close(): Unit = {
64-
channel.shutdown()
95+
clients.foreach(_.channel.shutdown())
6596
}
97+
98+
case class GrpcChannelClient(channel: ManagedChannel, stub: StorageBackendGrpc.StorageBackendFutureStub)
6699
}

0 commit comments

Comments
 (0)