Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.expedia.www.haystack.trace.commons.clients.es.document

import org.json4s.jackson.Serialization

/**
  * elastic search document for the show-values index: one distinct value of a
  * whitelisted field observed for a given service
  *
  * @param servicename name of the service the field value was seen on
  * @param fieldname   name of the whitelisted (tag) field
  * @param fieldvalue  one distinct value observed for the field
  */
case class ShowValuesDoc(servicename: String,
                         fieldname: String,
                         fieldvalue: String) {
  // json form computed eagerly at construction, reusing the shared TraceIndexDoc serialization formats
  val json: String = Serialization.write(this)(TraceIndexDoc.formats)
}

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ case class WhitelistIndexField(name: String,
aliases: Set[String] = Set(),
enableRangeQuery: Boolean = false,
searchContext: String = "span",
showValue: Boolean = false,
enabled: Boolean = true)

// container for the full set of whitelisted index fields as deserialized from the external whitelist configuration
case class WhiteListIndexFields(fields: List[WhitelistIndexField])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.expedia.www.haystack.trace.commons.config.entities.{IndexFieldType, W
import org.json4s.ext.EnumNameSerializer
import org.json4s.jackson.Serialization
import org.json4s.{DefaultFormats, Formats}
import org.scalatest.{Entry, FunSpec, Matchers}
import org.scalatest.{FunSpec, Matchers}

import scala.collection.JavaConverters._

Expand Down
35 changes: 35 additions & 0 deletions indexer/src/main/resources/config/base.conf
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,41 @@ service.metadata {
}
}

# configuration for the show-values writer: records distinct values of whitelisted ('showValue') fields per service
show.values {
  enabled = true
  # buffered values are flushed to es when the interval elapses or the unique-value count threshold is crossed
  flush {
    interval.sec = 60
    operation.count = 10000
  }
  es {
    endpoint = "http://elasticsearch:9200"
    conn.timeout.ms = 10000
    read.timeout.ms = 5000
    consistency.level = "one"
    index {
      # apply the template before starting the client, if json is empty, no operation is performed
      # NOTE(review): the template's mappings declare type "metadata" while index.type below is
      # "fieldvalues-metadata" — confirm bulk writes target the mapped type
      template.json = "{\"template\": \"show-values\", \"index_patterns\": [\"show-values*\"], \"aliases\": {\"show-values\":{}}, \"settings\": {\"number_of_shards\": 4, \"index.mapping.ignore_malformed\": true, \"analysis\": {\"normalizer\": {\"lowercase_normalizer\": {\"type\": \"custom\", \"filter\": [\"lowercase\"]}}}}, \"mappings\": {\"metadata\": {\"_field_names\": {\"enabled\": false}, \"_all\": {\"enabled\": false}, \"properties\": {\"servicename\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldname\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldvalue\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}}}}}"
      name = "show-values"
      type = "fieldvalues-metadata"
    }
    # defines settings for bulk operation like max inflight bulks, number of documents and the total size in a single bulk
    bulk.max {
      docs {
        count = 100
        size.kb = 1000
      }
      inflight = 10
    }
    # exponential backoff: max attempts, initial delay in ms, multiplier per retry
    retries {
      max = 10
      backoff {
        initial.ms = 100
        factor = 2
      }
    }
  }
}

elasticsearch {
endpoint = "http://elasticsearch:9200"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object App extends MetricsSupport {
appConfig.elasticSearchConfig,
appConfig.backendConfig,
appConfig.serviceMetadataWriteConfig,
appConfig.showValuesConfig,
appConfig.indexConfig)

Runtime.getRuntime.addShutdownHook(new Thread {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import com.expedia.www.haystack.trace.indexer.processors._
import com.expedia.www.haystack.trace.indexer.processors.supplier.SpanIndexProcessorSupplier
import com.expedia.www.haystack.trace.indexer.store.SpanBufferMemoryStoreSupplier
import com.expedia.www.haystack.trace.indexer.writers.TraceWriter
import com.expedia.www.haystack.trace.indexer.writers.es.{ElasticSearchWriter, ServiceMetadataWriter}
import com.expedia.www.haystack.trace.indexer.writers.es.{ElasticSearchWriter, ServiceMetadataWriter, ShowValuesWriter}
import com.expedia.www.haystack.trace.indexer.writers.grpc.GrpcTraceWriter
import com.expedia.www.haystack.trace.indexer.writers.kafka.KafkaWriter
import org.apache.commons.lang3.StringUtils
Expand All @@ -44,6 +44,7 @@ class StreamRunner(kafkaConfig: KafkaConfiguration,
esConfig: ElasticSearchConfiguration,
traceWriteConfig: TraceBackendConfiguration,
serviceMetadataWriteConfig: ServiceMetadataWriteConfiguration,
showValuesWriteConfig: ShowValuesConfiguration,
indexConfig: WhitelistIndexFieldConfiguration) extends AutoCloseable with StateListener {

implicit private val executor: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
Expand All @@ -63,6 +64,10 @@ class StreamRunner(kafkaConfig: KafkaConfiguration,
writers += new ServiceMetadataWriter(serviceMetadataWriteConfig)
}

if (showValuesWriteConfig.enabled) {
writers += new ShowValuesWriter(showValuesWriteConfig, indexConfig)
}

if (StringUtils.isNotEmpty(kafkaConfig.produceTopic)) {
writers += new KafkaWriter(kafkaConfig.producerProps, kafkaConfig.produceTopic)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,44 @@ class ProjectConfiguration extends AutoCloseable {
)
}

  /**
    * show values write configuration, read from the "show.values" section of the loaded config
    * (see base.conf); the optional index template is forwarded only when present and non-empty
    */
  val showValuesConfig: ShowValuesConfiguration = {
    val showValuesConfig = config.getConfig("show.values")
    val es = showValuesConfig.getConfig("es")
    // template json is optional: absent or empty means no template is applied at startup
    val templateJsonConfigField = "index.template.json"
    val indexTemplateJson = if (es.hasPath(templateJsonConfigField)
      && StringUtils.isNotEmpty(es.getString(templateJsonConfigField))) {
      Some(es.getString(templateJsonConfigField))
    } else {
      None
    }
    // credentials are optional; absent paths map to None
    val username = if (es.hasPath("username")) Option(es.getString("username")) else None
    val password = if (es.hasPath("password")) Option(es.getString("password")) else None
    ShowValuesConfiguration(
      enabled = showValuesConfig.getBoolean("enabled"),
      flushIntervalInSec = showValuesConfig.getInt("flush.interval.sec"),
      flushOnMaxFieldCount = showValuesConfig.getInt("flush.operation.count"),
      esEndpoint = es.getString("endpoint"),
      username = username,
      password = password,
      consistencyLevel = es.getString("consistency.level"),
      indexName = es.getString("index.name"),
      indexType = es.getString("index.type"),
      indexTemplateJson = indexTemplateJson,
      connectionTimeoutMillis = es.getInt("conn.timeout.ms"),
      readTimeoutMillis = es.getInt("read.timeout.ms"),
      maxInFlightBulkRequests = es.getInt("bulk.max.inflight"),
      maxDocsInBulk = es.getInt("bulk.max.docs.count"),
      // NOTE(review): kb -> bytes conversion uses 1000, not 1024 — confirm intended
      maxBulkDocSizeInBytes = es.getInt("bulk.max.docs.size.kb") * 1000,
      retryConfig = RetryOperation.Config(
        es.getInt("retries.max"),
        es.getLong("retries.backoff.initial.ms"),
        es.getDouble("retries.backoff.factor"))
    )
  }

/**
*
* elastic search configuration object
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2018 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expedia.www.haystack.trace.indexer.config.entities

import com.expedia.www.haystack.commons.retries.RetryOperation

/**
  * Configurations for writing whitelisted field values ("show values") to elastic search
  *
  * @param enabled: enable the show-values writer; when false no field-value documents are written
  * @param esEndpoint: http endpoint to connect
  * @param username: username for the es, optional
  * @param password: password for the es, optional
  * @param consistencyLevel: consistency level of writes, for e.g. one, quoram
  * @param indexTemplateJson: template as json that will be applied when the app runs, this is optional
  * @param indexName: name of the elastic search index where the data is written
  * @param indexType: elastic search index type
  * @param connectionTimeoutMillis: connection timeout in millis
  * @param readTimeoutMillis: read timeout in millis
  * @param maxInFlightBulkRequests: max bulk writes that can be run in parallel
  * @param maxDocsInBulk: maximum number of index documents in a single bulk
  * @param maxBulkDocSizeInBytes: maximum size (in bytes) of a single bulk request
  * @param flushIntervalInSec: interval for collecting field values in memory before flushing to es
  * @param flushOnMaxFieldCount: number of unique field values buffered before a flush to es is forced
  * @param retryConfig: retry max retries limit, initial backoff and exponential factor values
  */

case class ShowValuesConfiguration(enabled: Boolean,
                                   esEndpoint: String,
                                   username: Option[String],
                                   password: Option[String],
                                   consistencyLevel: String,
                                   indexTemplateJson: Option[String],
                                   indexName: String,
                                   indexType: String,
                                   connectionTimeoutMillis: Int,
                                   readTimeoutMillis: Int,
                                   maxInFlightBulkRequests: Int,
                                   maxDocsInBulk: Int,
                                   maxBulkDocSizeInBytes: Int,
                                   flushIntervalInSec: Int,
                                   flushOnMaxFieldCount: Int,
                                   retryConfig: RetryOperation.Config
                                  ) {
  require(maxInFlightBulkRequests > 0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import scala.util.{Failure, Success, Try}

class IndexDocumentGenerator(config: WhitelistIndexFieldConfiguration) extends MetricsSupport {

type SpanDocument = mutable.Map[String, Any]
private val MIN_DURATION_FOR_TRUNCATION = TimeUnit.SECONDS.toMicros(20)

/**
Expand All @@ -44,7 +45,7 @@ class IndexDocumentGenerator(config: WhitelistIndexFieldConfiguration) extends M
// We maintain a white list of tags that are to be indexed. The whitelist is maintained as a configuration
// in an external database (outside this app boundary). However, the app periodically reads this whitelist config
// and applies it to the new spans that are read.
val spanIndices = mutable.ListBuffer[mutable.Map[String, Any]]()
val spanIndices = mutable.ListBuffer[SpanDocument]()

var traceStartTime = Long.MaxValue
var rootDuration = 0l
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.expedia.www.haystack.trace.indexer.writers.es

import java.time.Instant
import java.util.stream.Collectors

import com.expedia.open.tracing.{Span, Tag}
import com.expedia.www.haystack.commons.metrics.MetricsSupport
import com.expedia.www.haystack.trace.commons.clients.es.document.ShowValuesDoc
import com.expedia.www.haystack.trace.commons.config.entities.{WhitelistIndexField, WhitelistIndexFieldConfiguration}
import com.expedia.www.haystack.trace.indexer.config.entities.ShowValuesConfiguration
import org.apache.commons.lang3.StringUtils

import scala.collection.mutable
import scala.collection.JavaConverters._

class ShowValuesDocumentGenerator(config: ShowValuesConfiguration,
                                  whitelistIndexFieldConfiguration: WhitelistIndexFieldConfiguration) extends MetricsSupport {

  // service name -> (field name -> distinct field values seen since the last flush)
  private var showValuesMap = new mutable.HashMap[String, mutable.HashMap[String, mutable.Set[String]]]()
  private var lastFlushInstant = Instant.MIN
  // count of unique (service, field, value) entries buffered since the last flush
  private var fieldCount = 0

  /** @return true when the flush interval has elapsed; an interval of 0 flushes on every call */
  private def shouldFlush: Boolean = {
    config.flushIntervalInSec == 0 || Instant.now().minusSeconds(config.flushIntervalInSec).isAfter(lastFlushInstant)
  }

  /**
    * drains the buffered field values into es documents when a flush is due (interval elapsed
    * or buffered unique-value count above the configured threshold), resetting the buffer;
    * returns Nil when no flush is due
    */
  private def drainIfFlushIsDue(): Seq[ShowValuesDoc] = {
    if (showValuesMap.nonEmpty && (shouldFlush || fieldCount > config.flushOnMaxFieldCount)) {
      val documents = showValuesMap.flatMap {
        case (serviceName, fieldValuesMap) => createShowValuesDoc(serviceName, fieldValuesMap)
      }
      lastFlushInstant = Instant.now()
      showValuesMap = new mutable.HashMap[String, mutable.HashMap[String, mutable.Set[String]]]()
      fieldCount = 0
      documents.toSeq
    } else {
      Nil
    }
  }

  /**
    * collects the values of whitelisted 'showValue' tags from the given spans into the in-memory
    * buffer, and returns the accumulated documents when a flush is due (empty otherwise)
    *
    * @param spans spans whose whitelisted tag values should be recorded
    * @return show-values documents ready to be indexed, or Nil if no flush is due
    */
  def getAndUpdateShowValues(spans: Iterable[Span]): Seq[ShowValuesDoc] = {
    this.synchronized {
      val showValueFields: List[WhitelistIndexField] = whitelistIndexFieldConfiguration.whitelistIndexFields.filter(_.showValue)
      spans.foreach { span =>
        // tag keys are matched case-insensitively against the whitelist
        val tagsToSave: List[Tag] = span.getTagsList.asScala
          .filter(tag => showValueFields.exists(_.name.equalsIgnoreCase(tag.getKey)))
          .toList
        if (StringUtils.isNotEmpty(span.getServiceName) && tagsToSave.nonEmpty) {
          val serviceInfo = showValuesMap.getOrElseUpdate(span.getServiceName, mutable.HashMap[String, mutable.Set[String]]())
          tagsToSave.foreach { tag =>
            // NOTE(review): only tag.getVStr is read — non-string tag values would be recorded
            // as their default string form; confirm whitelisted showValue fields are string tags
            val tagValues = serviceInfo.getOrElseUpdate(tag.getKey, mutable.Set[String]())
            if (tagValues.add(tag.getVStr)) {
              fieldCount += 1
            }
          }
        }
      }
      drainIfFlushIsDue()
    }
  }

  /**
    * flattens one service's buffered field values into index documents
    *
    * @param serviceName name of the service the field values belong to
    * @param fieldValuesMap field name -> set of distinct values seen for that field
    * @return one document per (service, field, value) triple
    */
  def createShowValuesDoc(serviceName: String, fieldValuesMap: mutable.HashMap[String, mutable.Set[String]]): List[ShowValuesDoc] = {
    fieldValuesMap.flatMap(entry => entry._2.map(value => ShowValuesDoc(serviceName, entry._1, value))).toList
  }

}
Loading