Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use dedicated thread for Metrics PeriodicReader #2142

Open
wants to merge 48 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
e5678e8
Use dedicated thread for metrics processing
cijothomas Sep 24, 2024
f2eff31
Merge branch 'main' into cijothomas/periodicreader
cijothomas Sep 24, 2024
91f7c2e
add comment on shutdown
cijothomas Sep 25, 2024
aedbd2c
pass timeout to exporters
cijothomas Oct 4, 2024
4198792
named thread and pass timeout to export
cijothomas Oct 4, 2024
966c9a6
test fixes
cijothomas Oct 4, 2024
8f71c7b
Add tests for periodic reader from various RT combinations (#2147)
cijothomas Sep 25, 2024
63728cc
Update doc comments (#2149)
utpilla Sep 25, 2024
c85ae55
Use weaver for semantic convention codegen (#2098)
lquerel Sep 26, 2024
d2c4ef5
Fix CI - Regenerate proto files, and selective msrv-patching (#2159)
lalitb Sep 30, 2024
6be205b
For Delta Temporarlity, avoid exporting when no new measurements are …
cijothomas Sep 30, 2024
7182be1
Improve internal opentelemetry logging - directly using tracing mcros…
lalitb Sep 30, 2024
5003dd3
MeterProvider modified to not do shutdown if user has already shut do…
cijothomas Sep 30, 2024
29ec004
Minor cleanups in Metrics module (#2155)
cijothomas Sep 30, 2024
ea2b9a7
Prepare 0.26.0 release, update Metrics API to Beta (#2160)
cijothomas Oct 1, 2024
ea0d788
Nit fixes to bug template (#2161)
cijothomas Oct 1, 2024
b96ab43
Metrics - Instrument Name Validation fixes (#2166)
cijothomas Oct 2, 2024
9eaba05
Improve internal opentelemetry logging (#2128)
lalitb Oct 4, 2024
e1e993d
Remove comments that are not relevant anymore (#2171)
cijothomas Oct 4, 2024
f17cde9
Docs: Remove unnecessary indentation for better readability (#2174)
moshensky Oct 4, 2024
10e2df8
Merge branch 'main' into cijothomas/periodicreader
cijothomas Oct 4, 2024
2b562b6
bring new change back
cijothomas Oct 4, 2024
020fb03
add internal logs liberally for now
cijothomas Oct 4, 2024
018cfde
fix otlp exporter for metric to not need rt
cijothomas Oct 4, 2024
a5b7de2
fix fmt
cijothomas Oct 4, 2024
58a3ba1
clenaups
cijothomas Oct 5, 2024
30b09a4
let more sleep for CI
cijothomas Oct 5, 2024
e51bca6
tests from tokio rt
cijothomas Oct 5, 2024
bc2aea2
test refactor
cijothomas Oct 5, 2024
218d9bb
make sync
cijothomas Oct 5, 2024
b78a3f3
small fix
cijothomas Oct 5, 2024
76899f2
fix stdout
cijothomas Oct 5, 2024
9e3be6a
inmeory
cijothomas Oct 5, 2024
ec4384d
continue on error for ci
cijothomas Oct 5, 2024
1e98f8c
remove sync
cijothomas Oct 5, 2024
75b4006
reomve ynsc
cijothomas Oct 5, 2024
84fb885
use mutex for messag
cijothomas Oct 6, 2024
3a12310
test for exporter failures
cijothomas Oct 7, 2024
b231d19
Merge branch 'main' into cijothomas/periodicreader
cijothomas Oct 7, 2024
6cc1428
fix clippy
cijothomas Oct 7, 2024
75fecfe
add async callbacks test
cijothomas Oct 7, 2024
883b337
fix clippy
cijothomas Oct 7, 2024
5a5eea4
reveiw comment
cijothomas Oct 7, 2024
9181b09
Merge branch 'main' into cijothomas/periodicreader
cijothomas Oct 7, 2024
fb26138
log interval in milliseconds
cijothomas Oct 7, 2024
f8b6c74
Merge branch 'main' into cijothomas/periodicreader
cijothomas Oct 7, 2024
c3f6931
Merge branch 'main' into cijothomas/periodicreader
cijothomas Oct 8, 2024
e0d7126
Merge branch 'main' into cijothomas/periodicreader
cijothomas Oct 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ jobs:
os: [windows-latest, ubuntu-latest]
rust: [1.65.0, 1.71.1]
runs-on: ${{ matrix.os }}
continue-on-error: true
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
steps:
- uses: actions/checkout@v4
with:
Expand Down
4 changes: 2 additions & 2 deletions examples/metrics-advanced/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::{
Aggregation, Instrument, PeriodicReader, SdkMeterProvider, Stream,
};
use opentelemetry_sdk::{runtime, Resource};
use opentelemetry_sdk::Resource;
use std::error::Error;

fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
Expand Down Expand Up @@ -45,7 +45,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
};

let exporter = opentelemetry_stdout::MetricsExporterBuilder::default().build();
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
let reader = PeriodicReader::builder(exporter).build();
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::new([KeyValue::new(
Expand Down
1 change: 1 addition & 0 deletions examples/metrics-basic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["metrics", "
opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"]}
tokio = { workspace = true, features = ["full"] }
serde_json = { workspace = true }
tracing-subscriber = { version = "0.3.18", features = ["env-filter","registry", "std"]}

[features]
default = ["otel_unstable"]
Expand Down
17 changes: 15 additions & 2 deletions examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use opentelemetry::global;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::{runtime, Resource};
use opentelemetry_sdk::Resource;
use std::error::Error;
use std::vec;
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::fmt;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;

fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
let exporter = opentelemetry_stdout::MetricsExporterBuilder::default().build();
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
let reader = PeriodicReader::builder(exporter).build();
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::new([KeyValue::new(
Expand All @@ -21,6 +26,14 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
tracing_subscriber::registry()
.with(
fmt::layer()
.with_thread_names(true)
.with_filter(LevelFilter::DEBUG),
)
.init();

// Initialize the MeterProvider with the stdout Exporter.
let meter_provider = init_meter_provider();

Expand Down
2 changes: 1 addition & 1 deletion examples/self-diagnostics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ fn init_logger_provider() -> opentelemetry_sdk::logs::LoggerProvider {

fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
let provider = opentelemetry_otlp::new_pipeline()
.metrics(opentelemetry_sdk::runtime::Tokio)
.metrics()
.with_period(std::time::Duration::from_secs(1))
.with_exporter(
opentelemetry_otlp::new_exporter()
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn init_tracer_provider() -> Result<sdktrace::TracerProvider, TraceError> {

fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, MetricsError> {
opentelemetry_otlp::new_pipeline()
.metrics(opentelemetry_sdk::runtime::Tokio)
.metrics()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you verified that this still works? I believe reqwest relies on Tokio. Now that the outgoing Http call through reqwest is being made on a background thread without Tokio (or any runtime), it might be problematic. Looking to ensure that we don't hit this issue in particular:

there is no reactor running, must be called from the context of Tokio runtime

Related to #248, #2137

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent point!
Based on offline discussion, yes this appear to cause issues when using both reqwest and hyper. These libraries seem to have a strong requirement that it cannot work unless they are inside tokio runtime.
tonic seem to work fine without issues.

Will check if there are ways to work around the http library limitations.

Copy link
Member

@lalitb lalitb Oct 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will check if there are ways to work around the http library limitations.

I did a quick test with Simple Log Processor + OTLP exporter with request::blocking::Client - this works without need of tokio runtime. This should also work with background thread for batch in that case. Enforcing this for background thread could be one option.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will check if there are ways to work around the http library limitations.

I did a quick test with Simple Log Processor + OTLP exporter with request::blocking::Client - this works without need of tokio runtime. This should also work with background thread for batch in that case. Enforcing this for background thread could be one option.

Yes. But there is no blocking version for libraries like tonic, effectively limiting exporting to only support http with reqwest::blockingClient.
Will need to redesign after doing a comparison of all feasible options.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tonic seem to work fine without issues.

This maybe incorrect, as it may have worked during shutdown only. Apologies for the confusion!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, tonic is tricky, there is no alternative either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lalitb I have another correction to make about tonic, based on more testing:
The changes in this PR works fine with tonic, as long as the main function of the app is a tokio one. Yes the tonic::export call is made from our background thread, still this works.
If the main function is not a tokio main, then the app panics at meterprovider build itself. It looks like we attempt to create a grpc channel at build(), and that fails due to lack of tokio runtime.

Copy link
Member

@lalitb lalitb Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this PR works fine with tonic, as long as the main function of the app is a tokio one. Yes the tonic::export call is made from our background thread, still this works.

Interesting, I was thinking the spawned thread (std::thread::spawn) doesn't run in the tokio runtime and so will not have access to tokio runtime. thanks for confirming.

Trying to summarize the scenarios:

These work:

 tokio::main -> background-thread -> gRPC (tonic), HTTP (hyper, reqwest, reqwest-blocking) 
 tokio::main -> simple-exporter -> gRPC (tonic), HTTP (hyper, reqwest, reqwest-blocking)  (assuming we do filtering to avoid infinite loop).
 tokio::main -> simple-exporter -> HTTP (reqwest-blocking) 
 main -> simple-exporter -> HTTP (reqwest-blocking)

And these doesn't work:

 main -> background-thread -> gRPC (tonic), HTTP (hyper, reqwest, reqwest-blocking) 
 tokio::main(current_thread) -> simple-exporter -> gRPC (tonic), HTTP (hyper, reqwest, reqwest-blocking) # hangs

how about this ?

tokio::main(current_thread) -> background-thread -> gRPC (tonic), HTTP (hyper, reqwest, reqwest-blocking) 

.with_exporter(
http_exporter()
.with_protocol(Protocol::HttpBinary) //can be changed to `Protocol::HttpJson` to export in JSON format
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, Metric
..ExportConfig::default()
};
opentelemetry_otlp::new_pipeline()
.metrics(runtime::Tokio)
.metrics()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@
//! };
//!
//! let meter = opentelemetry_otlp::new_pipeline()
//! .metrics(opentelemetry_sdk::runtime::Tokio)
//! .metrics()
//! .with_exporter(
//! opentelemetry_otlp::new_exporter()
//! .tonic()
Expand Down
40 changes: 14 additions & 26 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ use opentelemetry_sdk::{
reader::{DefaultTemporalitySelector, TemporalitySelector},
InstrumentKind, PeriodicReader, SdkMeterProvider,
},
runtime::Runtime,
Resource,
};
use std::fmt::{Debug, Formatter};
use std::time;
use std::{
fmt::{Debug, Formatter},
time::Duration,
};

#[cfg(feature = "http-proto")]
use crate::exporter::http::HttpExporterBuilder;
Expand All @@ -41,12 +43,8 @@ pub const OTEL_EXPORTER_OTLP_METRICS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_MET
pub const OTEL_EXPORTER_OTLP_METRICS_HEADERS: &str = "OTEL_EXPORTER_OTLP_METRICS_HEADERS";
impl OtlpPipeline {
/// Create a OTLP metrics pipeline.
pub fn metrics<RT>(self, rt: RT) -> OtlpMetricPipeline<RT, NoExporterConfig>
where
RT: Runtime,
{
pub fn metrics(self) -> OtlpMetricPipeline<NoExporterConfig> {
OtlpMetricPipeline {
rt,
temporality_selector: None,
exporter_pipeline: NoExporterConfig(()),
resource: None,
Expand Down Expand Up @@ -117,19 +115,15 @@ impl From<HttpExporterBuilder> for MetricsExporterBuilder {
///
/// Note that currently the OTLP metrics exporter only supports tonic as it's grpc layer and tokio as
/// runtime.
pub struct OtlpMetricPipeline<RT, EB> {
rt: RT,
pub struct OtlpMetricPipeline<EB> {
temporality_selector: Option<Box<dyn TemporalitySelector>>,
exporter_pipeline: EB,
resource: Option<Resource>,
period: Option<time::Duration>,
timeout: Option<time::Duration>,
}

impl<RT, EB> OtlpMetricPipeline<RT, EB>
where
RT: Runtime,
{
impl<EB> OtlpMetricPipeline<EB> {
/// Build with resource key value pairs.
pub fn with_resource(self, resource: Resource) -> Self {
OtlpMetricPipeline {
Expand Down Expand Up @@ -173,18 +167,14 @@ where
}
}

impl<RT> OtlpMetricPipeline<RT, NoExporterConfig>
where
RT: Runtime,
{
impl OtlpMetricPipeline<NoExporterConfig> {
/// Build with the exporter
pub fn with_exporter<B: Into<MetricsExporterBuilder>>(
self,
pipeline: B,
) -> OtlpMetricPipeline<RT, MetricsExporterBuilder> {
) -> OtlpMetricPipeline<MetricsExporterBuilder> {
OtlpMetricPipeline {
exporter_pipeline: pipeline.into(),
rt: self.rt,
temporality_selector: self.temporality_selector,
resource: self.resource,
period: self.period,
Expand All @@ -193,18 +183,15 @@ where
}
}

impl<RT> OtlpMetricPipeline<RT, MetricsExporterBuilder>
where
RT: Runtime,
{
impl OtlpMetricPipeline<MetricsExporterBuilder> {
/// Build MeterProvider
pub fn build(self) -> Result<SdkMeterProvider> {
let exporter = self.exporter_pipeline.build_metrics_exporter(
self.temporality_selector
.unwrap_or_else(|| Box::new(DefaultTemporalitySelector::new())),
)?;

let mut builder = PeriodicReader::builder(exporter, self.rt);
let mut builder = PeriodicReader::builder(exporter);

if let Some(period) = self.period {
builder = builder.with_interval(period);
Expand All @@ -226,7 +213,7 @@ where
}
}

impl<RT, EB: Debug> Debug for OtlpMetricPipeline<RT, EB> {
impl<EB: Debug> Debug for OtlpMetricPipeline<EB> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("OtlpMetricPipeline")
.field("exporter_pipeline", &self.exporter_pipeline)
Expand Down Expand Up @@ -293,7 +280,8 @@ impl TemporalitySelector for MetricsExporter {

#[async_trait]
impl PushMetricsExporter for MetricsExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
async fn export(&self, metrics: &mut ResourceMetrics, _timeout: Duration) -> Result<()> {
//TODO: Pass timeout to client
self.client.export(metrics).await
}

Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! Interfaces for exporting metrics
use std::time::Duration;

use async_trait::async_trait;

use opentelemetry::metrics::Result;
Expand All @@ -16,7 +18,7 @@ pub trait PushMetricsExporter: TemporalitySelector + Send + Sync + 'static {
/// implement any retry logic. All errors returned by this function are
/// considered unrecoverable and will be reported to a configured error
/// Handler.
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()>;
async fn export(&self, metrics: &mut ResourceMetrics, timeout: Duration) -> Result<()>;

/// Flushes any metric data held by an exporter.
async fn force_flush(&self) -> Result<()>;
Expand Down
16 changes: 8 additions & 8 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ mod tests {
use super::*;
use crate::metrics::data::{ResourceMetrics, Temporality};
use crate::metrics::reader::TemporalitySelector;
use crate::testing::metrics::InMemoryMetricsExporter;
use crate::testing::metrics::InMemoryMetricsExporterBuilder;
use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use rand::{rngs, Rng, SeedableRng};
Expand Down Expand Up @@ -491,7 +491,7 @@ mod tests {
async fn counter_duplicate_instrument_merge() {
// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let reader = PeriodicReader::builder(exporter.clone()).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

// Act
Expand Down Expand Up @@ -542,7 +542,7 @@ mod tests {
async fn counter_duplicate_instrument_different_meter_no_merge() {
// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let reader = PeriodicReader::builder(exporter.clone()).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

// Act
Expand Down Expand Up @@ -631,7 +631,7 @@ mod tests {
async fn instrumentation_scope_identity_test() {
// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let reader = PeriodicReader::builder(exporter.clone()).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

// Act
Expand Down Expand Up @@ -714,7 +714,7 @@ mod tests {

// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let reader = PeriodicReader::builder(exporter.clone()).build();
let criteria = Instrument::new().name("test_histogram");
let stream_invalid_aggregation = Stream::new()
.aggregation(Aggregation::ExplicitBucketHistogram {
Expand Down Expand Up @@ -764,7 +764,7 @@ mod tests {

// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let reader = PeriodicReader::builder(exporter.clone()).build();
let criteria = Instrument::new().name("my_observable_counter");
// View drops all attributes.
let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);
Expand Down Expand Up @@ -839,7 +839,7 @@ mod tests {

// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let reader = PeriodicReader::builder(exporter.clone()).build();
let criteria = Instrument::new().name("my_counter");
// View drops all attributes.
let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);
Expand Down Expand Up @@ -2373,7 +2373,7 @@ mod tests {
exporter = exporter.with_temporality_selector(TestTemporalitySelector(temporality));

let exporter = exporter.build();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let reader = PeriodicReader::builder(exporter.clone()).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

TestContext {
Expand Down
Loading
Loading