Skip to content

Commit 9794ebd

Browse files
authored
Merge pull request #7 from schultyy/metrics
Add service metrics
2 parents 1c504b5 + d066f16 commit 9794ebd

File tree

5 files changed

+170
-6
lines changed

5 files changed

+170
-6
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@ tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] }
1212
clap = { version = "4.4", features = ["derive"] }
1313
fake = { version = "4", features = ["derive"] }
1414
tracing-opentelemetry = "0.29.0"
15-
opentelemetry = "0.29.0"
15+
opentelemetry = { version = "0.29.0", features = ["metrics"] }
1616
opentelemetry-otlp = { version = "0.29.0", features = ["tonic", "grpc-tonic"] }
1717
opentelemetry_sdk = { version = "0.29.0", features = [
1818
"rt-tokio",
1919
"tokio",
2020
"opentelemetry-http",
21+
"metrics",
2122
] }
2223
tonic = "0.12.3"
2324
opentelemetry-appender-tracing = "0.29.0"

src/code_gen/instruction.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,32 @@ pub const CHECK_INTERRUPT_CODE: u8 = 0x12;
8585
pub const CALL_CODE: u8 = 0x13;
8686
pub const RET_CODE: u8 = 0x14;
8787

88+
/// Returns the human-readable mnemonic for a bytecode opcode.
///
/// A byte that matches none of the opcode constants yields `"Unknown"`.
pub fn code_to_name(code: u8) -> String {
    // Resolve the static mnemonic first, then allocate exactly once below.
    let mnemonic: &'static str = match code {
        PUSH_STRING_CODE => "PushString",
        PUSH_INT_CODE => "PushInt",
        POP_CODE => "Pop",
        DEC_CODE => "Dec",
        JMP_IF_ZERO_CODE => "JmpIfZero",
        LABEL_CODE => "Label",
        STDOUT_CODE => "Stdout",
        STDERR_CODE => "Stderr",
        SLEEP_CODE => "Sleep",
        STORE_VAR_CODE => "StoreVar",
        LOAD_VAR_CODE => "LoadVar",
        DUP_CODE => "Dup",
        JUMP_CODE => "Jump",
        PRINTF_CODE => "Printf",
        REMOTE_CALL_CODE => "RemoteCall",
        START_CONTEXT_CODE => "StartContext",
        END_CONTEXT_CODE => "EndContext",
        CHECK_INTERRUPT_CODE => "CheckInterrupt",
        CALL_CODE => "Call",
        RET_CODE => "Ret",
        _ => "Unknown",
    };

    String::from(mnemonic)
}
113+
88114
impl Instruction {
89115
pub fn code(&self) -> u8 {
90116
match self {

src/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,14 @@ async fn execute_service(
130130
let tracer = vm::setup_tracer(&otel_endpoint, &service_name)
131131
.map_err(|e| RuntimeError::InitTraceError(e))?;
132132

133+
let meter_provider = vm::init_meter_provider(Some(&otel_endpoint), &service_name)
134+
.map_err(|e| RuntimeError::InitMeterError(e))?;
135+
133136
let mut vm = vm::VM::new(service_code.clone(), &service_name, print_tx)
134137
.with_remote_call_tx(coordinator.get_main_tx().clone())
135138
.with_remote_call_rx(remote_call_rx)
136-
.with_tracer(tracer.clone());
137-
139+
.with_tracer(tracer.clone())
140+
.with_meter_provider(meter_provider);
138141
if let Some(remote_call_limit) = args.remote_call_limit {
139142
vm = vm.with_custom_remote_call_limit(remote_call_limit);
140143
}

src/runtime_error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub enum RuntimeError {
77
VMError(vm::VMError),
88
ServiceError(JoinError),
99
InitTraceError(opentelemetry_otlp::ExporterBuildError),
10+
InitMeterError(opentelemetry_otlp::ExporterBuildError),
1011
}
1112

1213
impl std::error::Error for RuntimeError {}
@@ -17,6 +18,7 @@ impl std::fmt::Display for RuntimeError {
1718
RuntimeError::VMError(e) => write!(f, "VM error: {}", e),
1819
RuntimeError::ServiceError(e) => write!(f, "Service error: {}", e),
1920
RuntimeError::InitTraceError(e) => write!(f, "Init trace error: {}", e),
21+
RuntimeError::InitMeterError(e) => write!(f, "Init meter error: {}", e),
2022
}
2123
}
2224
}

src/vm.rs

Lines changed: 135 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use std::collections::HashMap;
22

3+
use opentelemetry::metrics::Counter;
4+
use opentelemetry::metrics::Gauge;
5+
use opentelemetry::metrics::MeterProvider;
36
use opentelemetry::propagation::TextMapPropagator;
47
use opentelemetry::trace::{TraceContextExt, TracerProvider};
58
use opentelemetry::{global, KeyValue};
@@ -8,6 +11,8 @@ use opentelemetry::{
811
Context,
912
};
1013
use opentelemetry_otlp::{WithExportConfig, WithTonicConfig};
14+
use opentelemetry_sdk::metrics::SdkMeterProvider;
15+
use opentelemetry_sdk::metrics::Temporality;
1116
use opentelemetry_sdk::propagation::TraceContextPropagator;
1217
use opentelemetry_sdk::trace::SdkTracerProvider;
1318
use opentelemetry_sdk::Resource;
@@ -105,6 +110,40 @@ pub fn setup_tracer(
105110
Ok(provider)
106111
}
107112

113+
pub(crate) fn init_meter_provider(
114+
endpoint: Option<&str>,
115+
service_name: &str,
116+
) -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, opentelemetry_otlp::ExporterBuildError> {
117+
let provider = if let Some(endpoint) = endpoint {
118+
let exporter = opentelemetry_otlp::MetricExporter::builder()
119+
.with_temporality(Temporality::Delta)
120+
.with_tonic()
121+
.with_endpoint(endpoint.to_string())
122+
.build()?;
123+
let resource = Resource::builder()
124+
.with_service_name(service_name.to_string())
125+
.build();
126+
127+
SdkMeterProvider::builder()
128+
.with_periodic_exporter(exporter)
129+
.with_resource(resource)
130+
.build()
131+
} else {
132+
let exporter = opentelemetry_stdout::MetricExporter::default();
133+
134+
let resource = Resource::builder()
135+
.with_service_name(service_name.to_string())
136+
.build();
137+
138+
SdkMeterProvider::builder()
139+
.with_periodic_exporter(exporter)
140+
.with_resource(resource)
141+
.build()
142+
};
143+
144+
Ok(provider)
145+
}
146+
108147
#[derive(Debug, Clone, PartialEq, Eq)]
109148
pub enum PrintMessage {
110149
Stdout(String),
@@ -130,6 +169,7 @@ pub struct VM {
130169
remote_call_limit: usize,
131170
service_name: String,
132171
tracer: Option<SdkTracerProvider>,
172+
meter_provider: SdkMeterProvider,
133173
otel_context: Option<opentelemetry::Context>,
134174
}
135175

@@ -161,7 +201,9 @@ impl VM {
161201
service_name: &str,
162202
print_tx: mpsc::Sender<PrintMessage>,
163203
) -> Self {
204+
let service_name = service_name.to_string();
164205
let (code, label_jump_map, label_index_map) = generate_bytecode(code);
206+
165207
Self {
166208
code,
167209
label_jump_map,
@@ -179,6 +221,7 @@ impl VM {
179221
service_name: service_name.to_string(),
180222
tracer: None,
181223
otel_context: None,
224+
meter_provider: init_meter_provider(None, &service_name).unwrap(),
182225
}
183226
}
184227

@@ -207,13 +250,63 @@ impl VM {
207250
self
208251
}
209252

253+
pub fn with_meter_provider(mut self, meter_provider: SdkMeterProvider) -> Self {
254+
self.meter_provider = meter_provider;
255+
self
256+
}
257+
258+
fn build_counters(
259+
&self,
260+
) -> Result<(Counter<u64>, Counter<u64>, Gauge<u64>, Gauge<u64>), VMError> {
261+
let remote_invocation_counter = self
262+
.meter_provider
263+
.meter("remote_invocation_counter")
264+
.u64_counter("remote_invocation_counter")
265+
.build()
266+
.to_owned();
267+
268+
let local_invocation_counter = self
269+
.meter_provider
270+
.meter("local_invocation_counter")
271+
.u64_counter("local_invocation_counter")
272+
.build()
273+
.to_owned();
274+
275+
let instruction_duration = self
276+
.meter_provider
277+
.meter("instruction_duration")
278+
.u64_gauge("instruction_duration")
279+
.with_unit("ms")
280+
.with_description("The duration of executing an instruction in milliseconds")
281+
.build()
282+
.to_owned();
283+
284+
let remote_call_duration = self
285+
.meter_provider
286+
.meter("remote_call_duration")
287+
.u64_gauge("remote_call_duration")
288+
.with_unit("ms")
289+
.with_description("The duration of a remote call in milliseconds")
290+
.build()
291+
.to_owned();
292+
293+
Ok((
294+
remote_invocation_counter,
295+
local_invocation_counter,
296+
instruction_duration,
297+
remote_call_duration,
298+
))
299+
}
300+
210301
pub async fn run(&mut self) -> Result<(), VMError> {
211302
let mut execution_counter = 0;
303+
let counters = self.build_counters()?;
304+
212305
while self.ip < self.code.len() {
213306
if self.ip >= self.code.len() {
214307
return Err(VMError::IPOutOfBounds(self.ip, self.code.len()));
215308
}
216-
self.execute_instruction().await?;
309+
self.execute_instruction(counters.clone()).await?;
217310
execution_counter += 1;
218311
if let Some(max_execution_counter) = self.max_execution_counter {
219312
if execution_counter > max_execution_counter {
@@ -261,8 +354,18 @@ impl VM {
261354
self.stack.last_mut().ok_or(VMError::MissingStackFrame)
262355
}
263356

264-
async fn execute_instruction(&mut self) -> Result<(), VMError> {
357+
async fn execute_instruction(
358+
&mut self,
359+
counters: (Counter<u64>, Counter<u64>, Gauge<u64>, Gauge<u64>),
360+
) -> Result<(), VMError> {
265361
let instruction = self.code[self.ip];
362+
let (
363+
remote_invocation_counter,
364+
local_invocation_counter,
365+
instruction_duration,
366+
remote_call_duration,
367+
) = counters;
368+
let start = std::time::Instant::now();
266369
match instruction {
267370
PUSH_STRING_CODE => {
268371
let (_start, end, str_len) = self.extract_length();
@@ -443,6 +546,7 @@ impl VM {
443546
self.ip += 1;
444547
}
445548
REMOTE_CALL_CODE => {
549+
let start = std::time::Instant::now();
446550
let remote_call_tx = self
447551
.remote_call_tx
448552
.as_ref()
@@ -497,6 +601,23 @@ impl VM {
497601
.await
498602
.map_err(|e| VMError::RemoteCallError(e.to_string()))?;
499603

604+
remote_invocation_counter.add(
605+
1,
606+
&[
607+
KeyValue::new("service", self.service_name.clone()),
608+
KeyValue::new("method", remote_method.to_string().clone()),
609+
],
610+
);
611+
612+
let duration = start.elapsed();
613+
let duration_ms = duration.as_millis() as u64;
614+
remote_call_duration.record(
615+
duration_ms,
616+
&[
617+
KeyValue::new("service", self.service_name.clone()),
618+
KeyValue::new("method", remote_method.to_string().clone()),
619+
],
620+
);
500621
if let Some(cx) = cx {
501622
cx.span()
502623
.set_attributes(vec![KeyValue::new("response", "OK")]);
@@ -537,7 +658,9 @@ impl VM {
537658
let (_start, end, label_len) = self.extract_length();
538659
let label = &self.code[end..end + label_len];
539660
let label = String::from_utf8(label.to_vec()).unwrap();
540-
self.handle_local_call(label).await?;
661+
self.handle_local_call(label.clone()).await?;
662+
local_invocation_counter
663+
.add(1, &[KeyValue::new("method", label.to_string().clone())]);
541664
}
542665
RET_CODE => {
543666
self.ip = self.return_addresses.pop().unwrap();
@@ -547,6 +670,15 @@ impl VM {
547670
return Err(VMError::InvalidInstruction(instruction));
548671
}
549672
}
673+
let duration = start.elapsed();
674+
let duration_ms = duration.as_millis() as u64;
675+
instruction_duration.record(
676+
duration_ms,
677+
&[KeyValue::new(
678+
"instruction",
679+
crate::code_gen::instruction::code_to_name(instruction),
680+
)],
681+
);
550682
Ok(())
551683
}
552684

0 commit comments

Comments
 (0)