From 8741755ed10862de6f2b3c0eab092073c2dd829e Mon Sep 17 00:00:00 2001 From: Surabhi Date: Fri, 3 Dec 2021 10:04:17 +0530 Subject: [PATCH 1/3] lightstep opentelemetry configuration --- pom.xml | 24 +++++++++++++- .../com/uci/utils/UtilAppConfiguration.java | 31 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 00c2d6a..605efec 100644 --- a/pom.xml +++ b/pom.xml @@ -148,7 +148,29 @@ io.projectreactor.addons reactor-extra - + + + + com.google.guava + guava + 31.0.1-jre + + + com.lightstep.opentelemetry + opentelemetry-launcher + 1.5.0 + + + io.opentelemetry + opentelemetry-api + 1.7.1 + + + io.opentelemetry + opentelemetry-extension-annotations + 1.7.1 + + diff --git a/src/main/java/com/uci/utils/UtilAppConfiguration.java b/src/main/java/com/uci/utils/UtilAppConfiguration.java index 6b5de67..558e13b 100644 --- a/src/main/java/com/uci/utils/UtilAppConfiguration.java +++ b/src/main/java/com/uci/utils/UtilAppConfiguration.java @@ -12,6 +12,10 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.lightstep.opentelemetry.launcher.OpenTelemetryConfiguration; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Tracer; @Configuration @EnableAutoConfiguration @@ -28,6 +32,21 @@ public class UtilAppConfiguration { @Value("${caffeine.cache.exprie.duration.seconds}") public Integer cacheExpireDuration; + + @Value("${opentelemetry.lightstep.service}") + private String lightstepService; + + @Value("${opentelemetry.lightstep.access.token}") + private String lightstepAccessToken; + + @Value("${opentelemetry.lightstep.end.point}") + private String lightstepEndPoint; + + @Value("${opentelemetry.lightstep.tracer}") + private String lightstepTracer; + + @Value("${opentelemetry.lightstep.tracer.version}") + private String lightstepTracerVersion; public Caffeine caffeineCacheBuilder() { return Caffeine.newBuilder() @@ -45,5 +64,17 @@ public Cache cache() { public WebClient getWebClient() { return WebClient.builder().baseUrl(CAMPAIGN_URL).defaultHeader("admin-token", CAMPAIGN_ADMIN_TOKEN).build(); } + + @Bean + public Tracer OpenTelemetryTracer() { + OpenTelemetryConfiguration.newBuilder() + .setServiceName(lightstepService) + .setAccessToken(lightstepAccessToken) + .setTracesEndpoint(lightstepEndPoint) + .install(); + Tracer tracer = GlobalOpenTelemetry + .getTracer(lightstepTracer, lightstepTracerVersion); + return tracer; + } } From 6e80e2b4648c4c6eb137b66be3483a8e92d4f112 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Mon, 10 Jan 2022 16:01:25 +0530 Subject: [PATCH 2/3] changes --- .../com/uci/utils/kafka/SimpleProducer1.java | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 src/main/java/com/uci/utils/kafka/SimpleProducer1.java diff --git a/src/main/java/com/uci/utils/kafka/SimpleProducer1.java b/src/main/java/com/uci/utils/kafka/SimpleProducer1.java new file mode 100644 index 0000000..aadadd8 --- /dev/null +++ b/src/main/java/com/uci/utils/kafka/SimpleProducer1.java @@ -0,0 +1,77 @@ +package com.uci.utils.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.fusionauth.jwt.domain.Header; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapSetter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Service; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +@Service +@Slf4j +public class SimpleProducer1 { + + private final KafkaTemplate producer; + + public SimpleProducer1(KafkaTemplate producer) { + this.producer = producer; + } + + public void send(String topic, String message, Context currentContext) { + List headers = Arrays.asList(new RecordHeader("header_key", "header_value".getBytes())); + GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(currentContext, headers, null); + log.info("headers:"+headers); + Context extracted = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, null); + log.info("extracted: "+extracted); + ProducerRecord record = new ProducerRecord(topic, null, "", message, headers); + + producer + .send(record) + .addCallback(new ListenableFutureCallback>() { + @Override + public void onFailure(@NotNull Throwable throwable) { + log.error("Unable to push {} to {} topic due to {}", message, topic, throwable.getMessage()); + } + + @Override + public void onSuccess(SendResult stringStringSendResult) { + log.info("Pushed to topic {}", topic); + } + }); + } + + public void send(String topic, String message) { + List headers = Arrays.asList(new RecordHeader("header_key", "header_value".getBytes())); + ProducerRecord record = new ProducerRecord(topic, null, "", message, headers); + + producer + .send(record) + .addCallback(new ListenableFutureCallback>() { + @Override + public void onFailure(@NotNull Throwable throwable) { + log.error("Unable to push {} to {} topic due to {}", message, topic, throwable.getMessage()); + } + + @Override + public void onSuccess(SendResult stringStringSendResult) { + log.info("Pushed to topic {}", topic); + } + }); + } +} \ No newline at end of file From 025bb4011a11c56984a208fad356fc43235f8c84 Mon Sep 17 00:00:00 2001 From: Surabhi Date: Wed, 19 Jan 2022 09:59:52 +0530 Subject: [PATCH 3/3] context propagation & extract --- ...mpleProducer1.java => RecordProducer.java} | 40 ++++++------------- .../kafka/adapter/TextMapGetterAdapter.java | 36 +++++++++++++++++ .../kafka/adapter/TextMapSetterAdapter.java | 16 ++++++++ 3 files changed, 65 insertions(+), 27 deletions(-) rename src/main/java/com/uci/utils/kafka/{SimpleProducer1.java => RecordProducer.java} (58%) create mode 100644 src/main/java/com/uci/utils/kafka/adapter/TextMapGetterAdapter.java create mode 100644 src/main/java/com/uci/utils/kafka/adapter/TextMapSetterAdapter.java diff --git a/src/main/java/com/uci/utils/kafka/SimpleProducer1.java b/src/main/java/com/uci/utils/kafka/RecordProducer.java similarity index 58% rename from src/main/java/com/uci/utils/kafka/SimpleProducer1.java rename to src/main/java/com/uci/utils/kafka/RecordProducer.java index aadadd8..fe93c87 100644 --- a/src/main/java/com/uci/utils/kafka/SimpleProducer1.java +++ b/src/main/java/com/uci/utils/kafka/RecordProducer.java @@ -2,14 +2,17 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.uci.utils.kafka.adapter.TextMapSetterAdapter; import io.fusionauth.jwt.domain.Header; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; import io.opentelemetry.context.propagation.TextMapSetter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; @@ -18,6 +21,7 @@ import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; @@ -25,40 +29,22 @@ @Service @Slf4j -public class SimpleProducer1 { +public class RecordProducer { private final KafkaTemplate producer; - public SimpleProducer1(KafkaTemplate producer) { + public RecordProducer(KafkaTemplate producer) { this.producer = producer; } - - public void send(String topic, String message, Context currentContext) { - List headers = Arrays.asList(new RecordHeader("header_key", "header_value".getBytes())); - GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(currentContext, headers, null); - log.info("headers:"+headers); - Context extracted = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, null); - log.info("extracted: "+extracted); - ProducerRecord record = new ProducerRecord(topic, null, "", message, headers); - - producer - .send(record) - .addCallback(new ListenableFutureCallback>() { - @Override - public void onFailure(@NotNull Throwable throwable) { - log.error("Unable to push {} to {} topic due to {}", message, topic, throwable.getMessage()); - } - - @Override - public void onSuccess(SendResult stringStringSendResult) { - log.info("Pushed to topic {}", topic); - } - }); - } - public void send(String topic, String message) { - List headers = Arrays.asList(new RecordHeader("header_key", "header_value".getBytes())); + public void send(String topic, String message, Context currentContext) { + List headers = Arrays.asList(); ProducerRecord record = new ProducerRecord(topic, null, "", message, headers); + /* Propagate open telemetry current context by injecting it to kafka headers */ + GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(currentContext, record.headers(), TextMapSetterAdapter.setter); +// log.info("headers:"+record.headers()); +// Context extracted = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), record.headers(), getter); +// log.info("extracted: "+extracted); producer .send(record) diff --git a/src/main/java/com/uci/utils/kafka/adapter/TextMapGetterAdapter.java b/src/main/java/com/uci/utils/kafka/adapter/TextMapGetterAdapter.java new file mode 100644 index 0000000..6f30366 --- /dev/null +++ b/src/main/java/com/uci/utils/kafka/adapter/TextMapGetterAdapter.java @@ -0,0 +1,36 @@ +package com.uci.utils.kafka.adapter; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.Header; + +import io.opentelemetry.context.propagation.TextMapGetter; + +public class TextMapGetterAdapter { + public static TextMapGetter getter = new TextMapGetter() { + @Override + public String get(Headers headers, String key) { + Header header = headers.lastHeader(key); + if (header == null) { + return null; + } + byte[] value = header.value(); + if (value == null) { + return null; + } + return new String(value, StandardCharsets.UTF_8); + } + + @Override + public Iterable keys(Headers headers) { + List keyset = null; + headers.forEach(h -> { + String key = h.key(); + keyset.add(key); + }); + return keyset; + } + }; +} diff --git a/src/main/java/com/uci/utils/kafka/adapter/TextMapSetterAdapter.java b/src/main/java/com/uci/utils/kafka/adapter/TextMapSetterAdapter.java new file mode 100644 index 0000000..fceedb7 --- /dev/null +++ b/src/main/java/com/uci/utils/kafka/adapter/TextMapSetterAdapter.java @@ -0,0 +1,16 @@ +package com.uci.utils.kafka.adapter; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.header.Headers; + +import io.opentelemetry.context.propagation.TextMapSetter; + +public class TextMapSetterAdapter { + public static TextMapSetter setter = new TextMapSetter() { + @Override + public void set(Headers headers, String key, String value) { + headers.remove(key).add(key, value.getBytes(StandardCharsets.UTF_8)); + } + }; +}