|
| 1 | +/* |
| 2 | + * Copyright 2018 Expedia, Inc. |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +package com.expedia.www.haystack.trace.reader.readers.utils |
| 18 | + |
| 19 | +import com.expedia.open.tracing.{Span, Tag} |
| 20 | +import com.expedia.www.haystack.trace.reader.readers.utils.TagBuilders.{buildBoolTag, buildLongTag, buildStringTag} |
| 21 | +import com.expedia.www.haystack.trace.reader.readers.utils.TagExtractors.extractTagStringValue |
| 22 | + |
| 23 | +import scala.collection.JavaConverters._ |
| 24 | + |
| 25 | +object SpanMerger { |
| 26 | + |
| 27 | + def mergeParentChildSpans(parentSpan: Span, childSpan: Span): Span = { |
| 28 | + val clientSpan = if (SpanUtils.containsClientLogTag(parentSpan)) parentSpan else SpanUtils.addClientLogTag(parentSpan) |
| 29 | + val serverSpan = if (SpanUtils.containsServerLogTag(childSpan)) childSpan else SpanUtils.addServerLogTag(childSpan) |
| 30 | + merge(clientSpan, serverSpan) |
| 31 | + } |
| 32 | + |
| 33 | + def mergeSpans(spans: Seq[Span]): Span = { |
| 34 | + val serverSpanOptional = collapseSpans(spans.filter(SpanUtils.containsServerLogTag)) |
| 35 | + val clientSpanOptional = collapseSpans(spans.filter(SpanUtils.containsClientLogTag)) |
| 36 | + (clientSpanOptional, serverSpanOptional) match { |
| 37 | + // ideally there should be one server and one client span |
| 38 | + // merging these partial spans to form a new single span |
| 39 | + case (Some(clientSpan), Some(serverSpan)) => merge(clientSpan, serverSpan) |
| 40 | + |
| 41 | + // imperfect scenario, fallback to return available server span |
| 42 | + case (None, Some(serverSpan)) => serverSpan |
| 43 | + |
| 44 | + // imperfect scenario, fallback to return available client span |
| 45 | + case (Some(clientSpan), None) => clientSpan |
| 46 | + |
| 47 | + // imperfect scenario, fallback to collapse all spans |
| 48 | + case _ => collapseSpans(spans).get |
| 49 | + } |
| 50 | + } |
| 51 | + |
| 52 | + private def merge(clientSpan: Span, serverSpan: Span): Span = { |
| 53 | + Span |
| 54 | + .newBuilder(serverSpan) |
| 55 | + .setParentSpanId(clientSpan.getParentSpanId) // use the parentSpanId of the client span to stitch in the client's trace tree |
| 56 | + .addAllTags((clientSpan.getTagsList.asScala ++ auxiliaryCommonTags(clientSpan, serverSpan) ++ auxiliaryClientTags(clientSpan) ++ auxiliaryServerTags(serverSpan)).asJavaCollection) |
| 57 | + .clearLogs().addAllLogs((clientSpan.getLogsList.asScala ++ serverSpan.getLogsList.asScala.sortBy(_.getTimestamp)).asJavaCollection) |
| 58 | + .build() |
| 59 | + } |
| 60 | + |
| 61 | + // collapse all spans of a type(eg. client or server) if needed, |
| 62 | + // ideally there would be just one span in the list and hence no need of collapsing |
| 63 | + private def collapseSpans(spans: Seq[Span]): Option[Span] = { |
| 64 | + spans match { |
| 65 | + case Nil => None |
| 66 | + case Seq(span) => Some(span) |
| 67 | + case _ => |
| 68 | + // if there are multiple spans fallback to collapse all the spans in a single span |
| 69 | + // start the collapsed span from startTime of the first and end at ending of last such span |
| 70 | + // also add an error marker in the collapsed span |
| 71 | + val firstSpan = spans.minBy(_.getStartTime) |
| 72 | + val lastSpan = spans.maxBy(span => span.getStartTime + span.getDuration) |
| 73 | + val allTags = spans.flatMap(span => span.getTagsList.asScala) |
| 74 | + val allLogs = spans.flatMap(span => span.getLogsList.asScala) |
| 75 | + val opName = spans.map(_.getOperationName).reduce((a, b) => a + " & " + b) |
| 76 | + |
| 77 | + Some( |
| 78 | + Span |
| 79 | + .newBuilder(firstSpan) |
| 80 | + .setOperationName(opName) |
| 81 | + .setDuration(lastSpan.getStartTime + lastSpan.getDuration - firstSpan.getStartTime) |
| 82 | + .clearTags().addAllTags(allTags.asJava) |
| 83 | + .addTags(buildBoolTag(AuxiliaryTags.ERR_IS_MULTI_PARTIAL_SPAN, tagValue = true)) |
| 84 | + .clearLogs().addAllLogs(allLogs.asJava) |
| 85 | + .build()) |
| 86 | + } |
| 87 | + } |
| 88 | + |
| 89 | + // Network delta - difference between server and client duration |
| 90 | + // calculate only if serverDuration is smaller then client |
| 91 | + private def calculateNetworkDelta(clientSpan: Span, serverSpan: Span): Option[Long] = { |
| 92 | + val clientDuration = SpanUtils.getEventTimestamp(clientSpan, SpanMarkers.CLIENT_RECV_EVENT) - SpanUtils.getEventTimestamp(clientSpan, SpanMarkers.CLIENT_SEND_EVENT) |
| 93 | + val serverDuration = SpanUtils.getEventTimestamp(serverSpan, SpanMarkers.SERVER_SEND_EVENT) - SpanUtils.getEventTimestamp(serverSpan, SpanMarkers.SERVER_RECV_EVENT) |
| 94 | + |
| 95 | + // difference of duration of spans |
| 96 | + if (serverDuration < clientDuration) { |
| 97 | + Some(clientDuration - serverDuration) |
| 98 | + } else { |
| 99 | + None |
| 100 | + } |
| 101 | + } |
| 102 | + |
| 103 | + private def auxiliaryCommonTags(clientSpan: Span, serverSpan: Span): List[Tag] = |
| 104 | + List( |
| 105 | + buildBoolTag(AuxiliaryTags.IS_MERGED_SPAN, tagValue = true), |
| 106 | + buildLongTag(AuxiliaryTags.NETWORK_DELTA, calculateNetworkDelta(clientSpan, serverSpan).getOrElse(-1)) |
| 107 | + ) |
| 108 | + |
| 109 | + private def auxiliaryClientTags(span: Span): List[Tag] = |
| 110 | + List( |
| 111 | + buildStringTag(AuxiliaryTags.CLIENT_SERVICE_NAME, span.getServiceName), |
| 112 | + buildStringTag(AuxiliaryTags.CLIENT_OPERATION_NAME, span.getOperationName), |
| 113 | + buildStringTag(AuxiliaryTags.CLIENT_INFRASTRUCTURE_PROVIDER, extractTagStringValue(span, AuxiliaryTags.INFRASTRUCTURE_PROVIDER)), |
| 114 | + buildStringTag(AuxiliaryTags.CLIENT_INFRASTRUCTURE_LOCATION, extractTagStringValue(span, AuxiliaryTags.INFRASTRUCTURE_LOCATION)), |
| 115 | + buildLongTag(AuxiliaryTags.CLIENT_START_TIME, span.getStartTime), |
| 116 | + buildLongTag(AuxiliaryTags.CLIENT_DURATION, span.getDuration) |
| 117 | + ) |
| 118 | + |
| 119 | + private def auxiliaryServerTags(span: Span): List[Tag] = { |
| 120 | + List( |
| 121 | + buildStringTag(AuxiliaryTags.SERVER_SERVICE_NAME, span.getServiceName), |
| 122 | + buildStringTag(AuxiliaryTags.SERVER_OPERATION_NAME, span.getOperationName), |
| 123 | + buildStringTag(AuxiliaryTags.SERVER_INFRASTRUCTURE_PROVIDER, extractTagStringValue(span, AuxiliaryTags.INFRASTRUCTURE_PROVIDER)), |
| 124 | + buildStringTag(AuxiliaryTags.SERVER_INFRASTRUCTURE_LOCATION, extractTagStringValue(span, AuxiliaryTags.INFRASTRUCTURE_LOCATION)), |
| 125 | + buildLongTag(AuxiliaryTags.SERVER_START_TIME, span.getStartTime), |
| 126 | + buildLongTag(AuxiliaryTags.SERVER_DURATION, span.getDuration) |
| 127 | + ) |
| 128 | + } |
| 129 | + |
| 130 | + def isAlreadyMergedSpan(span: Span): Boolean = { |
| 131 | + span.getTagsList.asScala.exists(tag => tag.getKey.equals(AuxiliaryTags.IS_MERGED_SPAN)) |
| 132 | + } |
| 133 | +} |
0 commit comments