From 649dd6adfe8615ddd2d931f430f9cede86cb556e Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Wed, 7 Aug 2019 03:28:09 +0530 Subject: [PATCH 01/12] init commit --- .../clients/es/document/ShowValuesDoc.scala | 10 ++ .../WhitelistIndexFieldConfiguration.scala | 1 + indexer/src/main/resources/config/base.conf | 36 +++++ .../haystack/trace/indexer/StreamRunner.scala | 7 +- .../indexer/config/ProjectConfiguration.scala | 39 +++++ .../entities/ShowValuesConfiguration.scala | 61 ++++++++ .../ShowValuesWriteConfiguration.scala | 61 ++++++++ .../writers/es/IndexDocumentGenerator.scala | 5 +- .../es/ShowValuesDocumentGenerator.scala | 69 +++++++++ .../indexer/writers/es/ShowValuesWriter.scala | 139 ++++++++++++++++++ 10 files changed, 425 insertions(+), 3 deletions(-) create mode 100644 commons/src/main/scala/com/expedia/www/haystack/trace/commons/clients/es/document/ShowValuesDoc.scala create mode 100644 indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesConfiguration.scala create mode 100644 indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesWriteConfiguration.scala create mode 100644 indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala create mode 100644 indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala diff --git a/commons/src/main/scala/com/expedia/www/haystack/trace/commons/clients/es/document/ShowValuesDoc.scala b/commons/src/main/scala/com/expedia/www/haystack/trace/commons/clients/es/document/ShowValuesDoc.scala new file mode 100644 index 00000000..66462554 --- /dev/null +++ b/commons/src/main/scala/com/expedia/www/haystack/trace/commons/clients/es/document/ShowValuesDoc.scala @@ -0,0 +1,10 @@ +package com.expedia.www.haystack.trace.commons.clients.es.document + +import org.json4s.jackson.Serialization + +case class ShowValuesDoc(servicename: String, + fieldname: String, + fieldvalue: String) { + val json: String = Serialization.write(this)(TraceIndexDoc.formats) +} + diff --git a/commons/src/main/scala/com/expedia/www/haystack/trace/commons/config/entities/WhitelistIndexFieldConfiguration.scala b/commons/src/main/scala/com/expedia/www/haystack/trace/commons/config/entities/WhitelistIndexFieldConfiguration.scala index 19a1ac42..c8f5eedc 100644 --- a/commons/src/main/scala/com/expedia/www/haystack/trace/commons/config/entities/WhitelistIndexFieldConfiguration.scala +++ b/commons/src/main/scala/com/expedia/www/haystack/trace/commons/config/entities/WhitelistIndexFieldConfiguration.scala @@ -40,6 +40,7 @@ case class WhitelistIndexField(name: String, aliases: Set[String] = Set(), enableRangeQuery: Boolean = false, searchContext: String = "span", + showValue: Boolean = false, enabled: Boolean = true) case class WhiteListIndexFields(fields: List[WhitelistIndexField]) diff --git a/indexer/src/main/resources/config/base.conf b/indexer/src/main/resources/config/base.conf index 03070a08..9bc48647 100644 --- a/indexer/src/main/resources/config/base.conf +++ b/indexer/src/main/resources/config/base.conf @@ -93,6 +93,42 @@ service.metadata { } } +show.values { + enabled = true + fieldnames = [ "page-name" ] + flush { + interval.sec = 60 + operation.count = 10000 + } + es { + endpoint = "http://elasticsearch:9200" + conn.timeout.ms = 10000 + read.timeout.ms = 5000 + consistency.level = "one" + index { + # apply the template before starting the client, if json is empty, no operation is performed + template.json = "{\"template\": \”show-values\", \"index_patterns\": [\”show-values*\"], \"aliases\": {\”show-values\":{}}, \"settings\": {\"number_of_shards\": 4, \"index.mapping.ignore_malformed\": true, \"analysis\": {\"normalizer\": {\"lowercase_normalizer\": {\"type\": \"custom\", \"filter\": [\"lowercase\"]}}}}, \"mappings\": {\"metadata\": {\"_field_names\": {\"enabled\": false}, \"_all\": {\"enabled\": false}, \"properties\": {\"servicename\": {\"type\": \"keyword\", \"norms\": false}, \"fieldname\": {\"type\": \"keyword\", \"norms\": false}, \"fieldvalue\": {\"type\": \"keyword\", \"norms\": false}}}}}" + name = "show-values" + type = "fieldvalues-metadata" + } + # defines settings for bulk operation like max inflight bulks, number of documents and the total size in a single bulk + bulk.max { + docs { + count = 100 + size.kb = 1000 + } + inflight = 10 + } + retries { + max = 10 + backoff { + initial.ms = 100 + factor = 2 + } + } + } +} + elasticsearch { endpoint = "http://elasticsearch:9200" diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala index 5c15959d..7c3086e7 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala @@ -29,7 +29,7 @@ import com.expedia.www.haystack.trace.indexer.processors._ import com.expedia.www.haystack.trace.indexer.processors.supplier.SpanIndexProcessorSupplier import com.expedia.www.haystack.trace.indexer.store.SpanBufferMemoryStoreSupplier import com.expedia.www.haystack.trace.indexer.writers.TraceWriter -import com.expedia.www.haystack.trace.indexer.writers.es.{ElasticSearchWriter, ServiceMetadataWriter} +import com.expedia.www.haystack.trace.indexer.writers.es.{ElasticSearchWriter, ServiceMetadataWriter, ShowValuesWriter} import com.expedia.www.haystack.trace.indexer.writers.grpc.GrpcTraceWriter import com.expedia.www.haystack.trace.indexer.writers.kafka.KafkaWriter import org.apache.commons.lang3.StringUtils @@ -44,6 +44,7 @@ class StreamRunner(kafkaConfig: KafkaConfiguration, esConfig: ElasticSearchConfiguration, traceWriteConfig: TraceBackendConfiguration, serviceMetadataWriteConfig: ServiceMetadataWriteConfiguration, + showValuesWriteConfig: ShowValuesConfiguration, indexConfig: WhitelistIndexFieldConfiguration) extends AutoCloseable with StateListener { implicit private val executor: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global @@ -63,6 +64,10 @@ class StreamRunner(kafkaConfig: KafkaConfiguration, writers += new ServiceMetadataWriter(serviceMetadataWriteConfig) } + if (showValuesWriteConfig.enabled) { + writers += new ShowValuesWriter(showValuesWriteConfig) + } + if (StringUtils.isNotEmpty(kafkaConfig.produceTopic)) { writers += new KafkaWriter(kafkaConfig.producerProps, kafkaConfig.produceTopic) } diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala index 0e677b89..5236ad51 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala @@ -182,6 +182,45 @@ class ProjectConfiguration extends AutoCloseable { ) } + /** + * show values write configuration + */ + val showValuesConfig: ShowValuesConfiguration = { + val showValuesConfig = config.getConfig("show.values") + val es = showValuesConfig.getConfig("es") + val templateJsonConfigField = "index.template.json" + val indexTemplateJson = if (es.hasPath(templateJsonConfigField) + && StringUtils.isNotEmpty(es.getString(templateJsonConfigField))) { + Some(es.getString(templateJsonConfigField)) + } else { + None + } + val username = if (es.hasPath("username")) Option(es.getString("username")) else None + val password = if (es.hasPath("password")) Option(es.getString("password")) else None + ShowValuesConfiguration( + enabled = showValuesConfig.getBoolean("enabled"), + fieldNames = showValuesConfig.getStringList("fieldnames").asScala.toList, + flushIntervalInSec = showValuesConfig.getInt("flush.interval.sec"), + flushOnMaxFieldCount = showValuesConfig.getInt("flush.operation.count"), + esEndpoint = es.getString("endpoint"), + username = username, + password = password, + consistencyLevel = es.getString("consistency.level"), + indexName = es.getString("index.name"), + indexType = es.getString("index.type"), + indexTemplateJson = indexTemplateJson, + connectionTimeoutMillis = es.getInt("conn.timeout.ms"), + readTimeoutMillis = es.getInt("read.timeout.ms"), + maxInFlightBulkRequests = es.getInt("bulk.max.inflight"), + maxDocsInBulk = es.getInt("bulk.max.docs.count"), + maxBulkDocSizeInBytes = es.getInt("bulk.max.docs.size.kb") * 1000, + retryConfig = RetryOperation.Config( + es.getInt("retries.max"), + es.getLong("retries.backoff.initial.ms"), + es.getDouble("retries.backoff.factor")) + ) + } + /** * * elastic search configuration object diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesConfiguration.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesConfiguration.scala new file mode 100644 index 00000000..b1fa2ee9 --- /dev/null +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesConfiguration.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expedia.www.haystack.trace.indexer.config.entities + +import com.expedia.www.haystack.commons.retries.RetryOperation + +/** + * Configurations for writing service metadata to elastic search + * + * @param enabled: enable writing service metadata, if its set to false, list of service_names and operation names would be fetched from elastic search traces index, which is an expensive aggregation + * @param fieldNames: list of field names to index in show values index + * @param esEndpoint: http endpoint to connect + * @param indexTemplateJson: template as json that will be applied when the app runs, this is optional * @param username + * @param password: password for the es + * @param consistencyLevel: consistency level of writes, for e.g. one, quoram + * @param indexName: name of the elastic search index where the data is written + * @param indexType: elastic search index type + * @param connectionTimeoutMillis : connection timeout in millis + * @param readTimeoutMillis: read timeout in millis + * @param maxInFlightBulkRequests: max bulk writes that can be run in parallel + * @param maxDocsInBulk: maximum number of index documents in a single bulk + * @param maxBulkDocSizeInBytes maximum size (in bytes) of a single bulk request + * @param flushIntervalInSec: interval for collecting service name operation names in memory before flushing to es + * @param flushOnMaxFieldCount: maximum number of unique operations to force flushing to es + * @param retryConfig: retry max retries limit, initial backoff and exponential factor values + */ + +case class ShowValuesConfiguration(enabled: Boolean, + fieldNames: List[String], + esEndpoint: String, + username: Option[String], + password: Option[String], + consistencyLevel: String, + indexTemplateJson: Option[String], + indexName: String, + indexType: String, + connectionTimeoutMillis: Int, + readTimeoutMillis: Int, + maxInFlightBulkRequests: Int, + maxDocsInBulk: Int, + maxBulkDocSizeInBytes: Int, + flushIntervalInSec: Int, + flushOnMaxFieldCount: Int, + retryConfig: RetryOperation.Config + ) { + require(maxInFlightBulkRequests > 0) +} diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesWriteConfiguration.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesWriteConfiguration.scala new file mode 100644 index 00000000..b1fa2ee9 --- /dev/null +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesWriteConfiguration.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expedia.www.haystack.trace.indexer.config.entities + +import com.expedia.www.haystack.commons.retries.RetryOperation + +/** + * Configurations for writing service metadata to elastic search + * + * @param enabled: enable writing service metadata, if its set to false, list of service_names and operation names would be fetched from elastic search traces index, which is an expensive aggregation + * @param fieldNames: list of field names to index in show values index + * @param esEndpoint: http endpoint to connect + * @param indexTemplateJson: template as json that will be applied when the app runs, this is optional * @param username + * @param password: password for the es + * @param consistencyLevel: consistency level of writes, for e.g. one, quoram + * @param indexName: name of the elastic search index where the data is written + * @param indexType: elastic search index type + * @param connectionTimeoutMillis : connection timeout in millis + * @param readTimeoutMillis: read timeout in millis + * @param maxInFlightBulkRequests: max bulk writes that can be run in parallel + * @param maxDocsInBulk: maximum number of index documents in a single bulk + * @param maxBulkDocSizeInBytes maximum size (in bytes) of a single bulk request + * @param flushIntervalInSec: interval for collecting service name operation names in memory before flushing to es + * @param flushOnMaxFieldCount: maximum number of unique operations to force flushing to es + * @param retryConfig: retry max retries limit, initial backoff and exponential factor values + */ + +case class ShowValuesConfiguration(enabled: Boolean, + fieldNames: List[String], + esEndpoint: String, + username: Option[String], + password: Option[String], + consistencyLevel: String, + indexTemplateJson: Option[String], + indexName: String, + indexType: String, + connectionTimeoutMillis: Int, + readTimeoutMillis: Int, + maxInFlightBulkRequests: Int, + maxDocsInBulk: Int, + maxBulkDocSizeInBytes: Int, + flushIntervalInSec: Int, + flushOnMaxFieldCount: Int, + retryConfig: RetryOperation.Config + ) { + require(maxInFlightBulkRequests > 0) +} diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala index ec53bffc..ec6e4a5a 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala @@ -34,6 +34,7 @@ import scala.util.{Failure, Success, Try} class IndexDocumentGenerator(config: WhitelistIndexFieldConfiguration) extends MetricsSupport { + type SpanDocument = mutable.Map[String, Any] private val MIN_DURATION_FOR_TRUNCATION = TimeUnit.SECONDS.toMicros(20) /** @@ -44,7 +45,7 @@ class IndexDocumentGenerator(config: WhitelistIndexFieldConfiguration) extends M // We maintain a white list of tags that are to be indexed. The whitelist is maintained as a configuration // in an external database (outside this app boundary). However, the app periodically reads this whitelist config // and applies it to the new spans that are read. - val spanIndices = mutable.ListBuffer[mutable.Map[String, Any]]() + val spanIndices = mutable.ListBuffer[SpanDocument]() var traceStartTime = Long.MaxValue var rootDuration = 0l @@ -89,7 +90,7 @@ class IndexDocumentGenerator(config: WhitelistIndexFieldConfiguration) extends M for (tag <- span.getTagsList.asScala; normalizedTagKey = tag.getKey.toLowerCase; - indexField = config.indexFieldMap.get(normalizedTagKey); if indexField != null && indexField.enabled; + indexField = config.indexFieldMap.get(normalizedTagKey); if indexField != null && indexField.enabled && !indexField.showValue; v = readTagValue(tag); indexableValue = transformValueForIndexing(indexField.`type`, v); if indexableValue.isDefined) { append(indexField.name, indexableValue) diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala new file mode 100644 index 00000000..8eccce95 --- /dev/null +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala @@ -0,0 +1,69 @@ +package com.expedia.www.haystack.trace.indexer.writers.es + +import java.time.Instant + +import com.expedia.open.tracing.Span +import com.expedia.www.haystack.commons.metrics.MetricsSupport +import com.expedia.www.haystack.trace.commons.clients.es.document.ShowValuesDoc +import com.expedia.www.haystack.trace.indexer.config.entities.ShowValuesConfiguration +import org.apache.commons.lang3.StringUtils + +import scala.collection.mutable + +class ShowValuesDocumentGenerator(config: ShowValuesConfiguration) extends MetricsSupport { + + private var showValuesMap = new mutable.HashMap[String, mutable.Set[String]]() + private var lastFlushInstant = Instant.MIN + private val PAGE_NAME_KEY = "page-name" + private var fieldCount = 0 + + private def shouldFlush: Boolean = { + config.flushIntervalInSec == 0 || Instant.now().minusSeconds(config.flushIntervalInSec).isAfter(lastFlushInstant) + } + + private def areStatementReadyToBeExecuted(): Seq[ShowValuesDoc] = { + if (showValuesMap.nonEmpty && (shouldFlush || fieldCount > config.flushOnMaxFieldCount)) { + val statements = showValuesMap.flatMap { + case (serviceName, fieldList) => + createShowValuesDoc(serviceName, fieldList) + } + + lastFlushInstant = Instant.now() + showValuesMap = new mutable.HashMap[String, mutable.Set[String]]() + fieldCount = 0 + statements.toSeq + } else { + Nil + } + } + + /** + * get the list of unique service metadata documents contained in the list of spans + * + * @param spans : list of service metadata + * @return + */ + def getAndUpdateShowValues(spans: Iterable[Span]): Seq[ShowValuesDoc] = { + this.synchronized { + spans.foreach(span => { + val fieldInfo = span.getTagsList.stream().filter(p => p.getKey.equalsIgnoreCase(PAGE_NAME_KEY)) + if (StringUtils.isNotEmpty(span.getServiceName) && fieldInfo.count() > 0) { + val pageList = showValuesMap.getOrElseUpdate(span.getServiceName, mutable.Set[String]()) + val fieldValue = fieldInfo.findFirst().get().getVStr + if (pageList.add(fieldValue)) { + fieldCount += 1 + } + } + }) + areStatementReadyToBeExecuted() + } + } + + /** + * @return index document that can be put in elastic search + */ + def createShowValuesDoc(serviceName: String, fieldValueList: mutable.Set[String]): List[ShowValuesDoc] = { + fieldValueList.map(v => ShowValuesDoc(serviceName, PAGE_NAME_KEY, v)).toList + } + +} diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala new file mode 100644 index 00000000..672aa0b7 --- /dev/null +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala @@ -0,0 +1,139 @@ +package com.expedia.www.haystack.trace.indexer.writers.es + +import java.util.concurrent.{Semaphore, TimeUnit} + +import com.expedia.open.tracing.buffer.SpanBuffer +import com.expedia.www.haystack.commons.metrics.MetricsSupport +import com.expedia.www.haystack.trace.commons.clients.es.document.ShowValuesDoc +import com.expedia.www.haystack.commons.retries.RetryOperation.withRetryBackoff +import com.expedia.www.haystack.trace.commons.packer.PackedMessage +import com.expedia.www.haystack.trace.indexer.config.entities.ShowValuesConfiguration +import com.expedia.www.haystack.trace.indexer.metrics.AppMetricNames +import com.expedia.www.haystack.trace.indexer.writers.TraceWriter +import io.searchbox.client.{JestClient, JestClientFactory} +import io.searchbox.client.config.HttpClientConfig +import io.searchbox.core.{Bulk, Index} +import io.searchbox.indices.template.PutTemplate +import io.searchbox.params.Parameters +import org.joda.time.{DateTime, DateTimeZone} +import org.joda.time.format.DateTimeFormat +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ +import scala.util.Try + + +object ShowValuesUtils { + // creates an index name based on current date. following example illustrates the naming convention of + // elastic search indices for service metadata: + // service-metadata-2019-02-20 + def indexName(prefix: String): String = { + val eventTime = new DateTime(DateTimeZone.UTC) + val dataFormatter = DateTimeFormat.forPattern("yyyy-MM-dd") + s"$prefix-${dataFormatter.print(eventTime)}" + } +} +class ShowValuesWriter(config: ShowValuesConfiguration) extends TraceWriter with MetricsSupport { + private val LOGGER: Logger= LoggerFactory.getLogger(ShowValuesConfiguration.getClass) + + // a timer that measures the amount of time it takes to complete one bulk write + private val writeTimer = metricRegistry.timer(AppMetricNames.METADATA_WRITE_TIME) + + // meter that measures the write failures + private val failureMeter = metricRegistry.meter(AppMetricNames.METADATA_WRITE_FAILURE) + + // converts a serviceMetadata object into an indexable document + private val documentGenerator = new ShowValuesDocumentGenerator(config) + + + // this semaphore controls the parallel writes to service metadata index + private val inflightRequestsSemaphore = new Semaphore(config.maxInFlightBulkRequests, true) + + // initialize the elastic search client + private val esClient: JestClient = { + LOGGER.info("Initializing the http elastic search client with endpoint={}", config.esEndpoint) + + val factory = new JestClientFactory() + val builder = new HttpClientConfig.Builder(config.esEndpoint) + .multiThreaded(true) + .maxConnectionIdleTime(config.flushIntervalInSec + 10, TimeUnit.SECONDS) + .connTimeout(config.connectionTimeoutMillis) + .readTimeout(config.readTimeoutMillis) + + if (config.username.isDefined && config.password.isDefined) { + builder.defaultCredentials(config.username.get, config.password.get) + } + + factory.setHttpClientConfig(builder.build()) + factory.getObject + } + + private val bulkBuilder = new ThreadSafeBulkBuilder(config.maxDocsInBulk, config.maxBulkDocSizeInBytes) + + if (config.indexTemplateJson.isDefined) applyTemplate(config.indexTemplateJson.get) + + override def close(): Unit = { + LOGGER.info("Closing the elastic search client now.") + Try(esClient.shutdownClient()) + } + + /** + * converts the spans to an index document and writes to elastic search. Also if the parallel writes + * exceed the max inflight requests, then we block and this puts backpressure on upstream + * + * @param traceId trace id + * @param packedSpanBuffer list of spans belonging to this traceId - packed bytes of span buffer + * @param isLastSpanBuffer tells if this is the last record, so the writer can flush + * @return + */ + override def writeAsync(traceId: String, packedSpanBuffer: PackedMessage[SpanBuffer], isLastSpanBuffer: Boolean): Unit = { + var isSemaphoreAcquired = false + val idxDocument: Seq[ShowValuesDoc] = documentGenerator.getAndUpdateShowValues(packedSpanBuffer.protoObj.getChildSpansList.asScala) + idxDocument.foreach(document => { + try { + addIndexOperation(traceId, document, ServiceMetadataUtils.indexName(config.indexName)) match { + case Some(bulkToDispatch) => + inflightRequestsSemaphore.acquire() + isSemaphoreAcquired = true + + // execute the request async with retry + withRetryBackoff( + retryCallback => { + esClient.executeAsync(bulkToDispatch, new ElasticSearchResultHandler(writeTimer.time(), failureMeter, retryCallback)) + }, + config.retryConfig, + onSuccess = (_ : Any) => inflightRequestsSemaphore.release(), + onFailure = ex => { + inflightRequestsSemaphore.release() + LOGGER.error("Fail to write to ES after {} retry attempts", config.retryConfig.maxRetries, ex) + }) + case _ => + } + } catch { + case ex: Exception => + if (isSemaphoreAcquired) inflightRequestsSemaphore.release() + failureMeter.mark() + LOGGER.error("Failed to write spans to elastic search with exception", ex) + } + }) + } + + private def addIndexOperation(traceId: String, document: ShowValuesDoc, indexName: String): Option[Bulk] = { // add all the service field combinations in one bulk + val action: Index = new Index.Builder(document.json) + .index(indexName) + .`type`(config.indexType) + .setParameter(Parameters.CONSISTENCY, config.consistencyLevel) + .id(s"${document.servicename}_${document.fieldvalue}") + .build() + + bulkBuilder.addAction(action, document.json.getBytes("utf-8").length, forceBulkCreate = false) + } + + private def applyTemplate(templateJson: String) { + val putTemplateRequest = new PutTemplate.Builder("show-values-template", templateJson).build() + val result = esClient.execute(putTemplateRequest) + if (!result.isSucceeded) { + throw new RuntimeException(s"Fail to apply the following template to elastic search with reason=${result.getErrorMessage}") + } + } +} From b2ebbfcb6a5c0bf946c707992bfb8266176e9b9d Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Thu, 8 Aug 2019 15:05:15 +0530 Subject: [PATCH 02/12] fixing issues --- indexer/src/main/resources/config/base.conf | 3 +- .../www/haystack/trace/indexer/App.scala | 1 + .../haystack/trace/indexer/StreamRunner.scala | 2 +- .../indexer/config/ProjectConfiguration.scala | 1 - .../entities/ShowValuesConfiguration.scala | 61 ------------------- .../ShowValuesWriteConfiguration.scala | 1 - .../es/ShowValuesDocumentGenerator.scala | 20 +++--- .../indexer/writers/es/ShowValuesWriter.scala | 4 +- .../integration/EvictedSpanBufferSpec.scala | 3 +- .../clients/ElasticSearchTestClient.scala | 22 ++++++- 10 files changed, 41 insertions(+), 77 deletions(-) delete mode 100644 indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesConfiguration.scala diff --git a/indexer/src/main/resources/config/base.conf b/indexer/src/main/resources/config/base.conf index 9bc48647..e75b260d 100644 --- a/indexer/src/main/resources/config/base.conf +++ b/indexer/src/main/resources/config/base.conf @@ -95,7 +95,6 @@ service.metadata { show.values { enabled = true - fieldnames = [ "page-name" ] flush { interval.sec = 60 operation.count = 10000 @@ -107,7 +106,7 @@ show.values { consistency.level = "one" index { # apply the template before starting the client, if json is empty, no operation is performed - template.json = "{\"template\": \”show-values\", \"index_patterns\": [\”show-values*\"], \"aliases\": {\”show-values\":{}}, \"settings\": {\"number_of_shards\": 4, \"index.mapping.ignore_malformed\": true, \"analysis\": {\"normalizer\": {\"lowercase_normalizer\": {\"type\": \"custom\", \"filter\": [\"lowercase\"]}}}}, \"mappings\": {\"metadata\": {\"_field_names\": {\"enabled\": false}, \"_all\": {\"enabled\": false}, \"properties\": {\"servicename\": {\"type\": \"keyword\", \"norms\": false}, \"fieldname\": {\"type\": \"keyword\", \"norms\": false}, \"fieldvalue\": {\"type\": \"keyword\", \"norms\": false}}}}}" + template.json = "{\"template\": \”show-values\", \"index_patterns\": [\”show-values*\"], \"aliases\": {\”show-values\":{}}, \"settings\": {\"number_of_shards\": 4, \"index.mapping.ignore_malformed\": true, \"analysis\": {\"normalizer\": {\"lowercase_normalizer\": {\"type\": \"custom\", \"filter\": [\"lowercase\"]}}}}, \"mappings\": {\"metadata\": {\"_field_names\": {\"enabled\": false}, \"_all\": {\"enabled\": false}, \"properties\": {\"servicename\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldname\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldvalue\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}}}}}" name = "show-values" type = "fieldvalues-metadata" } diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/App.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/App.scala index 2ca21d72..7060f4ba 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/App.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/App.scala @@ -44,6 +44,7 @@ object App extends MetricsSupport { appConfig.elasticSearchConfig, appConfig.backendConfig, appConfig.serviceMetadataWriteConfig, + appConfig.showValuesConfig, appConfig.indexConfig) Runtime.getRuntime.addShutdownHook(new Thread { diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala index 7c3086e7..2b2e274e 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala @@ -65,7 +65,7 @@ class StreamRunner(kafkaConfig: KafkaConfiguration, } if (showValuesWriteConfig.enabled) { - writers += new ShowValuesWriter(showValuesWriteConfig) + writers += new ShowValuesWriter(showValuesWriteConfig, indexConfig) } if (StringUtils.isNotEmpty(kafkaConfig.produceTopic)) { diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala index 5236ad51..e52dd79d 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala @@ -199,7 +199,6 @@ class ProjectConfiguration extends AutoCloseable { val password = if (es.hasPath("password")) Option(es.getString("password")) else None ShowValuesConfiguration( enabled = showValuesConfig.getBoolean("enabled"), - fieldNames = showValuesConfig.getStringList("fieldnames").asScala.toList, flushIntervalInSec = showValuesConfig.getInt("flush.interval.sec"), flushOnMaxFieldCount = showValuesConfig.getInt("flush.operation.count"), esEndpoint = es.getString("endpoint"), diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesConfiguration.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesConfiguration.scala deleted file mode 100644 index b1fa2ee9..00000000 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesConfiguration.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2018 Expedia, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.expedia.www.haystack.trace.indexer.config.entities - -import com.expedia.www.haystack.commons.retries.RetryOperation - -/** - * Configurations for writing service metadata to elastic search - * - * @param enabled: enable writing service metadata, if its set to false, list of service_names and operation names would be fetched from elastic search traces index, which is an expensive aggregation - * @param fieldNames: list of field names to index in show values index - * @param esEndpoint: http endpoint to connect - * @param indexTemplateJson: template as json that will be applied when the app runs, this is optional * @param username - * @param password: password for the es - * @param consistencyLevel: consistency level of writes, for e.g. one, quoram - * @param indexName: name of the elastic search index where the data is written - * @param indexType: elastic search index type - * @param connectionTimeoutMillis : connection timeout in millis - * @param readTimeoutMillis: read timeout in millis - * @param maxInFlightBulkRequests: max bulk writes that can be run in parallel - * @param maxDocsInBulk: maximum number of index documents in a single bulk - * @param maxBulkDocSizeInBytes maximum size (in bytes) of a single bulk request - * @param flushIntervalInSec: interval for collecting service name operation names in memory before flushing to es - * @param flushOnMaxFieldCount: maximum number of unique operations to force flushing to es - * @param retryConfig: retry max retries limit, initial backoff and exponential factor values - */ - -case class ShowValuesConfiguration(enabled: Boolean, - fieldNames: List[String], - esEndpoint: String, - username: Option[String], - password: Option[String], - consistencyLevel: String, - indexTemplateJson: Option[String], - indexName: String, - indexType: String, - connectionTimeoutMillis: Int, - readTimeoutMillis: Int, - maxInFlightBulkRequests: Int, - maxDocsInBulk: Int, - maxBulkDocSizeInBytes: Int, - flushIntervalInSec: Int, - flushOnMaxFieldCount: Int, - retryConfig: RetryOperation.Config - ) { - require(maxInFlightBulkRequests > 0) -} diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesWriteConfiguration.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesWriteConfiguration.scala index b1fa2ee9..7f602bed 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesWriteConfiguration.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesWriteConfiguration.scala @@ -40,7 +40,6 @@ import com.expedia.www.haystack.commons.retries.RetryOperation */ case class ShowValuesConfiguration(enabled: Boolean, - fieldNames: List[String], esEndpoint: String, username: Option[String], password: Option[String], diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala index 8eccce95..b191c3a9 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala @@ -5,14 +5,15 @@ import java.time.Instant import com.expedia.open.tracing.Span import com.expedia.www.haystack.commons.metrics.MetricsSupport import com.expedia.www.haystack.trace.commons.clients.es.document.ShowValuesDoc +import com.expedia.www.haystack.trace.commons.config.entities.WhitelistIndexFieldConfiguration import com.expedia.www.haystack.trace.indexer.config.entities.ShowValuesConfiguration import org.apache.commons.lang3.StringUtils import scala.collection.mutable -class ShowValuesDocumentGenerator(config: ShowValuesConfiguration) extends MetricsSupport { +class ShowValuesDocumentGenerator(config: ShowValuesConfiguration, whitelistIndexFieldConfiguration: WhitelistIndexFieldConfiguration) extends MetricsSupport { - private var showValuesMap = new mutable.HashMap[String, mutable.Set[String]]() + private var showValuesMap = new mutable.HashMap[String, mutable.HashMap[String, mutable.Set[String]]]() private var lastFlushInstant = Instant.MIN private val PAGE_NAME_KEY = "page-name" private var fieldCount = 0 @@ -45,14 +46,17 @@ class ShowValuesDocumentGenerator(config: ShowValuesConfiguration) extends Metri */ def getAndUpdateShowValues(spans: Iterable[Span]): Seq[ShowValuesDoc] = { this.synchronized { + val showValuesTagList = whitelistIndexFieldConfiguration.whitelistIndexFields.filter(p => p.showValue.equals(true)) spans.foreach(span => { - val fieldInfo = span.getTagsList.stream().filter(p => p.getKey.equalsIgnoreCase(PAGE_NAME_KEY)) + val tagsToSave = span.getTagsList.stream().filter(p => showValuesTagList.contains(p.getKey)) if (StringUtils.isNotEmpty(span.getServiceName) && fieldInfo.count() > 0) { - val pageList = showValuesMap.getOrElseUpdate(span.getServiceName, mutable.Set[String]()) - val fieldValue = fieldInfo.findFirst().get().getVStr - if (pageList.add(fieldValue)) { - fieldCount += 1 - } + val serviceInfo = showValuesMap.getOrElseUpdate(span.getServiceName, mutable.HashMap[String, mutable.Set[String]]()) + tagsToSave.forEach(tag => { + var tagValues = serviceInfo.getOrElseUpdate(tag.getKey, mutable.Set[String]()) + if (tagValues.add(tag.getVStr)) { + fieldCount += 1 + } + }) } }) areStatementReadyToBeExecuted() diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala index 672aa0b7..aea9f2fc 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala @@ -6,6 +6,7 @@ import com.expedia.open.tracing.buffer.SpanBuffer import com.expedia.www.haystack.commons.metrics.MetricsSupport import com.expedia.www.haystack.trace.commons.clients.es.document.ShowValuesDoc import com.expedia.www.haystack.commons.retries.RetryOperation.withRetryBackoff +import com.expedia.www.haystack.trace.commons.config.entities.WhitelistIndexFieldConfiguration import com.expedia.www.haystack.trace.commons.packer.PackedMessage import com.expedia.www.haystack.trace.indexer.config.entities.ShowValuesConfiguration import com.expedia.www.haystack.trace.indexer.metrics.AppMetricNames @@ -33,7 +34,8 @@ object ShowValuesUtils { s"$prefix-${dataFormatter.print(eventTime)}" } } -class ShowValuesWriter(config: ShowValuesConfiguration) extends TraceWriter with MetricsSupport { + +class ShowValuesWriter(config: ShowValuesConfiguration, whitelistFieldConfig: WhitelistIndexFieldConfiguration) extends TraceWriter with MetricsSupport { private val LOGGER: Logger= LoggerFactory.getLogger(ShowValuesConfiguration.getClass) // a timer that measures the amount of time it takes to complete one bulk write diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala index 8596d25f..58bfbca3 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala @@ -24,6 +24,7 @@ class EvictedSpanBufferSpec extends BaseIntegrationTestSpec { val indexTagsConfig = elastic.indexingConfig val backendConfig = traceBackendClient.buildConfig val serviceMetadataConfig = elastic.buildServiceMetadataConfig + val showValuesWriteConfig = elastic.buildServiceMetadataConfig val accumulatorConfig = spanAccumulatorConfig.copy(minTracesPerCache = 1, maxEntriesAllStores = 1) produceSpansAsync(MAX_CHILD_SPANS, @@ -32,7 +33,7 @@ class EvictedSpanBufferSpec extends BaseIntegrationTestSpec { 0L, accumulatorConfig.bufferingWindowMillis) When(s"kafka-streams topology is started") - val topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + val topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesWriteConfig, indexTagsConfig) topology.start() Then(s"we should get multiple span-buffers bearing only 1 span due to early eviction from store") diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala index 3dd407af..a308bc2f 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala @@ -22,7 +22,7 @@ import java.util.Date import com.expedia.www.haystack.commons.retries.RetryOperation import com.expedia.www.haystack.trace.commons.config.entities._ -import com.expedia.www.haystack.trace.indexer.config.entities.{ElasticSearchConfiguration, ServiceMetadataWriteConfiguration} +import com.expedia.www.haystack.trace.indexer.config.entities.{ElasticSearchConfiguration, ServiceMetadataWriteConfiguration, ShowValuesConfiguration} import io.searchbox.client.config.HttpClientConfig import io.searchbox.client.{JestClient, JestClientFactory} import io.searchbox.core.Search @@ -97,6 +97,26 @@ class ElasticSearchTestClient { retryConfig = RetryOperation.Config(10, 250, 2)) } + def buildShowValuesConfig: ShowValuesConfiguration = { + ShowValuesConfiguration(enabled = true, + esEndpoint = ELASTIC_SEARCH_ENDPOINT, + username = None, + password = None, + consistencyLevel = "one", + indexTemplateJson = Some(SERVICE_METADATA_INDEX_TEMPLATE), + indexName = "service-metadata", + indexType = "metadata", + connectionTimeoutMillis = 3000, + readTimeoutMillis = 3000, + maxInFlightBulkRequests = 10, + maxDocsInBulk = 5, + maxBulkDocSizeInBytes = 50, + flushIntervalInSec = 10, + flushOnMaxOperationCount = 10, + retryConfig = RetryOperation.Config(10, 250, 2)) + ) + } + def indexingConfig: WhitelistIndexFieldConfiguration = { val cfg = WhitelistIndexFieldConfiguration() From 13b0a7fdf5e0598017e32919a501e90403f6140a Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Fri, 9 Aug 2019 08:55:47 +0530 Subject: [PATCH 03/12] fixing the tests to compile --- indexer/src/main/resources/config/base.conf | 2 +- .../writers/es/ShowValuesDocumentGenerator.scala | 12 ++++++------ .../trace/indexer/writers/es/ShowValuesWriter.scala | 2 +- .../indexer/integration/EvictedSpanBufferSpec.scala | 4 ++-- .../integration/FailedTopologyRecoverySpec.scala | 5 +++-- .../MultipleTraceIndexingTopologySpec.scala | 3 ++- .../PartialTraceIndexingTopologySpec.scala | 3 ++- .../ServiceMetadataIndexingTopologySpec.scala | 3 ++- .../clients/ElasticSearchTestClient.scala | 3 +-- 9 files changed, 20 insertions(+), 17 deletions(-) diff --git a/indexer/src/main/resources/config/base.conf b/indexer/src/main/resources/config/base.conf index e75b260d..eb34d93b 100644 --- a/indexer/src/main/resources/config/base.conf +++ b/indexer/src/main/resources/config/base.conf @@ -106,7 +106,7 @@ show.values { consistency.level = "one" index { # apply the template before starting the client, if json is empty, no operation is performed - template.json = "{\"template\": \”show-values\", \"index_patterns\": [\”show-values*\"], \"aliases\": {\”show-values\":{}}, \"settings\": {\"number_of_shards\": 4, \"index.mapping.ignore_malformed\": true, \"analysis\": {\"normalizer\": {\"lowercase_normalizer\": {\"type\": \"custom\", \"filter\": [\"lowercase\"]}}}}, \"mappings\": {\"metadata\": {\"_field_names\": {\"enabled\": false}, \"_all\": {\"enabled\": false}, \"properties\": {\"servicename\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldname\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldvalue\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}}}}}" + template.json = "{\"template\": \"show-values\", \"index_patterns\": [\"show-values*\"], \"aliases\": {\"show-values\":{}}, \"settings\": {\"number_of_shards\": 4, \"index.mapping.ignore_malformed\": true, \"analysis\": {\"normalizer\": {\"lowercase_normalizer\": {\"type\": \"custom\", \"filter\": [\"lowercase\"]}}}}, \"mappings\": {\"metadata\": {\"_field_names\": {\"enabled\": false}, \"_all\": {\"enabled\": false}, \"properties\": {\"servicename\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldname\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldvalue\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}}}}}" name = "show-values" type = "fieldvalues-metadata" } diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala index b191c3a9..d45b19fb 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala @@ -25,12 +25,12 @@ class ShowValuesDocumentGenerator(config: ShowValuesConfiguration, whitelistInde private def areStatementReadyToBeExecuted(): Seq[ShowValuesDoc] = { if (showValuesMap.nonEmpty && (shouldFlush || fieldCount > config.flushOnMaxFieldCount)) { val statements = showValuesMap.flatMap { - case (serviceName, fieldList) => - createShowValuesDoc(serviceName, fieldList) + case (serviceName, fieldValuesMap) => + createShowValuesDoc(serviceName, fieldValuesMap) } lastFlushInstant = Instant.now() - showValuesMap = new mutable.HashMap[String, mutable.Set[String]]() + showValuesMap = new mutable.HashMap[String, mutable.HashMap[String, mutable.Set[String]]]() fieldCount = 0 statements.toSeq } else { @@ -49,7 +49,7 @@ class ShowValuesDocumentGenerator(config: ShowValuesConfiguration, whitelistInde val showValuesTagList = whitelistIndexFieldConfiguration.whitelistIndexFields.filter(p => p.showValue.equals(true)) spans.foreach(span => { val tagsToSave = span.getTagsList.stream().filter(p => showValuesTagList.contains(p.getKey)) - if (StringUtils.isNotEmpty(span.getServiceName) && fieldInfo.count() > 0) { + if (StringUtils.isNotEmpty(span.getServiceName) && tagsToSave.count > 0) { val serviceInfo = showValuesMap.getOrElseUpdate(span.getServiceName, mutable.HashMap[String, mutable.Set[String]]()) tagsToSave.forEach(tag => { var tagValues = serviceInfo.getOrElseUpdate(tag.getKey, mutable.Set[String]()) @@ -66,8 +66,8 @@ class ShowValuesDocumentGenerator(config: ShowValuesConfiguration, whitelistInde /** * @return index document that can be put in elastic search */ - def createShowValuesDoc(serviceName: String, fieldValueList: mutable.Set[String]): List[ShowValuesDoc] = { - fieldValueList.map(v => ShowValuesDoc(serviceName, PAGE_NAME_KEY, v)).toList + def createShowValuesDoc(serviceName: String, fieldValuesMap: mutable.HashMap[String, mutable.Set[String]]): List[ShowValuesDoc] = { + fieldValuesMap.flatMap(p => p._2.map(values => ShowValuesDoc(serviceName, p._1, values))).toList } } diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala index aea9f2fc..9facab84 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala @@ -45,7 +45,7 @@ class ShowValuesWriter(config: ShowValuesConfiguration, whitelistFieldConfig: W private val failureMeter = metricRegistry.meter(AppMetricNames.METADATA_WRITE_FAILURE) // converts a serviceMetadata object into an indexable document - private val documentGenerator = new ShowValuesDocumentGenerator(config) + private val documentGenerator = new ShowValuesDocumentGenerator(config, whitelistFieldConfig) // this semaphore controls the parallel writes to service metadata index diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala index 58bfbca3..36a81d3c 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala @@ -24,7 +24,7 @@ class EvictedSpanBufferSpec extends BaseIntegrationTestSpec { val indexTagsConfig = elastic.indexingConfig val backendConfig = traceBackendClient.buildConfig val serviceMetadataConfig = elastic.buildServiceMetadataConfig - val showValuesWriteConfig = elastic.buildServiceMetadataConfig + val showValuesConfig = elastic.buildShowValuesConfig val accumulatorConfig = spanAccumulatorConfig.copy(minTracesPerCache = 1, maxEntriesAllStores = 1) produceSpansAsync(MAX_CHILD_SPANS, @@ -33,7 +33,7 @@ class EvictedSpanBufferSpec extends BaseIntegrationTestSpec { 0L, accumulatorConfig.bufferingWindowMillis) When(s"kafka-streams topology is started") - val topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesWriteConfig, indexTagsConfig) + val topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() Then(s"we should get multiple span-buffers bearing only 1 span due to early eviction from store") diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/FailedTopologyRecoverySpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/FailedTopologyRecoverySpec.scala index fc07d76b..c3a02d4c 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/FailedTopologyRecoverySpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/FailedTopologyRecoverySpec.scala @@ -42,6 +42,7 @@ class FailedTopologyRecoverySpec extends BaseIntegrationTestSpec { val backendConfig = traceBackendClient.buildConfig val serviceMetadataConfig = elastic.buildServiceMetadataConfig val accumulatorConfig = spanAccumulatorConfig.copy(pollIntervalMillis = spanAccumulatorConfig.pollIntervalMillis * 5) + val showValuesConfig = elastic.buildShowValuesConfig val startTimestamp = System.currentTimeMillis() produceSpansAsync( MAX_CHILD_SPANS_PER_TRACE, @@ -51,7 +52,7 @@ class FailedTopologyRecoverySpec extends BaseIntegrationTestSpec { spanAccumulatorConfig.bufferingWindowMillis) When(s"kafka-streams topology is started and then stopped forcefully after few sec") - var topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + var topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() Thread.sleep(7000) topology.close() @@ -60,7 +61,7 @@ class FailedTopologyRecoverySpec extends BaseIntegrationTestSpec { Thread.sleep(6000) Then(s"on restart of the topology, we should be able to read complete trace created in previous run from the '${kafka.OUTPUT_TOPIC}' topic in kafka, trace-backend and elasticsearch") - topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() // produce one more span record with same traceId to trigger punctuate diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/MultipleTraceIndexingTopologySpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/MultipleTraceIndexingTopologySpec.scala index a9716646..5a4e6d57 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/MultipleTraceIndexingTopologySpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/MultipleTraceIndexingTopologySpec.scala @@ -42,6 +42,7 @@ class MultipleTraceIndexingTopologySpec extends BaseIntegrationTestSpec { val indexTagsConfig = elastic.indexingConfig val backendConfig = traceBackendClient.buildConfig val serviceMetadataConfig = elastic.buildServiceMetadataConfig + val showValuesConfig = elastic.buildShowValuesConfig When(s"spans are produced in '${kafka.INPUT_TOPIC}' topic async, and kafka-streams topology is started") val traceDescriptions = List(TraceDescription(TRACE_ID_5, SPAN_ID_PREFIX_2),TraceDescription(TRACE_ID_9, SPAN_ID_PREFIX_1)) @@ -52,7 +53,7 @@ class MultipleTraceIndexingTopologySpec extends BaseIntegrationTestSpec { startRecordTimestamp = 0, maxRecordTimestamp = spanAccumulatorConfig.bufferingWindowMillis) - val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() Then(s"we should read two span buffers with different traceIds from '${kafka.OUTPUT_TOPIC}' topic and same should be read from trace-backend and elastic search") diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/PartialTraceIndexingTopologySpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/PartialTraceIndexingTopologySpec.scala index ca484104..b7e03cdb 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/PartialTraceIndexingTopologySpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/PartialTraceIndexingTopologySpec.scala @@ -40,6 +40,7 @@ class PartialTraceIndexingTopologySpec extends BaseIntegrationTestSpec { val indexTagsConfig = elastic.indexingConfig val backendConfig = traceBackendClient.buildConfig val serviceMetadataConfig = elastic.buildServiceMetadataConfig + val showValuesConfig = elastic.buildShowValuesConfig val traceDescription = List(TraceDescription(TRACE_ID, SPAN_ID_PREFIX)) When(s"spans are produced in '${kafka.INPUT_TOPIC}' topic async, and kafka-streams topology is started") @@ -50,7 +51,7 @@ class PartialTraceIndexingTopologySpec extends BaseIntegrationTestSpec { 0L, spanAccumulatorConfig.bufferingWindowMillis) - val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() Then(s"we should read one span buffer object from '${kafka.OUTPUT_TOPIC}' topic and the same should be searchable in trace-backend and elastic search") diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ServiceMetadataIndexingTopologySpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ServiceMetadataIndexingTopologySpec.scala index 5e7ba693..c4824ee7 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ServiceMetadataIndexingTopologySpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ServiceMetadataIndexingTopologySpec.scala @@ -41,6 +41,7 @@ class ServiceMetadataIndexingTopologySpec extends BaseIntegrationTestSpec { val indexTagsConfig = elastic.indexingConfig val backendConfig = traceBackendClient.buildConfig val serviceMetadataConfig = elastic.buildServiceMetadataConfig + val showValuesConfig = elastic.buildShowValuesConfig When(s"spans are produced in '${kafka.INPUT_TOPIC}' topic async, and kafka-streams topology is started") val traceDescriptions = List(TraceDescription(TRACE_ID_6, SPAN_ID_PREFIX_1), TraceDescription(TRACE_ID_7, SPAN_ID_PREFIX_2)) @@ -51,7 +52,7 @@ class ServiceMetadataIndexingTopologySpec extends BaseIntegrationTestSpec { 0, spanAccumulatorConfig.bufferingWindowMillis) - val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() Then(s"we should read two multiple service operation combinations in elastic search") diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala index a308bc2f..45becb67 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala @@ -112,9 +112,8 @@ class ElasticSearchTestClient { maxDocsInBulk = 5, maxBulkDocSizeInBytes = 50, flushIntervalInSec = 10, - flushOnMaxOperationCount = 10, + flushOnMaxFieldCount = 10, retryConfig = RetryOperation.Config(10, 250, 2)) - ) } From fba29d80ec2779acf75a90a68eda196ca414aab7 Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Fri, 9 Aug 2019 11:50:23 +0530 Subject: [PATCH 04/12] fixing tests --- .../writers/es/IndexDocumentGenerator.scala | 2 +- indexer/src/test/resources/config/base.conf | 35 +++++++++++++++++++ .../clients/ElasticSearchTestClient.scala | 4 +-- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala index ec6e4a5a..4df155d3 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala @@ -90,7 +90,7 @@ class IndexDocumentGenerator(config: WhitelistIndexFieldConfiguration) extends M for (tag <- span.getTagsList.asScala; normalizedTagKey = tag.getKey.toLowerCase; - indexField = config.indexFieldMap.get(normalizedTagKey); if indexField != null && indexField.enabled && !indexField.showValue; + indexField = config.indexFieldMap.get(normalizedTagKey); if indexField != null && indexField.enabled; v = readTagValue(tag); indexableValue = transformValueForIndexing(indexField.`type`, v); if indexableValue.isDefined) { append(indexField.name, indexableValue) diff --git a/indexer/src/test/resources/config/base.conf b/indexer/src/test/resources/config/base.conf index 753a6ab0..01b2fb81 100644 --- a/indexer/src/test/resources/config/base.conf +++ b/indexer/src/test/resources/config/base.conf @@ -88,6 +88,41 @@ service.metadata { } } +show.values { + enabled = true + flush { + interval.sec = 60 + operation.count = 10000 + } + es { + endpoint = "http://elasticsearch:9200" + conn.timeout.ms = 10000 + read.timeout.ms = 5000 + consistency.level = "one" + index { + # apply the template before starting the client, if json is empty, no operation is performed + template.json = "{\"template\": \"show-values\", \"index_patterns\": [\"show-values*\"], \"aliases\": {\"show-values\":{}}, \"settings\": {\"number_of_shards\": 4, \"index.mapping.ignore_malformed\": true, \"analysis\": {\"normalizer\": {\"lowercase_normalizer\": {\"type\": \"custom\", \"filter\": [\"lowercase\"]}}}}, \"mappings\": {\"metadata\": {\"_field_names\": {\"enabled\": false}, \"_all\": {\"enabled\": false}, \"properties\": {\"servicename\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldname\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldvalue\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}}}}}" + name = "show-values" + type = "fieldvalues-metadata" + } + # defines settings for bulk operation like max inflight bulks, number of documents and the total size in a single bulk + bulk.max { + docs { + count = 100 + size.kb = 1000 + } + inflight = 10 + } + retries { + max = 10 + backoff { + initial.ms = 100 + factor = 2 + } + } + } +} + elasticsearch { endpoint = "http://elasticsearch:9200" max.inflight.requests = 50 diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala index 45becb67..13e5c45e 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala @@ -104,8 +104,8 @@ class ElasticSearchTestClient { password = None, consistencyLevel = "one", indexTemplateJson = Some(SERVICE_METADATA_INDEX_TEMPLATE), - indexName = "service-metadata", - indexType = "metadata", + indexName = "show-values", + indexType = "fieldvalues-metadata", connectionTimeoutMillis = 3000, readTimeoutMillis = 3000, maxInFlightBulkRequests = 10, From 1ac6d32b3dfcfee7659915ab01e68458be068632 Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Fri, 9 Aug 2019 16:03:25 +0530 Subject: [PATCH 05/12] updating config and tests --- indexer/src/test/resources/config/base.conf | 2 +- .../unit/ConfigurationLoaderSpec.scala | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/indexer/src/test/resources/config/base.conf b/indexer/src/test/resources/config/base.conf index 01b2fb81..ad2234e3 100644 --- a/indexer/src/test/resources/config/base.conf +++ b/indexer/src/test/resources/config/base.conf @@ -101,7 +101,7 @@ show.values { consistency.level = "one" index { # apply the template before starting the client, if json is empty, no operation is performed - template.json = "{\"template\": \"show-values\", \"index_patterns\": [\"show-values*\"], \"aliases\": {\"show-values\":{}}, \"settings\": {\"number_of_shards\": 4, \"index.mapping.ignore_malformed\": true, \"analysis\": {\"normalizer\": {\"lowercase_normalizer\": {\"type\": \"custom\", \"filter\": [\"lowercase\"]}}}}, \"mappings\": {\"metadata\": {\"_field_names\": {\"enabled\": false}, \"_all\": {\"enabled\": false}, \"properties\": {\"servicename\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldname\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldvalue\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}}}}}" + template.json = "some_template_json" name = "show-values" type = "fieldvalues-metadata" } diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ConfigurationLoaderSpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ConfigurationLoaderSpec.scala index d569c59c..9080af53 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ConfigurationLoaderSpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ConfigurationLoaderSpec.scala @@ -108,5 +108,25 @@ class ConfigurationLoaderSpec extends FunSpec with Matchers { elastic.indexHourBucket shouldBe 6 elastic.maxConnectionsPerRoute shouldBe 10 } + + it("should load the show values config from base.conf") { + val config = project.showValuesConfig + config.flushIntervalInSec shouldBe 60 + config.flushOnMaxFieldCount shouldBe 10000 + config.esEndpoint shouldBe "http://elasticsearch:9200" + config.maxInFlightBulkRequests shouldBe 10 + config.maxDocsInBulk shouldBe 100 + config.maxBulkDocSizeInBytes shouldBe 1000000 + config.indexTemplateJson shouldBe Some("some_template_json") + config.consistencyLevel shouldBe "one" + config.readTimeoutMillis shouldBe 5000 + config.connectionTimeoutMillis shouldBe 10000 + config.indexName shouldBe "show-values" + config.indexType shouldBe "fieldvalues-metadata" + config.retryConfig.maxRetries shouldBe 10 + config.retryConfig.backOffInMillis shouldBe 100 + config.retryConfig.backoffFactor shouldBe 2 + } + } } From 1e9c414fc465b21b08bfc02db1a7ae000a853854 Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Tue, 13 Aug 2019 06:37:48 +0530 Subject: [PATCH 06/12] adding test cases --- .../es/ShowValuesDocumentGenerator.scala | 19 ++-- .../ShowValuesDocumentGeneratorSpec.scala | 94 +++++++++++++++++++ 2 files changed, 105 insertions(+), 8 deletions(-) create mode 100644 indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ShowValuesDocumentGeneratorSpec.scala diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala index d45b19fb..5dba9fd7 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala @@ -1,21 +1,22 @@ package com.expedia.www.haystack.trace.indexer.writers.es import java.time.Instant +import java.util.stream.Collectors -import com.expedia.open.tracing.Span +import com.expedia.open.tracing.{Span, Tag} import com.expedia.www.haystack.commons.metrics.MetricsSupport import com.expedia.www.haystack.trace.commons.clients.es.document.ShowValuesDoc -import com.expedia.www.haystack.trace.commons.config.entities.WhitelistIndexFieldConfiguration +import com.expedia.www.haystack.trace.commons.config.entities.{WhitelistIndexField, WhitelistIndexFieldConfiguration} import com.expedia.www.haystack.trace.indexer.config.entities.ShowValuesConfiguration import org.apache.commons.lang3.StringUtils import scala.collection.mutable +import scala.collection.JavaConverters._ class ShowValuesDocumentGenerator(config: ShowValuesConfiguration, whitelistIndexFieldConfiguration: WhitelistIndexFieldConfiguration) extends MetricsSupport { private var showValuesMap = new mutable.HashMap[String, mutable.HashMap[String, mutable.Set[String]]]() private var lastFlushInstant = Instant.MIN - private val PAGE_NAME_KEY = "page-name" private var fieldCount = 0 private def shouldFlush: Boolean = { @@ -46,13 +47,15 @@ class ShowValuesDocumentGenerator(config: ShowValuesConfiguration, whitelistInde */ def getAndUpdateShowValues(spans: Iterable[Span]): Seq[ShowValuesDoc] = { this.synchronized { - val showValuesTagList = whitelistIndexFieldConfiguration.whitelistIndexFields.filter(p => p.showValue.equals(true)) + val showValuesIndexField: List[WhitelistIndexField] = whitelistIndexFieldConfiguration.whitelistIndexFields.filter(p => p.showValue.equals(true)) spans.foreach(span => { - val tagsToSave = span.getTagsList.stream().filter(p => showValuesTagList.contains(p.getKey)) - if (StringUtils.isNotEmpty(span.getServiceName) && tagsToSave.count > 0) { + val tagsToSave: List[Tag] = span.getTagsList.stream() + .filter(t => showValuesIndexField.exists(p => p.name.equalsIgnoreCase(t.getKey))) + .collect(Collectors.toList[Tag]()).asScala.toList + if (StringUtils.isNotEmpty(span.getServiceName) && tagsToSave.nonEmpty) { val serviceInfo = showValuesMap.getOrElseUpdate(span.getServiceName, mutable.HashMap[String, mutable.Set[String]]()) - tagsToSave.forEach(tag => { - var tagValues = serviceInfo.getOrElseUpdate(tag.getKey, mutable.Set[String]()) + tagsToSave.foreach(tag => { + val tagValues = serviceInfo.getOrElseUpdate(tag.getKey, mutable.Set[String]()) if (tagValues.add(tag.getVStr)) { fieldCount += 1 } diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ShowValuesDocumentGeneratorSpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ShowValuesDocumentGeneratorSpec.scala new file mode 100644 index 00000000..ad7596b4 --- /dev/null +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ShowValuesDocumentGeneratorSpec.scala @@ -0,0 +1,94 @@ +package com.expedia.www.haystack.trace.indexer.unit + +import java.util.concurrent.TimeUnit + +import com.expedia.open.tracing.{Span, Tag} +import com.expedia.www.haystack.trace.commons.clients.es.document.ShowValuesDoc +import com.expedia.www.haystack.trace.commons.config.entities.{IndexFieldType, WhiteListIndexFields, WhitelistIndexField, WhitelistIndexFieldConfiguration} +import com.expedia.www.haystack.trace.indexer.config.ProjectConfiguration +import com.expedia.www.haystack.trace.indexer.writers.es.ShowValuesDocumentGenerator +import org.json4s.ext.EnumNameSerializer +import org.json4s.jackson.Serialization +import org.json4s.{DefaultFormats, Formats} +import org.scalatest.{FunSpec, Matchers} + +import scala.collection.mutable + +class ShowValuesDocumentGeneratorSpec extends FunSpec with Matchers { + protected implicit val formats: Formats = DefaultFormats + new EnumNameSerializer(IndexFieldType) + + private val TRACE_ID = "trace_id" + private val START_TIME_1 = 1529042838469123l + private val START_TIME_2 = 1529042848469000l + private val project = new ProjectConfiguration + + private val LONG_DURATION = TimeUnit.SECONDS.toMicros(25) + TimeUnit.MICROSECONDS.toMicros(500) + + describe("Show values document generator") { + it("should create show value docs for given servicename and fieldvaluesmap") { + //Whatever is passed through Whitelistindexconfiguration is irrelevant to the generator for this test + val generator = new ShowValuesDocumentGenerator(project.showValuesConfig, WhitelistIndexFieldConfiguration()) + val serviceName = "service1" + val fieldValuesMap: mutable.HashMap[String, mutable.Set[String]] = new mutable.HashMap[String, mutable.Set[String]](); + fieldValuesMap.update("page-name", mutable.Set[String]("pageA", "pageB")) + fieldValuesMap.update("page-id", mutable.Set[String]("1", "2")) + + val docs = generator.createShowValuesDoc(serviceName, fieldValuesMap) + docs.head shouldEqual ShowValuesDoc(serviceName, "page-id", "2") + docs(1) shouldEqual ShowValuesDoc(serviceName, "page-id", "1") + docs(2) shouldEqual ShowValuesDoc(serviceName, "page-name", "pageA") + docs(3) shouldEqual ShowValuesDoc(serviceName, "page-name", "pageB") + docs.size shouldEqual 4 + } + + it("should create showvaluedocuments for given spans using the whitelistfieldconfiguration(empty) and return showValues docs accumulated if conditions met") { + val config = WhitelistIndexFieldConfiguration() + val generator = new ShowValuesDocumentGenerator(project.showValuesConfig, config) + + val span_1 = Span.newBuilder().setTraceId(TRACE_ID) + .setSpanId("span-1") + .setServiceName("service1") + .setOperationName("op1") + .setStartTime(START_TIME_1) + .setDuration(610000L) + .addTags(Tag.newBuilder().setKey("page-name").setVStr("pageA")) + .addTags(Tag.newBuilder().setKey("page-id").setVStr("1")) + .build() + val span_2 = Span.newBuilder().setTraceId(TRACE_ID) + .setSpanId("span-2") + .setServiceName("service2") + .setOperationName("op1") + .setStartTime(START_TIME_1) + .setDuration(500000L) + .addTags(Tag.newBuilder().setKey("page-name").setVStr("pageB")) + .addTags(Tag.newBuilder().setKey("page-id").setVStr("2")) + .build() + val span_3 = Span.newBuilder().setTraceId(TRACE_ID) + .setSpanId("span-3") + .setServiceName("service1") + .setDuration(LONG_DURATION) + .setStartTime(START_TIME_2) + .addTags(Tag.newBuilder().setKey("page-name").setVStr("pageB")) + .addTags(Tag.newBuilder().setKey("page-id").setVStr("2")) + .setOperationName("op3").build() + + val docs1 = generator.getAndUpdateShowValues(List(span_1, span_2, span_3)) + //Because whitelistfields are empty + docs1.size shouldEqual 0 + + val whitelistField_1 = WhitelistIndexField(name = "page-name", `type` = IndexFieldType.string, showValue = true) + val whitelistField_2 = WhitelistIndexField(name = "page-id", `type` = IndexFieldType.string, showValue = true) + + val cfgJsonData = Serialization.write(WhiteListIndexFields(List(whitelistField_1, whitelistField_2))) + + // reload with the given whitelisted fields + config.onReload(cfgJsonData) + val docs2 = generator.getAndUpdateShowValues(List(span_1, span_2, span_3)) + docs2.size shouldEqual 6 + docs2.map(d => d.servicename) should contain allOf ("service1", "service2") + docs2.map(d => d.fieldname) should contain allOf ("page-name", "page-id") + docs2.map(d => d.fieldvalue) should contain allOf ("pageA", "pageB", "1", "2") + docs2.foreach(d => List) + } + } +} From c968f9ea4d5f54966e8096fd4a6a15b2a40f1b9b Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Tue, 20 Aug 2019 04:11:06 +0530 Subject: [PATCH 07/12] adding int test for show values feature --- ...WhitelistIndexFieldConfigurationSpec.scala | 2 +- .../integration/BaseIntegrationTestSpec.scala | 19 +++++ .../ShowValuesIndexingTopologySpec.scala | 52 ++++++++++++++ .../clients/ElasticSearchTestClient.scala | 70 +++++++++++++++++-- 4 files changed, 137 insertions(+), 6 deletions(-) create mode 100644 indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ShowValuesIndexingTopologySpec.scala diff --git a/commons/src/test/scala/com/expedia/www/haystack/trace/commons/unit/WhitelistIndexFieldConfigurationSpec.scala b/commons/src/test/scala/com/expedia/www/haystack/trace/commons/unit/WhitelistIndexFieldConfigurationSpec.scala index 24febefb..1b7c0e14 100644 --- a/commons/src/test/scala/com/expedia/www/haystack/trace/commons/unit/WhitelistIndexFieldConfigurationSpec.scala +++ b/commons/src/test/scala/com/expedia/www/haystack/trace/commons/unit/WhitelistIndexFieldConfigurationSpec.scala @@ -21,7 +21,7 @@ import com.expedia.www.haystack.trace.commons.config.entities.{IndexFieldType, W import org.json4s.ext.EnumNameSerializer import org.json4s.jackson.Serialization import org.json4s.{DefaultFormats, Formats} -import org.scalatest.{Entry, FunSpec, Matchers} +import org.scalatest.{FunSpec, Matchers} import scala.collection.JavaConverters._ diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala index 51ef660e..f9bf084b 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala @@ -89,6 +89,7 @@ abstract class BaseIntegrationTestSpec extends WordSpec with GivenWhenThen with .setStartTime(System.currentTimeMillis() * 1000) .addTags(Tag.newBuilder().setKey("errorCode").setType(TagType.LONG).setVLong(404)) .addTags(Tag.newBuilder().setKey("_role").setType(TagType.STRING).setVStr("haystack")) + .addTags(Tag.newBuilder().setKey("pagename").setType(TagType.STRING).setVStr("homepage")) .addLogs(Log.newBuilder().addFields(Tag.newBuilder().setKey("exceptiontype").setType(TagType.STRING).setVStr("external").build()).build()) .build() } @@ -163,6 +164,24 @@ abstract class BaseIntegrationTestSpec extends WordSpec with GivenWhenThen with } + def verifyFields(): Unit = { + val fieldNamesQuery = + """{ + | "query" : { + | "match_all": {} + | }, + | "_source" : { + | "includes" : [ + | "fieldname", + | "fieldvalue", + | "servicename" + | ] + | } + |}""".stripMargin + val docs = elastic.queryShowValuesIndex(fieldNamesQuery) + docs.size shouldBe 5 + } + def verifyElasticSearchWrites(traceIds: Seq[String]): Unit = { val matchAllQuery = """{ diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ShowValuesIndexingTopologySpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ShowValuesIndexingTopologySpec.scala new file mode 100644 index 00000000..5b3e123f --- /dev/null +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ShowValuesIndexingTopologySpec.scala @@ -0,0 +1,52 @@ +package com.expedia.www.haystack.trace.indexer.integration + +import java.util + +import com.expedia.open.tracing.buffer.SpanBuffer +import com.expedia.www.haystack.trace.indexer.StreamRunner +import org.apache.kafka.streams.KeyValue +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils + +import scala.concurrent.duration._ + +class ShowValuesIndexingTopologySpec extends BaseIntegrationTestSpec { + private val MAX_CHILD_SPANS_PER_TRACE = 5 + private val TRACE_ID_6 = "traceid-6" + private val TRACE_ID_7 = "traceid-7" + private val SPAN_ID_PREFIX_1 = TRACE_ID_6 + "span-id-" + private val SPAN_ID_PREFIX_2 = TRACE_ID_7 + "span-id-" + + "Show values topology" should { + s"consume spans from input ${kafka.INPUT_TOPIC} and write ShowValueDocs for the whitelisted fields for which showValues is true" in { + Given("a set of spans with different servicenames and a project configuration") + val kafkaConfig = kafka.buildConfig + val esConfig = elastic.buildConfig + val indexTagsConfig = elastic.indexingConfig + val backendConfig = traceBackendClient.buildConfig + val serviceMetadataConfig = elastic.buildServiceMetadataConfig + val showValuesConfig = elastic.buildShowValuesConfig + + When(s"spans are produced in '${kafka.INPUT_TOPIC}' topic async, and kafka-streams topology is started") + val traceDescriptions = List(TraceDescription(TRACE_ID_6, SPAN_ID_PREFIX_1), TraceDescription(TRACE_ID_7, SPAN_ID_PREFIX_2)) + + produceSpansAsync(MAX_CHILD_SPANS_PER_TRACE, + 1.seconds, + traceDescriptions, + 0, + spanAccumulatorConfig.bufferingWindowMillis) + + val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) + topology.start() + + Then(s"we should read two multiple service operation combinations in elastic search") + try { + val result: util.List[KeyValue[String, SpanBuffer]] = + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(kafka.RESULT_CONSUMER_CONFIG, kafka.OUTPUT_TOPIC, 2, MAX_WAIT_FOR_OUTPUT_MS) + Thread.sleep(6000) + verifyFields() + } finally { + topology.close() + } + } + } +} diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala index 13e5c45e..56a19fb1 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala @@ -103,7 +103,7 @@ class ElasticSearchTestClient { username = None, password = None, consistencyLevel = "one", - indexTemplateJson = Some(SERVICE_METADATA_INDEX_TEMPLATE), + indexTemplateJson = Some(SHOW_VALUES_INDEX_TEMPLATE), indexName = "show-values", indexType = "fieldvalues-metadata", connectionTimeoutMillis = 3000, @@ -120,7 +120,10 @@ class ElasticSearchTestClient { def indexingConfig: WhitelistIndexFieldConfiguration = { val cfg = WhitelistIndexFieldConfiguration() val cfgJsonData = Serialization.write(WhiteListIndexFields( - List(WhitelistIndexField(name = "role", `type` = IndexFieldType.string, aliases = Set("_role")), WhitelistIndexField(name = "errorcode", `type` = IndexFieldType.long)))) + List( + WhitelistIndexField(name = "role", `type` = IndexFieldType.string, aliases = Set("_role")), + WhitelistIndexField(name = "errorcode", `type` = IndexFieldType.long), + WhitelistIndexField(name = "pagename", `type` = IndexFieldType.long, showValue = true)))) cfg.onReload(cfgJsonData) cfg } @@ -140,13 +143,23 @@ class ElasticSearchTestClient { } } + def queryShowValuesIndex(query: String): List[String] = { + val SHOW_VALUES_INDEX_NAME = "show-values" + val SHOW_VALUES_INDEX_TYPE = "fieldvalues-metadata" + queryIndex(query, SHOW_VALUES_INDEX_NAME, SHOW_VALUES_INDEX_TYPE) + } + def queryServiceMetadataIndex(query: String): List[String] = { - import scala.collection.JavaConverters._ val SERVICE_METADATA_INDEX_NAME = "service-metadata" val SERVICE_METADATA_INDEX_TYPE = "metadata" + queryIndex(query, SERVICE_METADATA_INDEX_NAME, SERVICE_METADATA_INDEX_TYPE) + } + + def queryIndex(query: String, index: String, index_type: String): List[String] = { + import scala.collection.JavaConverters._ val searchQuery = new Search.Builder(query) - .addIndex(SERVICE_METADATA_INDEX_NAME) - .addType(SERVICE_METADATA_INDEX_TYPE) + .addIndex(index) + .addType(index_type) .build() val result = esClient.execute(searchQuery) if (result.getSourceAsStringList != null && result.getSourceAsStringList.size() > 0) { @@ -229,6 +242,53 @@ class ElasticSearchTestClient { |} |""".stripMargin + private val SHOW_VALUES_INDEX_TEMPLATE = + """{ + |"template": "show-values*", + |"aliases": { + | "show-values":{} + |}, + |"settings": { + | "number_of_shards": 4, + | "index.mapping.ignore_malformed": true, + | "analysis": { + | "normalizer": { + | "lowercase_normalizer": { + | "type": "custom", + | "filter": ["lowercase"] + | } + | } + | } + |}, + |"mappings": { + | "fieldvalues-metadata": { + | "_field_names": { + | "enabled": false + | }, + | "_all": { + | "enabled": false + | }, + | "properties": { + | "servicename": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | }, + | "fieldname": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | }, + | "fieldvalue": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | } + | } + | } + |} + |}""".stripMargin + private val SERVICE_METADATA_INDEX_TEMPLATE = """{ | "template": "service-metadata*", From 3d3683a42a91b9eff4ceb9a6251c77cff37cbcd2 Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Tue, 20 Aug 2019 12:38:02 +0530 Subject: [PATCH 08/12] fixing a test case --- .../trace/indexer/integration/BaseIntegrationTestSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala index f9bf084b..8b29840a 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala @@ -179,7 +179,7 @@ abstract class BaseIntegrationTestSpec extends WordSpec with GivenWhenThen with | } |}""".stripMargin val docs = elastic.queryShowValuesIndex(fieldNamesQuery) - docs.size shouldBe 5 + docs.size shouldBe 6 } def verifyElasticSearchWrites(traceIds: Seq[String]): Unit = { From 118f085898f9885b89de891c7ebe924502f4a1f3 Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Fri, 23 Aug 2019 16:49:02 +0530 Subject: [PATCH 09/12] making reader changes --- reader/src/main/resources/config/base.conf | 5 ++ .../reader/config/ProviderConfiguration.scala | 11 +++- .../entities/ElasticSearchConfiguration.scala | 7 +- .../reader/stores/EsIndexedTraceStore.scala | 21 ++++-- .../es/query/ShowValuesQueryGenerator.scala | 64 +++++++++++++++++++ reader/src/test/resources/config/base.conf | 5 ++ .../integration/BaseIntegrationTestSpec.scala | 12 +++- .../TraceServiceIntegrationTestSpec.scala | 19 ++++++ .../unit/config/ConfigurationLoaderSpec.scala | 4 ++ .../query/ShowValuesQueryGeneratorSpec.scala | 46 +++++++++++++ 10 files changed, 185 insertions(+), 9 deletions(-) create mode 100644 reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/readers/es/query/ShowValuesQueryGenerator.scala create mode 100644 reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/stores/readers/es/query/ShowValuesQueryGeneratorSpec.scala diff --git a/reader/src/main/resources/config/base.conf b/reader/src/main/resources/config/base.conf index aeb64049..f8b6200e 100644 --- a/reader/src/main/resources/config/base.conf +++ b/reader/src/main/resources/config/base.conf @@ -44,6 +44,11 @@ elasticsearch { name = "service-metadata" type = "metadata" } + show.values { + enabled = true + name = "show-values" + type = "fieldvalues-metadata" + } } } diff --git a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/ProviderConfiguration.scala b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/ProviderConfiguration.scala index ec6c0c78..dd6e39dd 100644 --- a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/ProviderConfiguration.scala +++ b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/ProviderConfiguration.scala @@ -101,13 +101,22 @@ class ProviderConfiguration { metadataCfg.getString("name"), metadataCfg.getString("type")) } + private val showValuesIndexConfiguration: ShowValuesIndexConfiguration = { + val showValuesConf = config.getConfig("elasticsearch.index.show.values") + + ShowValuesIndexConfiguration( + showValuesConf.getBoolean("enabled"), + showValuesConf.getString("name"), + showValuesConf.getString("type")) + } val elasticSearchConfiguration: ElasticSearchConfiguration = { ElasticSearchConfiguration( clientConfiguration = elasticSearchClientConfig, spansIndexConfiguration = spansIndexConfiguration, - serviceMetadataIndexConfiguration = serviceMetadataIndexConfig + serviceMetadataIndexConfiguration = serviceMetadataIndexConfig, + showValuesIndexConfiguration = showValuesIndexConfiguration ) } diff --git a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/entities/ElasticSearchConfiguration.scala b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/entities/ElasticSearchConfiguration.scala index f25d87fb..fbcbce59 100644 --- a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/entities/ElasticSearchConfiguration.scala +++ b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/entities/ElasticSearchConfiguration.scala @@ -34,4 +34,9 @@ case class ServiceMetadataIndexConfiguration(enabled: Boolean, case class ElasticSearchConfiguration(clientConfiguration: ElasticSearchClientConfiguration, spansIndexConfiguration: SpansIndexConfiguration, - serviceMetadataIndexConfiguration: ServiceMetadataIndexConfiguration) \ No newline at end of file + serviceMetadataIndexConfiguration: ServiceMetadataIndexConfiguration, + showValuesIndexConfiguration: ShowValuesIndexConfiguration) + +case class ShowValuesIndexConfiguration(enabled: Boolean, + indexName: String, + indexType: String) \ No newline at end of file diff --git a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala index 6c513352..87f0375e 100644 --- a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala +++ b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala @@ -22,7 +22,7 @@ import com.expedia.www.haystack.trace.commons.clients.es.document.TraceIndexDoc import com.expedia.www.haystack.trace.commons.config.entities.{TraceStoreBackends, WhitelistIndexFieldConfiguration} import com.expedia.www.haystack.trace.reader.config.entities.ElasticSearchConfiguration import com.expedia.www.haystack.trace.reader.stores.readers.es.ElasticSearchReader -import com.expedia.www.haystack.trace.reader.stores.readers.es.query.{FieldValuesQueryGenerator, ServiceMetadataQueryGenerator, TraceCountsQueryGenerator, TraceSearchQueryGenerator} +import com.expedia.www.haystack.trace.reader.stores.readers.es.query.{FieldValuesQueryGenerator, ServiceMetadataQueryGenerator, ShowValuesQueryGenerator, TraceCountsQueryGenerator, TraceSearchQueryGenerator} import com.expedia.www.haystack.trace.reader.stores.readers.grpc.GrpcTraceReaders import io.searchbox.core.SearchResult import org.elasticsearch.index.IndexNotFoundException @@ -41,8 +41,9 @@ class EsIndexedTraceStore(traceStoreBackendConfig: TraceStoreBackends, private val esReader: ElasticSearchReader = new ElasticSearchReader(elasticSearchConfiguration.clientConfiguration) private val traceSearchQueryGenerator = new TraceSearchQueryGenerator(elasticSearchConfiguration.spansIndexConfiguration, ES_NESTED_DOC_NAME, whitelistedFieldsConfiguration) private val traceCountsQueryGenerator = new TraceCountsQueryGenerator(elasticSearchConfiguration.spansIndexConfiguration, ES_NESTED_DOC_NAME, whitelistedFieldsConfiguration) - private val fieldValuesQueryGenerator = new FieldValuesQueryGenerator(elasticSearchConfiguration.spansIndexConfiguration, ES_NESTED_DOC_NAME, whitelistedFieldsConfiguration) private val serviceMetadataQueryGenerator = new ServiceMetadataQueryGenerator(elasticSearchConfiguration.serviceMetadataIndexConfiguration) + private val showValuesQueryGenerator = new ShowValuesQueryGenerator(elasticSearchConfiguration.showValuesIndexConfiguration) + private val FIELD_VALUE_KEY = "fieldvalue" private val esCountTraces = (request: TraceCountsRequest, useSpecificIndices: Boolean) => { esReader.count(traceCountsQueryGenerator.generate(request, useSpecificIndices)) @@ -121,12 +122,20 @@ class EsIndexedTraceStore(traceStoreBackendConfig: TraceStoreBackends, } } + private def readFromShowValues(request: FieldValuesRequest): Option[Future[Seq[String]]] = { + val showValuesIndexConfiguration = elasticSearchConfiguration.showValuesIndexConfiguration + if (!showValuesIndexConfiguration.enabled) return None + + Some(esReader + .search(showValuesQueryGenerator.generateSearchFieldValuesQuery(request.getFilters(0).getValue, request.getFieldName)) + .map(extractFieldValues(_, FIELD_VALUE_KEY))) + } override def getFieldValues(request: FieldValuesRequest): Future[Seq[String]] = { - readFromServiceMetadata(request).getOrElse( - esReader - .search(fieldValuesQueryGenerator.generate(request)) - .map(extractFieldValues(_, request.getFieldName.toLowerCase))) + if (request.getFieldName.equalsIgnoreCase(TraceIndexDoc.SERVICE_KEY_NAME) || request.getFieldName.equalsIgnoreCase(TraceIndexDoc.OPERATION_KEY_NAME)) { + readFromServiceMetadata(request).getOrElse(Future.apply(Seq[String]())) + } + readFromShowValues(request).getOrElse(Future.apply(Seq[String]())) } override def getTraceCounts(request: TraceCountsRequest): Future[TraceCounts] = { diff --git a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/readers/es/query/ShowValuesQueryGenerator.scala b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/readers/es/query/ShowValuesQueryGenerator.scala new file mode 100644 index 00000000..6f140ac7 --- /dev/null +++ b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/readers/es/query/ShowValuesQueryGenerator.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2017 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expedia.www.haystack.trace.reader.stores.readers.es.query + + +import com.expedia.www.haystack.trace.reader.config.entities.ShowValuesIndexConfiguration +import io.searchbox.core.Search +import org.apache.commons.lang3.StringUtils +import org.elasticsearch.index.query.QueryBuilders +import org.elasticsearch.index.query.QueryBuilders.termQuery +import org.elasticsearch.search.builder.SearchSourceBuilder + +class ShowValuesQueryGenerator(config: ShowValuesIndexConfiguration) { + private val SERVICE_NAME_KEY = "servicename" + private val FIELD_NAME_KEY = "fieldname" + private val FIELD_VALUE_KEY = "fieldvalue" + private val LIMIT = 10000 + + + def generateSearchFieldValuesQuery(serviceName: String, fieldName: String): Search = { + val fieldValuesQuery = buildFieldValuesQuery(serviceName, fieldName) + generateSearchQuery(fieldValuesQuery) + } + + private def generateSearchQuery(queryString: String): Search = { + new Search.Builder(queryString) + .addIndex(config.indexName) + .addType(config.indexType) + .build() + } + + private def buildFieldValuesQuery(serviceName: String, fieldName: String): String = { + if (StringUtils.isNotEmpty(serviceName)) { + new SearchSourceBuilder() + .query(QueryBuilders + .boolQuery() + .must(termQuery(SERVICE_NAME_KEY, serviceName)) + .must(termQuery(FIELD_NAME_KEY, fieldName))) + .fetchSource(Array[String](FIELD_VALUE_KEY), Array[String](FIELD_NAME_KEY, SERVICE_NAME_KEY)) + .size(LIMIT) + .toString + } else { + new SearchSourceBuilder() + .query(termQuery(FIELD_NAME_KEY, fieldName)) + .fetchSource(Array[String](FIELD_VALUE_KEY), Array[String](FIELD_NAME_KEY, SERVICE_NAME_KEY)) + .size(LIMIT) + .toString + } + } +} diff --git a/reader/src/test/resources/config/base.conf b/reader/src/test/resources/config/base.conf index a9c2f86f..7542d497 100644 --- a/reader/src/test/resources/config/base.conf +++ b/reader/src/test/resources/config/base.conf @@ -36,6 +36,11 @@ elasticsearch { name = "service_metadata" type = "metadata" } + show.values { + enabled = false + name = "show-values" + type = "showvalues-metadata" + } } } diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala index c0c7b5ce..4699d16a 100644 --- a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala @@ -26,7 +26,7 @@ import com.expedia.open.tracing.api.TraceReaderGrpc.TraceReaderBlockingStub import com.expedia.open.tracing.backend.StorageBackendGrpc.StorageBackendBlockingStub import com.expedia.open.tracing.backend.{StorageBackendGrpc, TraceRecord, WriteSpansRequest, WriteSpansResponse} import com.expedia.open.tracing.buffer.SpanBuffer -import com.expedia.www.haystack.trace.commons.clients.es.document.TraceIndexDoc +import com.expedia.www.haystack.trace.commons.clients.es.document.{ShowValuesDoc, TraceIndexDoc} import com.expedia.www.haystack.trace.commons.config.entities.{IndexFieldType, WhiteListIndexFields, WhitelistIndexField} import com.expedia.www.haystack.trace.commons.packer.{PackerFactory, PackerType} import com.expedia.www.haystack.trace.reader.Service @@ -56,6 +56,8 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w private val ELASTIC_SEARCH_ENDPOINT = "http://elasticsearch:9200" private val ELASTIC_SEARCH_WHITELIST_INDEX = "reload-configs" private val ELASTIC_SEARCH_WHITELIST_TYPE = "whitelist-index-fields" + private val ELASTIC_SEARCH_SHOW_VALUES_INDEX = "show-values" + private val ELASTIC_SEARCH_SHOW_VALUES_TYPE = "fieldvalues-metadata" private val SPANS_INDEX_TYPE = "spans" private val executors = Executors.newFixedThreadPool(2) @@ -260,6 +262,14 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w Thread.sleep(10000) } + protected def putShowValueFieldsInEs(serviceName: String, fieldNameValuePairs: Set[(String, String)]): Unit = { + val showValuesDocList = fieldNameValuePairs.map(p => ShowValuesDoc(serviceName, p._1, p._2)) + esClient.execute(new Index.Builder(Serialization.write(showValuesDocList)) + .index(ELASTIC_SEARCH_SHOW_VALUES_INDEX) + .`type`(ELASTIC_SEARCH_SHOW_VALUES_TYPE) + .build()) + } + private def insertTraceInBackend(traceId: String, spanId: String, serviceName: String, diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala index 1c870505..65b05738 100644 --- a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala @@ -84,6 +84,25 @@ class TraceServiceIntegrationTestSpec extends BaseIntegrationTestSpec { result.getValuesList.size() should be(2) result.getValuesList.asScala should contain allOf(op1, op2) } + + it("should return values of a given fields(other than service and operation name) with filters") { + Given("trace in trace-backend and elasticsearch") + val serviceName = "get_values_with_filters_non_service_operation_type_servicename" + val fieldNameValuePairs = Set[(String, String)](("tagname1", "tagvalue1"), ("tagname2", "tagvalue2")) + + putShowValueFieldsInEs(serviceName, fieldNameValuePairs) + val request = FieldValuesRequest.newBuilder() + .addFilters(Field.newBuilder().setName(TraceIndexDoc.SERVICE_KEY_NAME).setValue(serviceName)) + .setFieldName("tagname1") + .build() + + When("calling getFieldValues") + val result = client.getFieldValues(request) + + Then("should return filtered values for given field") + result.getValuesList.size() should be(1) + result.getValuesList.asScala should contain("tagname1") + } } describe("TraceReader.getTrace") { diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala index 58e4feb3..f4da7781 100644 --- a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala @@ -71,6 +71,10 @@ class ConfigurationLoaderSpec extends BaseUnitTestSpec { elasticSearchConfig.serviceMetadataIndexConfiguration.enabled shouldEqual false elasticSearchConfig.serviceMetadataIndexConfiguration.indexName shouldEqual "service_metadata" elasticSearchConfig.serviceMetadataIndexConfiguration.indexType shouldEqual "metadata" + + elasticSearchConfig.showValuesIndexConfiguration.enabled shouldEqual false + elasticSearchConfig.showValuesIndexConfiguration.indexName shouldEqual "show-values" + elasticSearchConfig.showValuesIndexConfiguration.indexType shouldEqual "showvalues-metadata" } it("should load trace backend configuration") { diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/stores/readers/es/query/ShowValuesQueryGeneratorSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/stores/readers/es/query/ShowValuesQueryGeneratorSpec.scala new file mode 100644 index 00000000..f9a71e25 --- /dev/null +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/stores/readers/es/query/ShowValuesQueryGeneratorSpec.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2017 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expedia.www.haystack.trace.reader.unit.stores.readers.es.query + +import com.expedia.www.haystack.trace.reader.config.entities.ShowValuesIndexConfiguration +import com.expedia.www.haystack.trace.reader.stores.readers.es.query.ShowValuesQueryGenerator +import com.expedia.www.haystack.trace.reader.unit.BaseUnitTestSpec +import com.google.gson.Gson + +class ShowValuesQueryGeneratorSpec extends BaseUnitTestSpec { + private val indexType = "fieldvalues-metadata" + private val showValuesIndexConfiguration = ShowValuesIndexConfiguration( + enabled = true, + indexName = "show-values", + indexType = indexType) + + describe("ShowValuesQueryGenerator") { + it("should generate valid bool queries for field") { + Given("a query generator") + val queryGenerator = new ShowValuesQueryGenerator(showValuesIndexConfiguration) + val serviceName = "test_service" + val fieldName = "test_field" + + When("asked for aggregated service name") + val query = queryGenerator.generateSearchFieldValuesQuery(serviceName, fieldName) + + Then("generate a valid query") + query.getType should be(indexType) + query.getData(new Gson()) shouldEqual "{\n \"size\" : 10000,\n \"query\" : {\n \"bool\" : {\n \"must\" : [\n {\n \"term\" : {\n \"servicename\" : {\n \"value\" : \"test_service\",\n \"boost\" : 1.0\n }\n }\n },\n {\n \"term\" : {\n \"fieldname\" : {\n \"value\" : \"test_field\",\n \"boost\" : 1.0\n }\n }\n }\n ],\n \"adjust_pure_negative\" : true,\n \"boost\" : 1.0\n }\n },\n \"_source\" : {\n \"includes\" : [\n \"fieldvalue\"\n ],\n \"excludes\" : [\n \"fieldname\",\n \"servicename\"\n ]\n }\n}" + query.toString shouldEqual "Search{uri=show-values/fieldvalues-metadata/_search, method=POST}" + } + } +} From 48d0f9cc0e99538a44bf88e2813420ecbb698552 Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Sun, 25 Aug 2019 05:22:13 +0530 Subject: [PATCH 10/12] fixing response parser for reader --- .../reader/stores/EsIndexedTraceStore.scala | 12 ++- .../trace/reader/stores/ResponseParser.scala | 55 ++++++++------ reader/src/test/resources/config/base.conf | 4 +- .../integration/BaseIntegrationTestSpec.scala | 75 +++++++++++++++++-- .../TraceServiceIntegrationTestSpec.scala | 2 +- 5 files changed, 116 insertions(+), 32 deletions(-) diff --git a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala index 87f0375e..7c53e742 100644 --- a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala +++ b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala @@ -41,6 +41,7 @@ class EsIndexedTraceStore(traceStoreBackendConfig: TraceStoreBackends, private val esReader: ElasticSearchReader = new ElasticSearchReader(elasticSearchConfiguration.clientConfiguration) private val traceSearchQueryGenerator = new TraceSearchQueryGenerator(elasticSearchConfiguration.spansIndexConfiguration, ES_NESTED_DOC_NAME, whitelistedFieldsConfiguration) private val traceCountsQueryGenerator = new TraceCountsQueryGenerator(elasticSearchConfiguration.spansIndexConfiguration, ES_NESTED_DOC_NAME, whitelistedFieldsConfiguration) + private val fieldValuesQueryGenerator = new FieldValuesQueryGenerator(elasticSearchConfiguration.spansIndexConfiguration, ES_NESTED_DOC_NAME, whitelistedFieldsConfiguration) private val serviceMetadataQueryGenerator = new ServiceMetadataQueryGenerator(elasticSearchConfiguration.serviceMetadataIndexConfiguration) private val showValuesQueryGenerator = new ShowValuesQueryGenerator(elasticSearchConfiguration.showValuesIndexConfiguration) private val FIELD_VALUE_KEY = "fieldvalue" @@ -131,11 +132,18 @@ class EsIndexedTraceStore(traceStoreBackendConfig: TraceStoreBackends, .map(extractFieldValues(_, FIELD_VALUE_KEY))) } + private def readFromSpansIndex(request: FieldValuesRequest): Future[Seq[String]] = { + esReader + .search(fieldValuesQueryGenerator.generate(request)) + .map(extractFieldValues(_, request.getFieldName.toLowerCase)) + } + override def getFieldValues(request: FieldValuesRequest): Future[Seq[String]] = { if (request.getFieldName.equalsIgnoreCase(TraceIndexDoc.SERVICE_KEY_NAME) || request.getFieldName.equalsIgnoreCase(TraceIndexDoc.OPERATION_KEY_NAME)) { - readFromServiceMetadata(request).getOrElse(Future.apply(Seq[String]())) + readFromServiceMetadata(request).getOrElse(readFromSpansIndex(request)) + } else { + readFromShowValues(request).getOrElse(readFromSpansIndex(request)) } - readFromShowValues(request).getOrElse(Future.apply(Seq[String]())) } override def getTraceCounts(request: TraceCountsRequest): Future[TraceCounts] = { diff --git a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/ResponseParser.scala b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/ResponseParser.scala index aaa6cf71..917cd674 100644 --- a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/ResponseParser.scala +++ b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/ResponseParser.scala @@ -36,6 +36,10 @@ trait ResponseParser { private val ES_COUNT_PER_INTERVAL = "__count_per_interval" private val ES_AGG_DOC_COUNT = "doc_count" protected val ES_NESTED_DOC_NAME = "spans" + private val ES_HITS_KEY = "hits" + private val ES_SOURCE_KEY = "_source" + private val ES_FIELD_VALUE_KEY = "fieldvalue" + protected def mapSearchResultToTraceCounts(result: SearchResult): Future[TraceCounts] = { val aggregation = result.getJsonObject @@ -69,27 +73,36 @@ trait ResponseParser { } protected def extractFieldValues(result: SearchResult, fieldName: String): List[String] = { - val aggregations = - result - .getJsonObject - .getAsJsonObject(ES_FIELD_AGGREGATIONS) - .getAsJsonObject(ES_NESTED_DOC_NAME) - .getAsJsonObject(fieldName) - - if (aggregations.has(ES_FIELD_BUCKETS)) { - aggregations - .getAsJsonArray(ES_FIELD_BUCKETS) - .asScala - .map(element => element.getAsJsonObject.get(ES_FIELD_KEY).getAsString) - .toList - } - else { - aggregations - .getAsJsonObject(fieldName) - .getAsJsonArray(ES_FIELD_BUCKETS) - .asScala - .map(element => element.getAsJsonObject.get(ES_FIELD_KEY).getAsString) - .toList + + val jsonResult = result.getJsonObject + + if (jsonResult.has(ES_FIELD_AGGREGATIONS)) { + val aggregations = jsonResult + .getAsJsonObject(ES_FIELD_AGGREGATIONS) + .getAsJsonObject(ES_NESTED_DOC_NAME) + .getAsJsonObject(fieldName) + + if (aggregations.has(ES_FIELD_BUCKETS)) { + aggregations + .getAsJsonArray(ES_FIELD_BUCKETS) + .asScala + .map(element => element.getAsJsonObject.get(ES_FIELD_KEY).getAsString) + .toList + } + else { + aggregations + .getAsJsonObject(fieldName) + .getAsJsonArray(ES_FIELD_BUCKETS) + .asScala + .map(element => element.getAsJsonObject.get(ES_FIELD_KEY).getAsString) + .toList + } + } else { + jsonResult + .getAsJsonObject(ES_HITS_KEY) + .getAsJsonArray(ES_HITS_KEY) + .asScala.map(p => p.getAsJsonObject.getAsJsonObject(ES_SOURCE_KEY).get(ES_FIELD_VALUE_KEY).getAsString) + .toList.distinct } } diff --git a/reader/src/test/resources/config/base.conf b/reader/src/test/resources/config/base.conf index 7542d497..9f629de4 100644 --- a/reader/src/test/resources/config/base.conf +++ b/reader/src/test/resources/config/base.conf @@ -37,9 +37,9 @@ elasticsearch { type = "metadata" } show.values { - enabled = false + enabled = true name = "show-values" - type = "showvalues-metadata" + type = "fieldvalues-metadata" } } } diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala index 4699d16a..1863dad8 100644 --- a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala @@ -56,7 +56,6 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w private val ELASTIC_SEARCH_ENDPOINT = "http://elasticsearch:9200" private val ELASTIC_SEARCH_WHITELIST_INDEX = "reload-configs" private val ELASTIC_SEARCH_WHITELIST_TYPE = "whitelist-index-fields" - private val ELASTIC_SEARCH_SHOW_VALUES_INDEX = "show-values" private val ELASTIC_SEARCH_SHOW_VALUES_TYPE = "fieldvalues-metadata" private val SPANS_INDEX_TYPE = "spans" @@ -72,6 +71,16 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w s"haystack-traces-$dateBucket-$hourBucket" } + + private val SHOW_VAlUES_INDEX = { + val date = new Date() + + val dateBucket = new SimpleDateFormat("yyyy-MM-dd").format(date) + val hourBucket = new SimpleDateFormat("HH").format(date).toInt / 6 + + s"show-values-$dateBucket" + } + private val INDEX_TEMPLATE = """{ | "template": "haystack-traces*", @@ -158,6 +167,54 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w |""".stripMargin + private val SHOW_VALUES_INDEX_TEMPLATE = + """{ + |"template": "show-values*", + |"aliases": { + | "show-values":{} + |}, + |"settings": { + | "number_of_shards": 4, + | "index.mapping.ignore_malformed": true, + | "analysis": { + | "normalizer": { + | "lowercase_normalizer": { + | "type": "custom", + | "filter": ["lowercase"] + | } + | } + | } + |}, + |"mappings": { + | "fieldvalues-metadata": { + | "_field_names": { + | "enabled": false + | }, + | "_all": { + | "enabled": false + | }, + | "properties": { + | "servicename": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | }, + | "fieldname": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | }, + | "fieldvalue": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | } + | } + | } + |} + |}""".stripMargin + + private var esClient: JestClient = _ private var traceBackendClient: StorageBackendBlockingStub = _ @@ -190,6 +247,10 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w .settings(INDEX_TEMPLATE) .build) + esClient.execute(new CreateIndex.Builder(SHOW_VAlUES_INDEX) + .settings(SHOW_VALUES_INDEX_TEMPLATE) + .build) + executors.submit(new Runnable { override def run(): Unit = Service.main(null) }) @@ -263,11 +324,13 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w } protected def putShowValueFieldsInEs(serviceName: String, fieldNameValuePairs: Set[(String, String)]): Unit = { - val showValuesDocList = fieldNameValuePairs.map(p => ShowValuesDoc(serviceName, p._1, p._2)) - esClient.execute(new Index.Builder(Serialization.write(showValuesDocList)) - .index(ELASTIC_SEARCH_SHOW_VALUES_INDEX) - .`type`(ELASTIC_SEARCH_SHOW_VALUES_TYPE) - .build()) + val showValuesDocList = fieldNameValuePairs.map(p => { + esClient.execute(new Index.Builder(Serialization.write(ShowValuesDoc(serviceName, p._1, p._2))) + .index(SHOW_VAlUES_INDEX) + .`type`(ELASTIC_SEARCH_SHOW_VALUES_TYPE) + .build()) + }) + } private def insertTraceInBackend(traceId: String, diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala index 65b05738..6901306d 100644 --- a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala @@ -101,7 +101,7 @@ class TraceServiceIntegrationTestSpec extends BaseIntegrationTestSpec { Then("should return filtered values for given field") result.getValuesList.size() should be(1) - result.getValuesList.asScala should contain("tagname1") + result.getValuesList.asScala should contain("tagvalue1") } } From 20adbb35f1ebc8353ac6b4cf73454f66e99e917b Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Mon, 26 Aug 2019 12:01:33 +0530 Subject: [PATCH 11/12] fixing reader tests --- .../trace/reader/unit/config/ConfigurationLoaderSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala index f4da7781..c5c3036e 100644 --- a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala @@ -72,9 +72,9 @@ class ConfigurationLoaderSpec extends BaseUnitTestSpec { elasticSearchConfig.serviceMetadataIndexConfiguration.indexName shouldEqual "service_metadata" elasticSearchConfig.serviceMetadataIndexConfiguration.indexType shouldEqual "metadata" - elasticSearchConfig.showValuesIndexConfiguration.enabled shouldEqual false + elasticSearchConfig.showValuesIndexConfiguration.enabled shouldEqual true elasticSearchConfig.showValuesIndexConfiguration.indexName shouldEqual "show-values" - elasticSearchConfig.showValuesIndexConfiguration.indexType shouldEqual "showvalues-metadata" + elasticSearchConfig.showValuesIndexConfiguration.indexType shouldEqual "fieldvalues-metadata" } it("should load trace backend configuration") { From 0e29681e15a67415986a6c2686f3cc884748e03b Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Fri, 30 Aug 2019 04:04:05 +0530 Subject: [PATCH 12/12] fixing the es refresh issue for tests --- .../trace/reader/integration/BaseIntegrationTestSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala index 1863dad8..9056e6de 100644 --- a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala @@ -331,6 +331,7 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w .build()) }) + Thread.sleep(10000) } private def insertTraceInBackend(traceId: String,