src/lib.rs: 65 changes (57 additions, 8 deletions)
@@ -160,16 +160,61 @@ impl Otel {
         // Graceful shutdown that flushes any pending metrics and logs to the exporter.
         info!("shutting down otel component");
 
-        if let Err(metrics_error) = self.meter_provider.force_flush() {
-            warn!("encountered error while flushing metrics: {metrics_error:?}");
+        // Use a timeout for flush/shutdown operations to prevent hanging
+        // when the server is unavailable
+        let shutdown_timeout = Duration::from_secs(10);
+
+        let flush_result = tokio::time::timeout(
+            shutdown_timeout,
+            tokio::task::spawn_blocking({
+                let meter_provider = self.meter_provider.clone();
+                move || meter_provider.force_flush()
+            })
+        ).await;
+
+        match flush_result {
+            Err(_) => warn!("meter provider force_flush timed out"),
+            Ok(Err(e)) => warn!("meter provider force_flush task failed: {e:?}"),
+            Ok(Ok(Err(e))) => warn!("encountered error while flushing metrics: {e:?}"),
+            Ok(Ok(Ok(()))) => debug!("meter provider force_flush completed"),
         }
-        if let Err(metrics_error) = self.meter_provider.shutdown() {
-            warn!("encountered error while shutting down meter provider: {metrics_error:?}");
+
+        let shutdown_result = tokio::time::timeout(
+            shutdown_timeout,
+            tokio::task::spawn_blocking({
+                let meter_provider = self.meter_provider.clone();
+                move || meter_provider.shutdown()
+            })
+        ).await;
+
+        match shutdown_result {
+            Err(_) => warn!("meter provider shutdown timed out"),
+            Ok(Err(e)) => warn!("meter provider shutdown task failed: {e:?}"),
+            Ok(Ok(Err(e))) => warn!("encountered error while shutting down meter provider: {e:?}"),
+            Ok(Ok(Ok(()))) => debug!("meter provider shutdown completed"),
         }
 
         if let Some(logger_provider) = self.logger_provider.clone() {
-            logger_provider.force_flush();
-            let _ = logger_provider.shutdown();
+            let flush_result = tokio::time::timeout(
+                shutdown_timeout,
+                tokio::task::spawn_blocking({
+                    let lp = logger_provider.clone();
+                    move || lp.force_flush()
+                })
+            ).await;
+
+            if let Err(_) | Ok(Err(_)) = flush_result {
+                warn!("logger provider force_flush timed out or failed");
+            }
+
+            let shutdown_result = tokio::time::timeout(
+                shutdown_timeout,
+                tokio::task::spawn_blocking(move || logger_provider.shutdown())
+            ).await;
+
+            if let Err(_) | Ok(Err(_)) = shutdown_result {
+                warn!("logger provider shutdown timed out or failed");
+            }
         }
 
     }
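Note on the pattern used in the hunk above: each provider call is moved onto a blocking thread with tokio::task::spawn_blocking and bounded with tokio::time::timeout, so the awaited value is a nested Result (timeout elapsed, then task join, then the provider's own result), which is what the three warn! arms per match distinguish. A minimal, self-contained sketch of the same pattern, with a hypothetical blocking flush() standing in for the SDK provider calls:

use std::time::Duration;

// Hypothetical stand-in for a provider call that may block indefinitely
// when the collector endpoint is unreachable.
fn flush() -> Result<(), String> {
    Ok(())
}

#[tokio::main]
async fn main() {
    let shutdown_timeout = Duration::from_secs(10);

    // spawn_blocking moves the call off the async runtime threads;
    // timeout bounds how long we are willing to wait for it to finish.
    let result = tokio::time::timeout(
        shutdown_timeout,
        tokio::task::spawn_blocking(flush),
    )
    .await;

    match result {
        Err(_) => eprintln!("flush timed out"),                      // timeout elapsed
        Ok(Err(join_err)) => eprintln!("task failed: {join_err:?}"), // blocking task panicked or was cancelled
        Ok(Ok(Err(e))) => eprintln!("flush returned an error: {e}"),
        Ok(Ok(Ok(()))) => println!("flush completed"),
    }
}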
@@ -243,7 +288,7 @@ impl TemporalitySelector for DeltaTemporalitySelector {
 /// setup the stdout metrics writer if enabled, and initializes STATIC Metrics.
 ///
 /// Returns the Prometheus Registry or None if Prometheus was disabled.
-///
+#[allow(clippy::too_many_lines)]
 fn init_metrics(config: Config) -> (Option<PrometheusRegistry>, SdkMeterProvider) {
     let mut keys = vec![KeyValue::new(SERVICE_NAME_KEY, config.service_name.clone())];
     if let Some(resource_attributes) = config.resource_attributes {
Expand Down Expand Up @@ -333,6 +378,7 @@ fn init_metrics(config: Config) -> (Option<PrometheusRegistry>, SdkMeterProvider

let reader = PeriodicReader::builder(exporter, runtime::Tokio)
.with_interval(Duration::from_secs(export_target.interval_secs))
.with_timeout(Duration::from_secs(export_target.timeout))
.build();
meter_provider_builder = meter_provider_builder.with_reader(reader);
}
@@ -348,7 +394,9 @@ fn init_metrics(config: Config) -> (Option<PrometheusRegistry>, SdkMeterProvider
             })
             .build();
 
-        let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
+        let reader = PeriodicReader::builder(exporter, runtime::Tokio)
+            .with_timeout(Duration::from_secs(30))
+            .build();
         meter_provider_builder = meter_provider_builder.with_reader(reader);
     }
 
@@ -471,6 +519,7 @@ fn handle_tls(
         }
     });
     let channel = tonic_endpoint
+        .connect_timeout(timeout)
         .timeout(timeout)
         .connect_with_connector_lazy(custom_connector);
     Ok(exporter_builder.with_channel(channel))
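For reference, connect_timeout bounds only connection establishment (the TCP/TLS handshake), while timeout applies to each request once a connection exists; setting both keeps a lazily connected channel from hanging on an unreachable collector. A minimal sketch of the same endpoint configuration, assuming tonic's transport API, with a placeholder URL and a plain lazy connect in place of the custom TLS connector used in the diff:

use std::time::Duration;
use tonic::transport::Endpoint;

fn main() {
    let timeout = Duration::from_secs(10);

    // Placeholder collector address; the real code builds the endpoint from config.
    let endpoint = Endpoint::from_static("http://localhost:4317")
        .connect_timeout(timeout) // bounds TCP/TLS connection establishment
        .timeout(timeout);        // bounds each individual request

    // Lazy connect mirrors connect_with_connector_lazy in the diff:
    // no connection is attempted until the first RPC is sent.
    let _channel = endpoint.connect_lazy();
}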