diff --git a/pom.xml b/pom.xml index 00c2d6a..605efec 100644 --- a/pom.xml +++ b/pom.xml @@ -148,7 +148,29 @@ io.projectreactor.addons reactor-extra - + + + + com.google.guava + guava + 31.0.1-jre + + + com.lightstep.opentelemetry + opentelemetry-launcher + 1.5.0 + + + io.opentelemetry + opentelemetry-api + 1.7.1 + + + io.opentelemetry + opentelemetry-extension-annotations + 1.7.1 + + diff --git a/src/main/java/com/uci/utils/UtilAppConfiguration.java b/src/main/java/com/uci/utils/UtilAppConfiguration.java index 6b5de67..558e13b 100644 --- a/src/main/java/com/uci/utils/UtilAppConfiguration.java +++ b/src/main/java/com/uci/utils/UtilAppConfiguration.java @@ -12,6 +12,10 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.lightstep.opentelemetry.launcher.OpenTelemetryConfiguration; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Tracer; @Configuration @EnableAutoConfiguration @@ -28,6 +32,21 @@ public class UtilAppConfiguration { @Value("${caffeine.cache.exprie.duration.seconds}") public Integer cacheExpireDuration; + + @Value("${opentelemetry.lightstep.service}") + private String lightstepService; + + @Value("${opentelemetry.lightstep.access.token}") + private String lightstepAccessToken; + + @Value("${opentelemetry.lightstep.end.point}") + private String lightstepEndPoint; + + @Value("${opentelemetry.lightstep.tracer}") + private String lightstepTracer; + + @Value("${opentelemetry.lightstep.tracer.version}") + private String lightstepTracerVersion; public Caffeine caffeineCacheBuilder() { return Caffeine.newBuilder() @@ -45,5 +64,17 @@ public Cache cache() { public WebClient getWebClient() { return WebClient.builder().baseUrl(CAMPAIGN_URL).defaultHeader("admin-token", CAMPAIGN_ADMIN_TOKEN).build(); } + + @Bean + public Tracer OpenTelemetryTracer() { + OpenTelemetryConfiguration.newBuilder() + .setServiceName(lightstepService) + .setAccessToken(lightstepAccessToken) + .setTracesEndpoint(lightstepEndPoint) + .install(); + Tracer tracer = GlobalOpenTelemetry + .getTracer(lightstepTracer, lightstepTracerVersion); + return tracer; + } } diff --git a/src/main/java/com/uci/utils/kafka/RecordProducer.java b/src/main/java/com/uci/utils/kafka/RecordProducer.java new file mode 100644 index 0000000..fe93c87 --- /dev/null +++ b/src/main/java/com/uci/utils/kafka/RecordProducer.java @@ -0,0 +1,63 @@ +package com.uci.utils.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.uci.utils.kafka.adapter.TextMapSetterAdapter; + +import io.fusionauth.jwt.domain.Header; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapSetter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Service; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +@Service +@Slf4j +public class RecordProducer { + + private final KafkaTemplate producer; + + public RecordProducer(KafkaTemplate producer) { + this.producer = producer; + } + + public void send(String topic, String message, Context currentContext) { + List headers = Arrays.asList(); + ProducerRecord record = new ProducerRecord(topic, null, "", message, headers); + /* Propagate open telemetry current context by injecting it to kafka headers */ + GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(currentContext, record.headers(), TextMapSetterAdapter.setter); +// log.info("headers:"+record.headers()); +// Context extracted = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), record.headers(), getter); +// log.info("extracted: "+extracted); + + producer + .send(record) + .addCallback(new ListenableFutureCallback>() { + @Override + public void onFailure(@NotNull Throwable throwable) { + log.error("Unable to push {} to {} topic due to {}", message, topic, throwable.getMessage()); + } + + @Override + public void onSuccess(SendResult stringStringSendResult) { + log.info("Pushed to topic {}", topic); + } + }); + } +} \ No newline at end of file diff --git a/src/main/java/com/uci/utils/kafka/adapter/TextMapGetterAdapter.java b/src/main/java/com/uci/utils/kafka/adapter/TextMapGetterAdapter.java new file mode 100644 index 0000000..6f30366 --- /dev/null +++ b/src/main/java/com/uci/utils/kafka/adapter/TextMapGetterAdapter.java @@ -0,0 +1,36 @@ +package com.uci.utils.kafka.adapter; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.Header; + +import io.opentelemetry.context.propagation.TextMapGetter; + +public class TextMapGetterAdapter { + public static TextMapGetter getter = new TextMapGetter() { + @Override + public String get(Headers headers, String key) { + Header header = headers.lastHeader(key); + if (header == null) { + return null; + } + byte[] value = header.value(); + if (value == null) { + return null; + } + return new String(value, StandardCharsets.UTF_8); + } + + @Override + public Iterable keys(Headers headers) { + List keyset = null; + headers.forEach(h -> { + String key = h.key(); + keyset.add(key); + }); + return keyset; + } + }; +} diff --git a/src/main/java/com/uci/utils/kafka/adapter/TextMapSetterAdapter.java b/src/main/java/com/uci/utils/kafka/adapter/TextMapSetterAdapter.java new file mode 100644 index 0000000..fceedb7 --- /dev/null +++ b/src/main/java/com/uci/utils/kafka/adapter/TextMapSetterAdapter.java @@ -0,0 +1,16 @@ +package com.uci.utils.kafka.adapter; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.header.Headers; + +import io.opentelemetry.context.propagation.TextMapSetter; + +public class TextMapSetterAdapter { + public static TextMapSetter setter = new TextMapSetter() { + @Override + public void set(Headers headers, String key, String value) { + headers.remove(key).add(key, value.getBytes(StandardCharsets.UTF_8)); + } + }; +}