diff --git a/commons/src/main/scala/com/expedia/www/haystack/trace/commons/clients/es/document/ShowValuesDoc.scala b/commons/src/main/scala/com/expedia/www/haystack/trace/commons/clients/es/document/ShowValuesDoc.scala new file mode 100644 index 00000000..66462554 --- /dev/null +++ b/commons/src/main/scala/com/expedia/www/haystack/trace/commons/clients/es/document/ShowValuesDoc.scala @@ -0,0 +1,10 @@ +package com.expedia.www.haystack.trace.commons.clients.es.document + +import org.json4s.jackson.Serialization + +case class ShowValuesDoc(servicename: String, + fieldname: String, + fieldvalue: String) { + val json: String = Serialization.write(this)(TraceIndexDoc.formats) +} + diff --git a/commons/src/main/scala/com/expedia/www/haystack/trace/commons/config/entities/WhitelistIndexFieldConfiguration.scala b/commons/src/main/scala/com/expedia/www/haystack/trace/commons/config/entities/WhitelistIndexFieldConfiguration.scala index 19a1ac42..c8f5eedc 100644 --- a/commons/src/main/scala/com/expedia/www/haystack/trace/commons/config/entities/WhitelistIndexFieldConfiguration.scala +++ b/commons/src/main/scala/com/expedia/www/haystack/trace/commons/config/entities/WhitelistIndexFieldConfiguration.scala @@ -40,6 +40,7 @@ case class WhitelistIndexField(name: String, aliases: Set[String] = Set(), enableRangeQuery: Boolean = false, searchContext: String = "span", + showValue: Boolean = false, enabled: Boolean = true) case class WhiteListIndexFields(fields: List[WhitelistIndexField]) diff --git a/commons/src/test/scala/com/expedia/www/haystack/trace/commons/unit/WhitelistIndexFieldConfigurationSpec.scala b/commons/src/test/scala/com/expedia/www/haystack/trace/commons/unit/WhitelistIndexFieldConfigurationSpec.scala index 24febefb..1b7c0e14 100644 --- a/commons/src/test/scala/com/expedia/www/haystack/trace/commons/unit/WhitelistIndexFieldConfigurationSpec.scala +++ b/commons/src/test/scala/com/expedia/www/haystack/trace/commons/unit/WhitelistIndexFieldConfigurationSpec.scala @@ -21,7 +21,7 @@ import com.expedia.www.haystack.trace.commons.config.entities.{IndexFieldType, W import org.json4s.ext.EnumNameSerializer import org.json4s.jackson.Serialization import org.json4s.{DefaultFormats, Formats} -import org.scalatest.{Entry, FunSpec, Matchers} +import org.scalatest.{FunSpec, Matchers} import scala.collection.JavaConverters._ diff --git a/indexer/src/main/resources/config/base.conf b/indexer/src/main/resources/config/base.conf index 03070a08..eb34d93b 100644 --- a/indexer/src/main/resources/config/base.conf +++ b/indexer/src/main/resources/config/base.conf @@ -93,6 +93,41 @@ service.metadata { } } +show.values { + enabled = true + flush { + interval.sec = 60 + operation.count = 10000 + } + es { + endpoint = "http://elasticsearch:9200" + conn.timeout.ms = 10000 + read.timeout.ms = 5000 + consistency.level = "one" + index { + # apply the template before starting the client, if json is empty, no operation is performed + template.json = "{\"template\": \"show-values\", \"index_patterns\": [\"show-values*\"], \"aliases\": {\"show-values\":{}}, \"settings\": {\"number_of_shards\": 4, \"index.mapping.ignore_malformed\": true, \"analysis\": {\"normalizer\": {\"lowercase_normalizer\": {\"type\": \"custom\", \"filter\": [\"lowercase\"]}}}}, \"mappings\": {\"metadata\": {\"_field_names\": {\"enabled\": false}, \"_all\": {\"enabled\": false}, \"properties\": {\"servicename\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldname\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}, \"fieldvalue\": {\"type\": \"keyword\", \"norms\": false, \"doc_values\": false}}}}}" + name = "show-values" + type = "fieldvalues-metadata" + } + # defines settings for bulk operation like max inflight bulks, number of documents and the total size in a single bulk + bulk.max { + docs { + count = 100 + size.kb = 1000 + } + inflight = 10 + } + retries { + max = 10 + backoff { + initial.ms = 100 + factor = 2 + } + } + } +} + elasticsearch { endpoint = "http://elasticsearch:9200" diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/App.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/App.scala index 2ca21d72..7060f4ba 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/App.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/App.scala @@ -44,6 +44,7 @@ object App extends MetricsSupport { appConfig.elasticSearchConfig, appConfig.backendConfig, appConfig.serviceMetadataWriteConfig, + appConfig.showValuesConfig, appConfig.indexConfig) Runtime.getRuntime.addShutdownHook(new Thread { diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala index 5c15959d..2b2e274e 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/StreamRunner.scala @@ -29,7 +29,7 @@ import com.expedia.www.haystack.trace.indexer.processors._ import com.expedia.www.haystack.trace.indexer.processors.supplier.SpanIndexProcessorSupplier import com.expedia.www.haystack.trace.indexer.store.SpanBufferMemoryStoreSupplier import com.expedia.www.haystack.trace.indexer.writers.TraceWriter -import com.expedia.www.haystack.trace.indexer.writers.es.{ElasticSearchWriter, ServiceMetadataWriter} +import com.expedia.www.haystack.trace.indexer.writers.es.{ElasticSearchWriter, ServiceMetadataWriter, ShowValuesWriter} import com.expedia.www.haystack.trace.indexer.writers.grpc.GrpcTraceWriter import com.expedia.www.haystack.trace.indexer.writers.kafka.KafkaWriter import org.apache.commons.lang3.StringUtils @@ -44,6 +44,7 @@ class StreamRunner(kafkaConfig: KafkaConfiguration, esConfig: ElasticSearchConfiguration, traceWriteConfig: TraceBackendConfiguration, serviceMetadataWriteConfig: ServiceMetadataWriteConfiguration, + showValuesWriteConfig: ShowValuesConfiguration, indexConfig: WhitelistIndexFieldConfiguration) extends AutoCloseable with StateListener { implicit private val executor: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global @@ -63,6 +64,10 @@ class StreamRunner(kafkaConfig: KafkaConfiguration, writers += new ServiceMetadataWriter(serviceMetadataWriteConfig) } + if (showValuesWriteConfig.enabled) { + writers += new ShowValuesWriter(showValuesWriteConfig, indexConfig) + } + if (StringUtils.isNotEmpty(kafkaConfig.produceTopic)) { writers += new KafkaWriter(kafkaConfig.producerProps, kafkaConfig.produceTopic) } diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala index 0e677b89..e52dd79d 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/ProjectConfiguration.scala @@ -182,6 +182,44 @@ class ProjectConfiguration extends AutoCloseable { ) } + /** + * show values write configuration + */ + val showValuesConfig: ShowValuesConfiguration = { + val showValuesConfig = config.getConfig("show.values") + val es = showValuesConfig.getConfig("es") + val templateJsonConfigField = "index.template.json" + val indexTemplateJson = if (es.hasPath(templateJsonConfigField) + && StringUtils.isNotEmpty(es.getString(templateJsonConfigField))) { + Some(es.getString(templateJsonConfigField)) + } else { + None + } + val username = if (es.hasPath("username")) Option(es.getString("username")) else None + val password = if (es.hasPath("password")) Option(es.getString("password")) else None + ShowValuesConfiguration( + enabled = showValuesConfig.getBoolean("enabled"), + flushIntervalInSec = showValuesConfig.getInt("flush.interval.sec"), + flushOnMaxFieldCount = showValuesConfig.getInt("flush.operation.count"), + esEndpoint = es.getString("endpoint"), + username = username, + password = password, + consistencyLevel = es.getString("consistency.level"), + indexName = es.getString("index.name"), + indexType = es.getString("index.type"), + indexTemplateJson = indexTemplateJson, + connectionTimeoutMillis = es.getInt("conn.timeout.ms"), + readTimeoutMillis = es.getInt("read.timeout.ms"), + maxInFlightBulkRequests = es.getInt("bulk.max.inflight"), + maxDocsInBulk = es.getInt("bulk.max.docs.count"), + maxBulkDocSizeInBytes = es.getInt("bulk.max.docs.size.kb") * 1000, + retryConfig = RetryOperation.Config( + es.getInt("retries.max"), + es.getLong("retries.backoff.initial.ms"), + es.getDouble("retries.backoff.factor")) + ) + } + /** * * elastic search configuration object diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesWriteConfiguration.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesWriteConfiguration.scala new file mode 100644 index 00000000..7f602bed --- /dev/null +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/config/entities/ShowValuesWriteConfiguration.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expedia.www.haystack.trace.indexer.config.entities + +import com.expedia.www.haystack.commons.retries.RetryOperation + +/** + * Configurations for writing service metadata to elastic search + * + * @param enabled: enable writing service metadata, if its set to false, list of service_names and operation names would be fetched from elastic search traces index, which is an expensive aggregation + * @param fieldNames: list of field names to index in show values index + * @param esEndpoint: http endpoint to connect + * @param indexTemplateJson: template as json that will be applied when the app runs, this is optional * @param username + * @param password: password for the es + * @param consistencyLevel: consistency level of writes, for e.g. one, quoram + * @param indexName: name of the elastic search index where the data is written + * @param indexType: elastic search index type + * @param connectionTimeoutMillis : connection timeout in millis + * @param readTimeoutMillis: read timeout in millis + * @param maxInFlightBulkRequests: max bulk writes that can be run in parallel + * @param maxDocsInBulk: maximum number of index documents in a single bulk + * @param maxBulkDocSizeInBytes maximum size (in bytes) of a single bulk request + * @param flushIntervalInSec: interval for collecting service name operation names in memory before flushing to es + * @param flushOnMaxFieldCount: maximum number of unique operations to force flushing to es + * @param retryConfig: retry max retries limit, initial backoff and exponential factor values + */ + +case class ShowValuesConfiguration(enabled: Boolean, + esEndpoint: String, + username: Option[String], + password: Option[String], + consistencyLevel: String, + indexTemplateJson: Option[String], + indexName: String, + indexType: String, + connectionTimeoutMillis: Int, + readTimeoutMillis: Int, + maxInFlightBulkRequests: Int, + maxDocsInBulk: Int, + maxBulkDocSizeInBytes: Int, + flushIntervalInSec: Int, + flushOnMaxFieldCount: Int, + retryConfig: RetryOperation.Config + ) { + require(maxInFlightBulkRequests > 0) +} diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala index ec53bffc..4df155d3 100644 --- a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/IndexDocumentGenerator.scala @@ -34,6 +34,7 @@ import scala.util.{Failure, Success, Try} class IndexDocumentGenerator(config: WhitelistIndexFieldConfiguration) extends MetricsSupport { + type SpanDocument = mutable.Map[String, Any] private val MIN_DURATION_FOR_TRUNCATION = TimeUnit.SECONDS.toMicros(20) /** @@ -44,7 +45,7 @@ class IndexDocumentGenerator(config: WhitelistIndexFieldConfiguration) extends M // We maintain a white list of tags that are to be indexed. The whitelist is maintained as a configuration // in an external database (outside this app boundary). However, the app periodically reads this whitelist config // and applies it to the new spans that are read. - val spanIndices = mutable.ListBuffer[mutable.Map[String, Any]]() + val spanIndices = mutable.ListBuffer[SpanDocument]() var traceStartTime = Long.MaxValue var rootDuration = 0l diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala new file mode 100644 index 00000000..5dba9fd7 --- /dev/null +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesDocumentGenerator.scala @@ -0,0 +1,76 @@ +package com.expedia.www.haystack.trace.indexer.writers.es + +import java.time.Instant +import java.util.stream.Collectors + +import com.expedia.open.tracing.{Span, Tag} +import com.expedia.www.haystack.commons.metrics.MetricsSupport +import com.expedia.www.haystack.trace.commons.clients.es.document.ShowValuesDoc +import com.expedia.www.haystack.trace.commons.config.entities.{WhitelistIndexField, WhitelistIndexFieldConfiguration} +import com.expedia.www.haystack.trace.indexer.config.entities.ShowValuesConfiguration +import org.apache.commons.lang3.StringUtils + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +class ShowValuesDocumentGenerator(config: ShowValuesConfiguration, whitelistIndexFieldConfiguration: WhitelistIndexFieldConfiguration) extends MetricsSupport { + + private var showValuesMap = new mutable.HashMap[String, mutable.HashMap[String, mutable.Set[String]]]() + private var lastFlushInstant = Instant.MIN + private var fieldCount = 0 + + private def shouldFlush: Boolean = { + config.flushIntervalInSec == 0 || Instant.now().minusSeconds(config.flushIntervalInSec).isAfter(lastFlushInstant) + } + + private def areStatementReadyToBeExecuted(): Seq[ShowValuesDoc] = { + if (showValuesMap.nonEmpty && (shouldFlush || fieldCount > config.flushOnMaxFieldCount)) { + val statements = showValuesMap.flatMap { + case (serviceName, fieldValuesMap) => + createShowValuesDoc(serviceName, fieldValuesMap) + } + + lastFlushInstant = Instant.now() + showValuesMap = new mutable.HashMap[String, mutable.HashMap[String, mutable.Set[String]]]() + fieldCount = 0 + statements.toSeq + } else { + Nil + } + } + + /** + * get the list of unique service metadata documents contained in the list of spans + * + * @param spans : list of service metadata + * @return + */ + def getAndUpdateShowValues(spans: Iterable[Span]): Seq[ShowValuesDoc] = { + this.synchronized { + val showValuesIndexField: List[WhitelistIndexField] = whitelistIndexFieldConfiguration.whitelistIndexFields.filter(p => p.showValue.equals(true)) + spans.foreach(span => { + val tagsToSave: List[Tag] = span.getTagsList.stream() + .filter(t => showValuesIndexField.exists(p => p.name.equalsIgnoreCase(t.getKey))) + .collect(Collectors.toList[Tag]()).asScala.toList + if (StringUtils.isNotEmpty(span.getServiceName) && tagsToSave.nonEmpty) { + val serviceInfo = showValuesMap.getOrElseUpdate(span.getServiceName, mutable.HashMap[String, mutable.Set[String]]()) + tagsToSave.foreach(tag => { + val tagValues = serviceInfo.getOrElseUpdate(tag.getKey, mutable.Set[String]()) + if (tagValues.add(tag.getVStr)) { + fieldCount += 1 + } + }) + } + }) + areStatementReadyToBeExecuted() + } + } + + /** + * @return index document that can be put in elastic search + */ + def createShowValuesDoc(serviceName: String, fieldValuesMap: mutable.HashMap[String, mutable.Set[String]]): List[ShowValuesDoc] = { + fieldValuesMap.flatMap(p => p._2.map(values => ShowValuesDoc(serviceName, p._1, values))).toList + } + +} diff --git a/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala new file mode 100644 index 00000000..9facab84 --- /dev/null +++ b/indexer/src/main/scala/com/expedia/www/haystack/trace/indexer/writers/es/ShowValuesWriter.scala @@ -0,0 +1,141 @@ +package com.expedia.www.haystack.trace.indexer.writers.es + +import java.util.concurrent.{Semaphore, TimeUnit} + +import com.expedia.open.tracing.buffer.SpanBuffer +import com.expedia.www.haystack.commons.metrics.MetricsSupport +import com.expedia.www.haystack.trace.commons.clients.es.document.ShowValuesDoc +import com.expedia.www.haystack.commons.retries.RetryOperation.withRetryBackoff +import com.expedia.www.haystack.trace.commons.config.entities.WhitelistIndexFieldConfiguration +import com.expedia.www.haystack.trace.commons.packer.PackedMessage +import com.expedia.www.haystack.trace.indexer.config.entities.ShowValuesConfiguration +import com.expedia.www.haystack.trace.indexer.metrics.AppMetricNames +import com.expedia.www.haystack.trace.indexer.writers.TraceWriter +import io.searchbox.client.{JestClient, JestClientFactory} +import io.searchbox.client.config.HttpClientConfig +import io.searchbox.core.{Bulk, Index} +import io.searchbox.indices.template.PutTemplate +import io.searchbox.params.Parameters +import org.joda.time.{DateTime, DateTimeZone} +import org.joda.time.format.DateTimeFormat +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ +import scala.util.Try + + +object ShowValuesUtils { + // creates an index name based on current date. following example illustrates the naming convention of + // elastic search indices for service metadata: + // service-metadata-2019-02-20 + def indexName(prefix: String): String = { + val eventTime = new DateTime(DateTimeZone.UTC) + val dataFormatter = DateTimeFormat.forPattern("yyyy-MM-dd") + s"$prefix-${dataFormatter.print(eventTime)}" + } +} + +class ShowValuesWriter(config: ShowValuesConfiguration, whitelistFieldConfig: WhitelistIndexFieldConfiguration) extends TraceWriter with MetricsSupport { + private val LOGGER: Logger= LoggerFactory.getLogger(ShowValuesConfiguration.getClass) + + // a timer that measures the amount of time it takes to complete one bulk write + private val writeTimer = metricRegistry.timer(AppMetricNames.METADATA_WRITE_TIME) + + // meter that measures the write failures + private val failureMeter = metricRegistry.meter(AppMetricNames.METADATA_WRITE_FAILURE) + + // converts a serviceMetadata object into an indexable document + private val documentGenerator = new ShowValuesDocumentGenerator(config, whitelistFieldConfig) + + + // this semaphore controls the parallel writes to service metadata index + private val inflightRequestsSemaphore = new Semaphore(config.maxInFlightBulkRequests, true) + + // initialize the elastic search client + private val esClient: JestClient = { + LOGGER.info("Initializing the http elastic search client with endpoint={}", config.esEndpoint) + + val factory = new JestClientFactory() + val builder = new HttpClientConfig.Builder(config.esEndpoint) + .multiThreaded(true) + .maxConnectionIdleTime(config.flushIntervalInSec + 10, TimeUnit.SECONDS) + .connTimeout(config.connectionTimeoutMillis) + .readTimeout(config.readTimeoutMillis) + + if (config.username.isDefined && config.password.isDefined) { + builder.defaultCredentials(config.username.get, config.password.get) + } + + factory.setHttpClientConfig(builder.build()) + factory.getObject + } + + private val bulkBuilder = new ThreadSafeBulkBuilder(config.maxDocsInBulk, config.maxBulkDocSizeInBytes) + + if (config.indexTemplateJson.isDefined) applyTemplate(config.indexTemplateJson.get) + + override def close(): Unit = { + LOGGER.info("Closing the elastic search client now.") + Try(esClient.shutdownClient()) + } + + /** + * converts the spans to an index document and writes to elastic search. Also if the parallel writes + * exceed the max inflight requests, then we block and this puts backpressure on upstream + * + * @param traceId trace id + * @param packedSpanBuffer list of spans belonging to this traceId - packed bytes of span buffer + * @param isLastSpanBuffer tells if this is the last record, so the writer can flush + * @return + */ + override def writeAsync(traceId: String, packedSpanBuffer: PackedMessage[SpanBuffer], isLastSpanBuffer: Boolean): Unit = { + var isSemaphoreAcquired = false + val idxDocument: Seq[ShowValuesDoc] = documentGenerator.getAndUpdateShowValues(packedSpanBuffer.protoObj.getChildSpansList.asScala) + idxDocument.foreach(document => { + try { + addIndexOperation(traceId, document, ServiceMetadataUtils.indexName(config.indexName)) match { + case Some(bulkToDispatch) => + inflightRequestsSemaphore.acquire() + isSemaphoreAcquired = true + + // execute the request async with retry + withRetryBackoff( + retryCallback => { + esClient.executeAsync(bulkToDispatch, new ElasticSearchResultHandler(writeTimer.time(), failureMeter, retryCallback)) + }, + config.retryConfig, + onSuccess = (_ : Any) => inflightRequestsSemaphore.release(), + onFailure = ex => { + inflightRequestsSemaphore.release() + LOGGER.error("Fail to write to ES after {} retry attempts", config.retryConfig.maxRetries, ex) + }) + case _ => + } + } catch { + case ex: Exception => + if (isSemaphoreAcquired) inflightRequestsSemaphore.release() + failureMeter.mark() + LOGGER.error("Failed to write spans to elastic search with exception", ex) + } + }) + } + + private def addIndexOperation(traceId: String, document: ShowValuesDoc, indexName: String): Option[Bulk] = { // add all the service field combinations in one bulk + val action: Index = new Index.Builder(document.json) + .index(indexName) + .`type`(config.indexType) + .setParameter(Parameters.CONSISTENCY, config.consistencyLevel) + .id(s"${document.servicename}_${document.fieldvalue}") + .build() + + bulkBuilder.addAction(action, document.json.getBytes("utf-8").length, forceBulkCreate = false) + } + + private def applyTemplate(templateJson: String) { + val putTemplateRequest = new PutTemplate.Builder("show-values-template", templateJson).build() + val result = esClient.execute(putTemplateRequest) + if (!result.isSucceeded) { + throw new RuntimeException(s"Fail to apply the following template to elastic search with reason=${result.getErrorMessage}") + } + } +} diff --git a/indexer/src/test/resources/config/base.conf b/indexer/src/test/resources/config/base.conf index 753a6ab0..ad2234e3 100644 --- a/indexer/src/test/resources/config/base.conf +++ b/indexer/src/test/resources/config/base.conf @@ -88,6 +88,41 @@ service.metadata { } } +show.values { + enabled = true + flush { + interval.sec = 60 + operation.count = 10000 + } + es { + endpoint = "http://elasticsearch:9200" + conn.timeout.ms = 10000 + read.timeout.ms = 5000 + consistency.level = "one" + index { + # apply the template before starting the client, if json is empty, no operation is performed + template.json = "some_template_json" + name = "show-values" + type = "fieldvalues-metadata" + } + # defines settings for bulk operation like max inflight bulks, number of documents and the total size in a single bulk + bulk.max { + docs { + count = 100 + size.kb = 1000 + } + inflight = 10 + } + retries { + max = 10 + backoff { + initial.ms = 100 + factor = 2 + } + } + } +} + elasticsearch { endpoint = "http://elasticsearch:9200" max.inflight.requests = 50 diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala index 51ef660e..8b29840a 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/BaseIntegrationTestSpec.scala @@ -89,6 +89,7 @@ abstract class BaseIntegrationTestSpec extends WordSpec with GivenWhenThen with .setStartTime(System.currentTimeMillis() * 1000) .addTags(Tag.newBuilder().setKey("errorCode").setType(TagType.LONG).setVLong(404)) .addTags(Tag.newBuilder().setKey("_role").setType(TagType.STRING).setVStr("haystack")) + .addTags(Tag.newBuilder().setKey("pagename").setType(TagType.STRING).setVStr("homepage")) .addLogs(Log.newBuilder().addFields(Tag.newBuilder().setKey("exceptiontype").setType(TagType.STRING).setVStr("external").build()).build()) .build() } @@ -163,6 +164,24 @@ abstract class BaseIntegrationTestSpec extends WordSpec with GivenWhenThen with } + def verifyFields(): Unit = { + val fieldNamesQuery = + """{ + | "query" : { + | "match_all": {} + | }, + | "_source" : { + | "includes" : [ + | "fieldname", + | "fieldvalue", + | "servicename" + | ] + | } + |}""".stripMargin + val docs = elastic.queryShowValuesIndex(fieldNamesQuery) + docs.size shouldBe 6 + } + def verifyElasticSearchWrites(traceIds: Seq[String]): Unit = { val matchAllQuery = """{ diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala index 8596d25f..36a81d3c 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/EvictedSpanBufferSpec.scala @@ -24,6 +24,7 @@ class EvictedSpanBufferSpec extends BaseIntegrationTestSpec { val indexTagsConfig = elastic.indexingConfig val backendConfig = traceBackendClient.buildConfig val serviceMetadataConfig = elastic.buildServiceMetadataConfig + val showValuesConfig = elastic.buildShowValuesConfig val accumulatorConfig = spanAccumulatorConfig.copy(minTracesPerCache = 1, maxEntriesAllStores = 1) produceSpansAsync(MAX_CHILD_SPANS, @@ -32,7 +33,7 @@ class EvictedSpanBufferSpec extends BaseIntegrationTestSpec { 0L, accumulatorConfig.bufferingWindowMillis) When(s"kafka-streams topology is started") - val topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + val topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() Then(s"we should get multiple span-buffers bearing only 1 span due to early eviction from store") diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/FailedTopologyRecoverySpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/FailedTopologyRecoverySpec.scala index fc07d76b..c3a02d4c 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/FailedTopologyRecoverySpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/FailedTopologyRecoverySpec.scala @@ -42,6 +42,7 @@ class FailedTopologyRecoverySpec extends BaseIntegrationTestSpec { val backendConfig = traceBackendClient.buildConfig val serviceMetadataConfig = elastic.buildServiceMetadataConfig val accumulatorConfig = spanAccumulatorConfig.copy(pollIntervalMillis = spanAccumulatorConfig.pollIntervalMillis * 5) + val showValuesConfig = elastic.buildShowValuesConfig val startTimestamp = System.currentTimeMillis() produceSpansAsync( MAX_CHILD_SPANS_PER_TRACE, @@ -51,7 +52,7 @@ class FailedTopologyRecoverySpec extends BaseIntegrationTestSpec { spanAccumulatorConfig.bufferingWindowMillis) When(s"kafka-streams topology is started and then stopped forcefully after few sec") - var topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + var topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() Thread.sleep(7000) topology.close() @@ -60,7 +61,7 @@ class FailedTopologyRecoverySpec extends BaseIntegrationTestSpec { Thread.sleep(6000) Then(s"on restart of the topology, we should be able to read complete trace created in previous run from the '${kafka.OUTPUT_TOPIC}' topic in kafka, trace-backend and elasticsearch") - topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + topology = new StreamRunner(kafkaConfig, accumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() // produce one more span record with same traceId to trigger punctuate diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/MultipleTraceIndexingTopologySpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/MultipleTraceIndexingTopologySpec.scala index a9716646..5a4e6d57 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/MultipleTraceIndexingTopologySpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/MultipleTraceIndexingTopologySpec.scala @@ -42,6 +42,7 @@ class MultipleTraceIndexingTopologySpec extends BaseIntegrationTestSpec { val indexTagsConfig = elastic.indexingConfig val backendConfig = traceBackendClient.buildConfig val serviceMetadataConfig = elastic.buildServiceMetadataConfig + val showValuesConfig = elastic.buildShowValuesConfig When(s"spans are produced in '${kafka.INPUT_TOPIC}' topic async, and kafka-streams topology is started") val traceDescriptions = List(TraceDescription(TRACE_ID_5, SPAN_ID_PREFIX_2),TraceDescription(TRACE_ID_9, SPAN_ID_PREFIX_1)) @@ -52,7 +53,7 @@ class MultipleTraceIndexingTopologySpec extends BaseIntegrationTestSpec { startRecordTimestamp = 0, maxRecordTimestamp = spanAccumulatorConfig.bufferingWindowMillis) - val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() Then(s"we should read two span buffers with different traceIds from '${kafka.OUTPUT_TOPIC}' topic and same should be read from trace-backend and elastic search") diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/PartialTraceIndexingTopologySpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/PartialTraceIndexingTopologySpec.scala index ca484104..b7e03cdb 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/PartialTraceIndexingTopologySpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/PartialTraceIndexingTopologySpec.scala @@ -40,6 +40,7 @@ class PartialTraceIndexingTopologySpec extends BaseIntegrationTestSpec { val indexTagsConfig = elastic.indexingConfig val backendConfig = traceBackendClient.buildConfig val serviceMetadataConfig = elastic.buildServiceMetadataConfig + val showValuesConfig = elastic.buildShowValuesConfig val traceDescription = List(TraceDescription(TRACE_ID, SPAN_ID_PREFIX)) When(s"spans are produced in '${kafka.INPUT_TOPIC}' topic async, and kafka-streams topology is started") @@ -50,7 +51,7 @@ class PartialTraceIndexingTopologySpec extends BaseIntegrationTestSpec { 0L, spanAccumulatorConfig.bufferingWindowMillis) - val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() Then(s"we should read one span buffer object from '${kafka.OUTPUT_TOPIC}' topic and the same should be searchable in trace-backend and elastic search") diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ServiceMetadataIndexingTopologySpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ServiceMetadataIndexingTopologySpec.scala index 5e7ba693..c4824ee7 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ServiceMetadataIndexingTopologySpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ServiceMetadataIndexingTopologySpec.scala @@ -41,6 +41,7 @@ class ServiceMetadataIndexingTopologySpec extends BaseIntegrationTestSpec { val indexTagsConfig = elastic.indexingConfig val backendConfig = traceBackendClient.buildConfig val serviceMetadataConfig = elastic.buildServiceMetadataConfig + val showValuesConfig = elastic.buildShowValuesConfig When(s"spans are produced in '${kafka.INPUT_TOPIC}' topic async, and kafka-streams topology is started") val traceDescriptions = List(TraceDescription(TRACE_ID_6, SPAN_ID_PREFIX_1), TraceDescription(TRACE_ID_7, SPAN_ID_PREFIX_2)) @@ -51,7 +52,7 @@ class ServiceMetadataIndexingTopologySpec extends BaseIntegrationTestSpec { 0, spanAccumulatorConfig.bufferingWindowMillis) - val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, indexTagsConfig) + val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) topology.start() Then(s"we should read two multiple service operation combinations in elastic search") diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ShowValuesIndexingTopologySpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ShowValuesIndexingTopologySpec.scala new file mode 100644 index 00000000..5b3e123f --- /dev/null +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/ShowValuesIndexingTopologySpec.scala @@ -0,0 +1,52 @@ +package com.expedia.www.haystack.trace.indexer.integration + +import java.util + +import com.expedia.open.tracing.buffer.SpanBuffer +import com.expedia.www.haystack.trace.indexer.StreamRunner +import org.apache.kafka.streams.KeyValue +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils + +import scala.concurrent.duration._ + +class ShowValuesIndexingTopologySpec extends BaseIntegrationTestSpec { + private val MAX_CHILD_SPANS_PER_TRACE = 5 + private val TRACE_ID_6 = "traceid-6" + private val TRACE_ID_7 = "traceid-7" + private val SPAN_ID_PREFIX_1 = TRACE_ID_6 + "span-id-" + private val SPAN_ID_PREFIX_2 = TRACE_ID_7 + "span-id-" + + "Show values topology" should { + s"consume spans from input ${kafka.INPUT_TOPIC} and write ShowValueDocs for the whitelisted fields for which showValues is true" in { + Given("a set of spans with different servicenames and a project configuration") + val kafkaConfig = kafka.buildConfig + val esConfig = elastic.buildConfig + val indexTagsConfig = elastic.indexingConfig + val backendConfig = traceBackendClient.buildConfig + val serviceMetadataConfig = elastic.buildServiceMetadataConfig + val showValuesConfig = elastic.buildShowValuesConfig + + When(s"spans are produced in '${kafka.INPUT_TOPIC}' topic async, and kafka-streams topology is started") + val traceDescriptions = List(TraceDescription(TRACE_ID_6, SPAN_ID_PREFIX_1), TraceDescription(TRACE_ID_7, SPAN_ID_PREFIX_2)) + + produceSpansAsync(MAX_CHILD_SPANS_PER_TRACE, + 1.seconds, + traceDescriptions, + 0, + spanAccumulatorConfig.bufferingWindowMillis) + + val topology = new StreamRunner(kafkaConfig, spanAccumulatorConfig, esConfig, backendConfig, serviceMetadataConfig, showValuesConfig, indexTagsConfig) + topology.start() + + Then(s"we should read two multiple service operation combinations in elastic search") + try { + val result: util.List[KeyValue[String, SpanBuffer]] = + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(kafka.RESULT_CONSUMER_CONFIG, kafka.OUTPUT_TOPIC, 2, MAX_WAIT_FOR_OUTPUT_MS) + Thread.sleep(6000) + verifyFields() + } finally { + topology.close() + } + } + } +} diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala index 3dd407af..56a19fb1 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/integration/clients/ElasticSearchTestClient.scala @@ -22,7 +22,7 @@ import java.util.Date import com.expedia.www.haystack.commons.retries.RetryOperation import com.expedia.www.haystack.trace.commons.config.entities._ -import com.expedia.www.haystack.trace.indexer.config.entities.{ElasticSearchConfiguration, ServiceMetadataWriteConfiguration} +import com.expedia.www.haystack.trace.indexer.config.entities.{ElasticSearchConfiguration, ServiceMetadataWriteConfiguration, ShowValuesConfiguration} import io.searchbox.client.config.HttpClientConfig import io.searchbox.client.{JestClient, JestClientFactory} import io.searchbox.core.Search @@ -97,11 +97,33 @@ class ElasticSearchTestClient { retryConfig = RetryOperation.Config(10, 250, 2)) } + def buildShowValuesConfig: ShowValuesConfiguration = { + ShowValuesConfiguration(enabled = true, + esEndpoint = ELASTIC_SEARCH_ENDPOINT, + username = None, + password = None, + consistencyLevel = "one", + indexTemplateJson = Some(SHOW_VALUES_INDEX_TEMPLATE), + indexName = "show-values", + indexType = "fieldvalues-metadata", + connectionTimeoutMillis = 3000, + readTimeoutMillis = 3000, + maxInFlightBulkRequests = 10, + maxDocsInBulk = 5, + maxBulkDocSizeInBytes = 50, + flushIntervalInSec = 10, + flushOnMaxFieldCount = 10, + retryConfig = RetryOperation.Config(10, 250, 2)) + } + def indexingConfig: WhitelistIndexFieldConfiguration = { val cfg = WhitelistIndexFieldConfiguration() val cfgJsonData = Serialization.write(WhiteListIndexFields( - List(WhitelistIndexField(name = "role", `type` = IndexFieldType.string, aliases = Set("_role")), WhitelistIndexField(name = "errorcode", `type` = IndexFieldType.long)))) + List( + WhitelistIndexField(name = "role", `type` = IndexFieldType.string, aliases = Set("_role")), + WhitelistIndexField(name = "errorcode", `type` = IndexFieldType.long), + WhitelistIndexField(name = "pagename", `type` = IndexFieldType.long, showValue = true)))) cfg.onReload(cfgJsonData) cfg } @@ -121,13 +143,23 @@ class ElasticSearchTestClient { } } + def queryShowValuesIndex(query: String): List[String] = { + val SHOW_VALUES_INDEX_NAME = "show-values" + val SHOW_VALUES_INDEX_TYPE = "fieldvalues-metadata" + queryIndex(query, SHOW_VALUES_INDEX_NAME, SHOW_VALUES_INDEX_TYPE) + } + def queryServiceMetadataIndex(query: String): List[String] = { - import scala.collection.JavaConverters._ val SERVICE_METADATA_INDEX_NAME = "service-metadata" val SERVICE_METADATA_INDEX_TYPE = "metadata" + queryIndex(query, SERVICE_METADATA_INDEX_NAME, SERVICE_METADATA_INDEX_TYPE) + } + + def queryIndex(query: String, index: String, index_type: String): List[String] = { + import scala.collection.JavaConverters._ val searchQuery = new Search.Builder(query) - .addIndex(SERVICE_METADATA_INDEX_NAME) - .addType(SERVICE_METADATA_INDEX_TYPE) + .addIndex(index) + .addType(index_type) .build() val result = esClient.execute(searchQuery) if (result.getSourceAsStringList != null && result.getSourceAsStringList.size() > 0) { @@ -210,6 +242,53 @@ class ElasticSearchTestClient { |} |""".stripMargin + private val SHOW_VALUES_INDEX_TEMPLATE = + """{ + |"template": "show-values*", + |"aliases": { + | "show-values":{} + |}, + |"settings": { + | "number_of_shards": 4, + | "index.mapping.ignore_malformed": true, + | "analysis": { + | "normalizer": { + | "lowercase_normalizer": { + | "type": "custom", + | "filter": ["lowercase"] + | } + | } + | } + |}, + |"mappings": { + | "fieldvalues-metadata": { + | "_field_names": { + | "enabled": false + | }, + | "_all": { + | "enabled": false + | }, + | "properties": { + | "servicename": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | }, + | "fieldname": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | }, + | "fieldvalue": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | } + | } + | } + |} + |}""".stripMargin + private val SERVICE_METADATA_INDEX_TEMPLATE = """{ | "template": "service-metadata*", diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ConfigurationLoaderSpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ConfigurationLoaderSpec.scala index d569c59c..9080af53 100644 --- a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ConfigurationLoaderSpec.scala +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ConfigurationLoaderSpec.scala @@ -108,5 +108,25 @@ class ConfigurationLoaderSpec extends FunSpec with Matchers { elastic.indexHourBucket shouldBe 6 elastic.maxConnectionsPerRoute shouldBe 10 } + + it("should load the show values config from base.conf") { + val config = project.showValuesConfig + config.flushIntervalInSec shouldBe 60 + config.flushOnMaxFieldCount shouldBe 10000 + config.esEndpoint shouldBe "http://elasticsearch:9200" + config.maxInFlightBulkRequests shouldBe 10 + config.maxDocsInBulk shouldBe 100 + config.maxBulkDocSizeInBytes shouldBe 1000000 + config.indexTemplateJson shouldBe Some("some_template_json") + config.consistencyLevel shouldBe "one" + config.readTimeoutMillis shouldBe 5000 + config.connectionTimeoutMillis shouldBe 10000 + config.indexName shouldBe "show-values" + config.indexType shouldBe "fieldvalues-metadata" + config.retryConfig.maxRetries shouldBe 10 + config.retryConfig.backOffInMillis shouldBe 100 + config.retryConfig.backoffFactor shouldBe 2 + } + } } diff --git a/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ShowValuesDocumentGeneratorSpec.scala b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ShowValuesDocumentGeneratorSpec.scala new file mode 100644 index 00000000..ad7596b4 --- /dev/null +++ b/indexer/src/test/scala/com/expedia/www/haystack/trace/indexer/unit/ShowValuesDocumentGeneratorSpec.scala @@ -0,0 +1,94 @@ +package com.expedia.www.haystack.trace.indexer.unit + +import java.util.concurrent.TimeUnit + +import com.expedia.open.tracing.{Span, Tag} +import com.expedia.www.haystack.trace.commons.clients.es.document.ShowValuesDoc +import com.expedia.www.haystack.trace.commons.config.entities.{IndexFieldType, WhiteListIndexFields, WhitelistIndexField, WhitelistIndexFieldConfiguration} +import com.expedia.www.haystack.trace.indexer.config.ProjectConfiguration +import com.expedia.www.haystack.trace.indexer.writers.es.ShowValuesDocumentGenerator +import org.json4s.ext.EnumNameSerializer +import org.json4s.jackson.Serialization +import org.json4s.{DefaultFormats, Formats} +import org.scalatest.{FunSpec, Matchers} + +import scala.collection.mutable + +class ShowValuesDocumentGeneratorSpec extends FunSpec with Matchers { + protected implicit val formats: Formats = DefaultFormats + new EnumNameSerializer(IndexFieldType) + + private val TRACE_ID = "trace_id" + private val START_TIME_1 = 1529042838469123l + private val START_TIME_2 = 1529042848469000l + private val project = new ProjectConfiguration + + private val LONG_DURATION = TimeUnit.SECONDS.toMicros(25) + TimeUnit.MICROSECONDS.toMicros(500) + + describe("Show values document generator") { + it("should create show value docs for given servicename and fieldvaluesmap") { + //Whatever is passed through Whitelistindexconfiguration is irrelevant to the generator for this test + val generator = new ShowValuesDocumentGenerator(project.showValuesConfig, WhitelistIndexFieldConfiguration()) + val serviceName = "service1" + val fieldValuesMap: mutable.HashMap[String, mutable.Set[String]] = new mutable.HashMap[String, mutable.Set[String]](); + fieldValuesMap.update("page-name", mutable.Set[String]("pageA", "pageB")) + fieldValuesMap.update("page-id", mutable.Set[String]("1", "2")) + + val docs = generator.createShowValuesDoc(serviceName, fieldValuesMap) + docs.head shouldEqual ShowValuesDoc(serviceName, "page-id", "2") + docs(1) shouldEqual ShowValuesDoc(serviceName, "page-id", "1") + docs(2) shouldEqual ShowValuesDoc(serviceName, "page-name", "pageA") + docs(3) shouldEqual ShowValuesDoc(serviceName, "page-name", "pageB") + docs.size shouldEqual 4 + } + + it("should create showvaluedocuments for given spans using the whitelistfieldconfiguration(empty) and return showValues docs accumulated if conditions met") { + val config = WhitelistIndexFieldConfiguration() + val generator = new ShowValuesDocumentGenerator(project.showValuesConfig, config) + + val span_1 = Span.newBuilder().setTraceId(TRACE_ID) + .setSpanId("span-1") + .setServiceName("service1") + .setOperationName("op1") + .setStartTime(START_TIME_1) + .setDuration(610000L) + .addTags(Tag.newBuilder().setKey("page-name").setVStr("pageA")) + .addTags(Tag.newBuilder().setKey("page-id").setVStr("1")) + .build() + val span_2 = Span.newBuilder().setTraceId(TRACE_ID) + .setSpanId("span-2") + .setServiceName("service2") + .setOperationName("op1") + .setStartTime(START_TIME_1) + .setDuration(500000L) + .addTags(Tag.newBuilder().setKey("page-name").setVStr("pageB")) + .addTags(Tag.newBuilder().setKey("page-id").setVStr("2")) + .build() + val span_3 = Span.newBuilder().setTraceId(TRACE_ID) + .setSpanId("span-3") + .setServiceName("service1") + .setDuration(LONG_DURATION) + .setStartTime(START_TIME_2) + .addTags(Tag.newBuilder().setKey("page-name").setVStr("pageB")) + .addTags(Tag.newBuilder().setKey("page-id").setVStr("2")) + .setOperationName("op3").build() + + val docs1 = generator.getAndUpdateShowValues(List(span_1, span_2, span_3)) + //Because whitelistfields are empty + docs1.size shouldEqual 0 + + val whitelistField_1 = WhitelistIndexField(name = "page-name", `type` = IndexFieldType.string, showValue = true) + val whitelistField_2 = WhitelistIndexField(name = "page-id", `type` = IndexFieldType.string, showValue = true) + + val cfgJsonData = Serialization.write(WhiteListIndexFields(List(whitelistField_1, whitelistField_2))) + + // reload with the given whitelisted fields + config.onReload(cfgJsonData) + val docs2 = generator.getAndUpdateShowValues(List(span_1, span_2, span_3)) + docs2.size shouldEqual 6 + docs2.map(d => d.servicename) should contain allOf ("service1", "service2") + docs2.map(d => d.fieldname) should contain allOf ("page-name", "page-id") + docs2.map(d => d.fieldvalue) should contain allOf ("pageA", "pageB", "1", "2") + docs2.foreach(d => List) + } + } +} diff --git a/reader/src/main/resources/config/base.conf b/reader/src/main/resources/config/base.conf index aeb64049..f8b6200e 100644 --- a/reader/src/main/resources/config/base.conf +++ b/reader/src/main/resources/config/base.conf @@ -44,6 +44,11 @@ elasticsearch { name = "service-metadata" type = "metadata" } + show.values { + enabled = true + name = "show-values" + type = "fieldvalues-metadata" + } } } diff --git a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/ProviderConfiguration.scala b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/ProviderConfiguration.scala index ec6c0c78..dd6e39dd 100644 --- a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/ProviderConfiguration.scala +++ b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/ProviderConfiguration.scala @@ -101,13 +101,22 @@ class ProviderConfiguration { metadataCfg.getString("name"), metadataCfg.getString("type")) } + private val showValuesIndexConfiguration: ShowValuesIndexConfiguration = { + val showValuesConf = config.getConfig("elasticsearch.index.show.values") + + ShowValuesIndexConfiguration( + showValuesConf.getBoolean("enabled"), + showValuesConf.getString("name"), + showValuesConf.getString("type")) + } val elasticSearchConfiguration: ElasticSearchConfiguration = { ElasticSearchConfiguration( clientConfiguration = elasticSearchClientConfig, spansIndexConfiguration = spansIndexConfiguration, - serviceMetadataIndexConfiguration = serviceMetadataIndexConfig + serviceMetadataIndexConfiguration = serviceMetadataIndexConfig, + showValuesIndexConfiguration = showValuesIndexConfiguration ) } diff --git a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/entities/ElasticSearchConfiguration.scala b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/entities/ElasticSearchConfiguration.scala index f25d87fb..fbcbce59 100644 --- a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/entities/ElasticSearchConfiguration.scala +++ b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/config/entities/ElasticSearchConfiguration.scala @@ -34,4 +34,9 @@ case class ServiceMetadataIndexConfiguration(enabled: Boolean, case class ElasticSearchConfiguration(clientConfiguration: ElasticSearchClientConfiguration, spansIndexConfiguration: SpansIndexConfiguration, - serviceMetadataIndexConfiguration: ServiceMetadataIndexConfiguration) \ No newline at end of file + serviceMetadataIndexConfiguration: ServiceMetadataIndexConfiguration, + showValuesIndexConfiguration: ShowValuesIndexConfiguration) + +case class ShowValuesIndexConfiguration(enabled: Boolean, + indexName: String, + indexType: String) \ No newline at end of file diff --git a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala index 6c513352..7c53e742 100644 --- a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala +++ b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/EsIndexedTraceStore.scala @@ -22,7 +22,7 @@ import com.expedia.www.haystack.trace.commons.clients.es.document.TraceIndexDoc import com.expedia.www.haystack.trace.commons.config.entities.{TraceStoreBackends, WhitelistIndexFieldConfiguration} import com.expedia.www.haystack.trace.reader.config.entities.ElasticSearchConfiguration import com.expedia.www.haystack.trace.reader.stores.readers.es.ElasticSearchReader -import com.expedia.www.haystack.trace.reader.stores.readers.es.query.{FieldValuesQueryGenerator, ServiceMetadataQueryGenerator, TraceCountsQueryGenerator, TraceSearchQueryGenerator} +import com.expedia.www.haystack.trace.reader.stores.readers.es.query.{FieldValuesQueryGenerator, ServiceMetadataQueryGenerator, ShowValuesQueryGenerator, TraceCountsQueryGenerator, TraceSearchQueryGenerator} import com.expedia.www.haystack.trace.reader.stores.readers.grpc.GrpcTraceReaders import io.searchbox.core.SearchResult import org.elasticsearch.index.IndexNotFoundException @@ -43,6 +43,8 @@ class EsIndexedTraceStore(traceStoreBackendConfig: TraceStoreBackends, private val traceCountsQueryGenerator = new TraceCountsQueryGenerator(elasticSearchConfiguration.spansIndexConfiguration, ES_NESTED_DOC_NAME, whitelistedFieldsConfiguration) private val fieldValuesQueryGenerator = new FieldValuesQueryGenerator(elasticSearchConfiguration.spansIndexConfiguration, ES_NESTED_DOC_NAME, whitelistedFieldsConfiguration) private val serviceMetadataQueryGenerator = new ServiceMetadataQueryGenerator(elasticSearchConfiguration.serviceMetadataIndexConfiguration) + private val showValuesQueryGenerator = new ShowValuesQueryGenerator(elasticSearchConfiguration.showValuesIndexConfiguration) + private val FIELD_VALUE_KEY = "fieldvalue" private val esCountTraces = (request: TraceCountsRequest, useSpecificIndices: Boolean) => { esReader.count(traceCountsQueryGenerator.generate(request, useSpecificIndices)) @@ -121,12 +123,27 @@ class EsIndexedTraceStore(traceStoreBackendConfig: TraceStoreBackends, } } + private def readFromShowValues(request: FieldValuesRequest): Option[Future[Seq[String]]] = { + val showValuesIndexConfiguration = elasticSearchConfiguration.showValuesIndexConfiguration + if (!showValuesIndexConfiguration.enabled) return None + + Some(esReader + .search(showValuesQueryGenerator.generateSearchFieldValuesQuery(request.getFilters(0).getValue, request.getFieldName)) + .map(extractFieldValues(_, FIELD_VALUE_KEY))) + } + + private def readFromSpansIndex(request: FieldValuesRequest): Future[Seq[String]] = { + esReader + .search(fieldValuesQueryGenerator.generate(request)) + .map(extractFieldValues(_, request.getFieldName.toLowerCase)) + } override def getFieldValues(request: FieldValuesRequest): Future[Seq[String]] = { - readFromServiceMetadata(request).getOrElse( - esReader - .search(fieldValuesQueryGenerator.generate(request)) - .map(extractFieldValues(_, request.getFieldName.toLowerCase))) + if (request.getFieldName.equalsIgnoreCase(TraceIndexDoc.SERVICE_KEY_NAME) || request.getFieldName.equalsIgnoreCase(TraceIndexDoc.OPERATION_KEY_NAME)) { + readFromServiceMetadata(request).getOrElse(readFromSpansIndex(request)) + } else { + readFromShowValues(request).getOrElse(readFromSpansIndex(request)) + } } override def getTraceCounts(request: TraceCountsRequest): Future[TraceCounts] = { diff --git a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/ResponseParser.scala b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/ResponseParser.scala index aaa6cf71..917cd674 100644 --- a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/ResponseParser.scala +++ b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/ResponseParser.scala @@ -36,6 +36,10 @@ trait ResponseParser { private val ES_COUNT_PER_INTERVAL = "__count_per_interval" private val ES_AGG_DOC_COUNT = "doc_count" protected val ES_NESTED_DOC_NAME = "spans" + private val ES_HITS_KEY = "hits" + private val ES_SOURCE_KEY = "_source" + private val ES_FIELD_VALUE_KEY = "fieldvalue" + protected def mapSearchResultToTraceCounts(result: SearchResult): Future[TraceCounts] = { val aggregation = result.getJsonObject @@ -69,27 +73,36 @@ trait ResponseParser { } protected def extractFieldValues(result: SearchResult, fieldName: String): List[String] = { - val aggregations = - result - .getJsonObject - .getAsJsonObject(ES_FIELD_AGGREGATIONS) - .getAsJsonObject(ES_NESTED_DOC_NAME) - .getAsJsonObject(fieldName) - - if (aggregations.has(ES_FIELD_BUCKETS)) { - aggregations - .getAsJsonArray(ES_FIELD_BUCKETS) - .asScala - .map(element => element.getAsJsonObject.get(ES_FIELD_KEY).getAsString) - .toList - } - else { - aggregations - .getAsJsonObject(fieldName) - .getAsJsonArray(ES_FIELD_BUCKETS) - .asScala - .map(element => element.getAsJsonObject.get(ES_FIELD_KEY).getAsString) - .toList + + val jsonResult = result.getJsonObject + + if (jsonResult.has(ES_FIELD_AGGREGATIONS)) { + val aggregations = jsonResult + .getAsJsonObject(ES_FIELD_AGGREGATIONS) + .getAsJsonObject(ES_NESTED_DOC_NAME) + .getAsJsonObject(fieldName) + + if (aggregations.has(ES_FIELD_BUCKETS)) { + aggregations + .getAsJsonArray(ES_FIELD_BUCKETS) + .asScala + .map(element => element.getAsJsonObject.get(ES_FIELD_KEY).getAsString) + .toList + } + else { + aggregations + .getAsJsonObject(fieldName) + .getAsJsonArray(ES_FIELD_BUCKETS) + .asScala + .map(element => element.getAsJsonObject.get(ES_FIELD_KEY).getAsString) + .toList + } + } else { + jsonResult + .getAsJsonObject(ES_HITS_KEY) + .getAsJsonArray(ES_HITS_KEY) + .asScala.map(p => p.getAsJsonObject.getAsJsonObject(ES_SOURCE_KEY).get(ES_FIELD_VALUE_KEY).getAsString) + .toList.distinct } } diff --git a/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/readers/es/query/ShowValuesQueryGenerator.scala b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/readers/es/query/ShowValuesQueryGenerator.scala new file mode 100644 index 00000000..6f140ac7 --- /dev/null +++ b/reader/src/main/scala/com/expedia/www/haystack/trace/reader/stores/readers/es/query/ShowValuesQueryGenerator.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2017 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expedia.www.haystack.trace.reader.stores.readers.es.query + + +import com.expedia.www.haystack.trace.reader.config.entities.ShowValuesIndexConfiguration +import io.searchbox.core.Search +import org.apache.commons.lang3.StringUtils +import org.elasticsearch.index.query.QueryBuilders +import org.elasticsearch.index.query.QueryBuilders.termQuery +import org.elasticsearch.search.builder.SearchSourceBuilder + +class ShowValuesQueryGenerator(config: ShowValuesIndexConfiguration) { + private val SERVICE_NAME_KEY = "servicename" + private val FIELD_NAME_KEY = "fieldname" + private val FIELD_VALUE_KEY = "fieldvalue" + private val LIMIT = 10000 + + + def generateSearchFieldValuesQuery(serviceName: String, fieldName: String): Search = { + val fieldValuesQuery = buildFieldValuesQuery(serviceName, fieldName) + generateSearchQuery(fieldValuesQuery) + } + + private def generateSearchQuery(queryString: String): Search = { + new Search.Builder(queryString) + .addIndex(config.indexName) + .addType(config.indexType) + .build() + } + + private def buildFieldValuesQuery(serviceName: String, fieldName: String): String = { + if (StringUtils.isNotEmpty(serviceName)) { + new SearchSourceBuilder() + .query(QueryBuilders + .boolQuery() + .must(termQuery(SERVICE_NAME_KEY, serviceName)) + .must(termQuery(FIELD_NAME_KEY, fieldName))) + .fetchSource(Array[String](FIELD_VALUE_KEY), Array[String](FIELD_NAME_KEY, SERVICE_NAME_KEY)) + .size(LIMIT) + .toString + } else { + new SearchSourceBuilder() + .query(termQuery(FIELD_NAME_KEY, fieldName)) + .fetchSource(Array[String](FIELD_VALUE_KEY), Array[String](FIELD_NAME_KEY, SERVICE_NAME_KEY)) + .size(LIMIT) + .toString + } + } +} diff --git a/reader/src/test/resources/config/base.conf b/reader/src/test/resources/config/base.conf index a9c2f86f..9f629de4 100644 --- a/reader/src/test/resources/config/base.conf +++ b/reader/src/test/resources/config/base.conf @@ -36,6 +36,11 @@ elasticsearch { name = "service_metadata" type = "metadata" } + show.values { + enabled = true + name = "show-values" + type = "fieldvalues-metadata" + } } } diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala index c0c7b5ce..9056e6de 100644 --- a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/BaseIntegrationTestSpec.scala @@ -26,7 +26,7 @@ import com.expedia.open.tracing.api.TraceReaderGrpc.TraceReaderBlockingStub import com.expedia.open.tracing.backend.StorageBackendGrpc.StorageBackendBlockingStub import com.expedia.open.tracing.backend.{StorageBackendGrpc, TraceRecord, WriteSpansRequest, WriteSpansResponse} import com.expedia.open.tracing.buffer.SpanBuffer -import com.expedia.www.haystack.trace.commons.clients.es.document.TraceIndexDoc +import com.expedia.www.haystack.trace.commons.clients.es.document.{ShowValuesDoc, TraceIndexDoc} import com.expedia.www.haystack.trace.commons.config.entities.{IndexFieldType, WhiteListIndexFields, WhitelistIndexField} import com.expedia.www.haystack.trace.commons.packer.{PackerFactory, PackerType} import com.expedia.www.haystack.trace.reader.Service @@ -56,6 +56,7 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w private val ELASTIC_SEARCH_ENDPOINT = "http://elasticsearch:9200" private val ELASTIC_SEARCH_WHITELIST_INDEX = "reload-configs" private val ELASTIC_SEARCH_WHITELIST_TYPE = "whitelist-index-fields" + private val ELASTIC_SEARCH_SHOW_VALUES_TYPE = "fieldvalues-metadata" private val SPANS_INDEX_TYPE = "spans" private val executors = Executors.newFixedThreadPool(2) @@ -70,6 +71,16 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w s"haystack-traces-$dateBucket-$hourBucket" } + + private val SHOW_VAlUES_INDEX = { + val date = new Date() + + val dateBucket = new SimpleDateFormat("yyyy-MM-dd").format(date) + val hourBucket = new SimpleDateFormat("HH").format(date).toInt / 6 + + s"show-values-$dateBucket" + } + private val INDEX_TEMPLATE = """{ | "template": "haystack-traces*", @@ -156,6 +167,54 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w |""".stripMargin + private val SHOW_VALUES_INDEX_TEMPLATE = + """{ + |"template": "show-values*", + |"aliases": { + | "show-values":{} + |}, + |"settings": { + | "number_of_shards": 4, + | "index.mapping.ignore_malformed": true, + | "analysis": { + | "normalizer": { + | "lowercase_normalizer": { + | "type": "custom", + | "filter": ["lowercase"] + | } + | } + | } + |}, + |"mappings": { + | "fieldvalues-metadata": { + | "_field_names": { + | "enabled": false + | }, + | "_all": { + | "enabled": false + | }, + | "properties": { + | "servicename": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | }, + | "fieldname": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | }, + | "fieldvalue": { + | "type": "keyword", + | "norms": false, + | "doc_values": false + | } + | } + | } + |} + |}""".stripMargin + + private var esClient: JestClient = _ private var traceBackendClient: StorageBackendBlockingStub = _ @@ -188,6 +247,10 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w .settings(INDEX_TEMPLATE) .build) + esClient.execute(new CreateIndex.Builder(SHOW_VAlUES_INDEX) + .settings(SHOW_VALUES_INDEX_TEMPLATE) + .build) + executors.submit(new Runnable { override def run(): Unit = Service.main(null) }) @@ -260,6 +323,17 @@ trait BaseIntegrationTestSpec extends FunSpec with GivenWhenThen with Matchers w Thread.sleep(10000) } + protected def putShowValueFieldsInEs(serviceName: String, fieldNameValuePairs: Set[(String, String)]): Unit = { + val showValuesDocList = fieldNameValuePairs.map(p => { + esClient.execute(new Index.Builder(Serialization.write(ShowValuesDoc(serviceName, p._1, p._2))) + .index(SHOW_VAlUES_INDEX) + .`type`(ELASTIC_SEARCH_SHOW_VALUES_TYPE) + .build()) + }) + + Thread.sleep(10000) + } + private def insertTraceInBackend(traceId: String, spanId: String, serviceName: String, diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala index 1c870505..6901306d 100644 --- a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/integration/TraceServiceIntegrationTestSpec.scala @@ -84,6 +84,25 @@ class TraceServiceIntegrationTestSpec extends BaseIntegrationTestSpec { result.getValuesList.size() should be(2) result.getValuesList.asScala should contain allOf(op1, op2) } + + it("should return values of a given fields(other than service and operation name) with filters") { + Given("trace in trace-backend and elasticsearch") + val serviceName = "get_values_with_filters_non_service_operation_type_servicename" + val fieldNameValuePairs = Set[(String, String)](("tagname1", "tagvalue1"), ("tagname2", "tagvalue2")) + + putShowValueFieldsInEs(serviceName, fieldNameValuePairs) + val request = FieldValuesRequest.newBuilder() + .addFilters(Field.newBuilder().setName(TraceIndexDoc.SERVICE_KEY_NAME).setValue(serviceName)) + .setFieldName("tagname1") + .build() + + When("calling getFieldValues") + val result = client.getFieldValues(request) + + Then("should return filtered values for given field") + result.getValuesList.size() should be(1) + result.getValuesList.asScala should contain("tagvalue1") + } } describe("TraceReader.getTrace") { diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala index 58e4feb3..c5c3036e 100644 --- a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/config/ConfigurationLoaderSpec.scala @@ -71,6 +71,10 @@ class ConfigurationLoaderSpec extends BaseUnitTestSpec { elasticSearchConfig.serviceMetadataIndexConfiguration.enabled shouldEqual false elasticSearchConfig.serviceMetadataIndexConfiguration.indexName shouldEqual "service_metadata" elasticSearchConfig.serviceMetadataIndexConfiguration.indexType shouldEqual "metadata" + + elasticSearchConfig.showValuesIndexConfiguration.enabled shouldEqual true + elasticSearchConfig.showValuesIndexConfiguration.indexName shouldEqual "show-values" + elasticSearchConfig.showValuesIndexConfiguration.indexType shouldEqual "fieldvalues-metadata" } it("should load trace backend configuration") { diff --git a/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/stores/readers/es/query/ShowValuesQueryGeneratorSpec.scala b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/stores/readers/es/query/ShowValuesQueryGeneratorSpec.scala new file mode 100644 index 00000000..f9a71e25 --- /dev/null +++ b/reader/src/test/scala/com/expedia/www/haystack/trace/reader/unit/stores/readers/es/query/ShowValuesQueryGeneratorSpec.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2017 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expedia.www.haystack.trace.reader.unit.stores.readers.es.query + +import com.expedia.www.haystack.trace.reader.config.entities.ShowValuesIndexConfiguration +import com.expedia.www.haystack.trace.reader.stores.readers.es.query.ShowValuesQueryGenerator +import com.expedia.www.haystack.trace.reader.unit.BaseUnitTestSpec +import com.google.gson.Gson + +class ShowValuesQueryGeneratorSpec extends BaseUnitTestSpec { + private val indexType = "fieldvalues-metadata" + private val showValuesIndexConfiguration = ShowValuesIndexConfiguration( + enabled = true, + indexName = "show-values", + indexType = indexType) + + describe("ShowValuesQueryGenerator") { + it("should generate valid bool queries for field") { + Given("a query generator") + val queryGenerator = new ShowValuesQueryGenerator(showValuesIndexConfiguration) + val serviceName = "test_service" + val fieldName = "test_field" + + When("asked for aggregated service name") + val query = queryGenerator.generateSearchFieldValuesQuery(serviceName, fieldName) + + Then("generate a valid query") + query.getType should be(indexType) + query.getData(new Gson()) shouldEqual "{\n \"size\" : 10000,\n \"query\" : {\n \"bool\" : {\n \"must\" : [\n {\n \"term\" : {\n \"servicename\" : {\n \"value\" : \"test_service\",\n \"boost\" : 1.0\n }\n }\n },\n {\n \"term\" : {\n \"fieldname\" : {\n \"value\" : \"test_field\",\n \"boost\" : 1.0\n }\n }\n }\n ],\n \"adjust_pure_negative\" : true,\n \"boost\" : 1.0\n }\n },\n \"_source\" : {\n \"includes\" : [\n \"fieldvalue\"\n ],\n \"excludes\" : [\n \"fieldname\",\n \"servicename\"\n ]\n }\n}" + query.toString shouldEqual "Search{uri=show-values/fieldvalues-metadata/_search, method=POST}" + } + } +}