Skip to content

Commit

Permalink
added span size gauge
Browse files Browse the repository at this point in the history
  • Loading branch information
mamunto committed Nov 21, 2024
1 parent d57d5ab commit 6e92546
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 66 deletions.
30 changes: 7 additions & 23 deletions Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,6 @@ public class DefaultStableMeter : StableMeter {
}

private class NoopLongGaugeBuilder : LongGaugeBuilder {
func setUnit(_ unit: String) -> any LongGaugeBuilder {
self
}

func setDescription(_ description: String) -> any LongGaugeBuilder {
self
}

func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge {
NoopObservableLongGauge()
}
Expand Down Expand Up @@ -132,25 +124,17 @@ public class DefaultStableMeter : StableMeter {
}

private class NoopLongCounterBuilder : LongCounterBuilder {
func setUnit(_ unit: String) -> any LongCounterBuilder {
self
func ofDoubles() -> DoubleCounterBuilder {
NoopDoubleCounterBuilder()
}

func setDescription(_ description: String) -> any LongCounterBuilder {
self
func build() -> LongCounter {
NoopLongCounter()
}

func ofDoubles() -> DoubleCounterBuilder {
NoopDoubleCounterBuilder()
}

func build() -> LongCounter {
NoopLongCounter()
}

func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter {
NoopObservableLongCounter()
}
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter {
NoopObservableLongCounter()
}
}

private class NoopDoubleCounterBuilder : DoubleCounterBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import Foundation

public protocol LongCounterBuilder : AnyObject {
func setUnit(_ unit: String) -> LongCounterBuilder
func setDescription(_ description: String) -> LongCounterBuilder
func ofDoubles() -> DoubleCounterBuilder
func build() -> LongCounter
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,5 @@
import Foundation

public protocol LongGaugeBuilder : AnyObject {
func setUnit(_ unit: String) -> LongGaugeBuilder
func setDescription(_ description: String) -> LongGaugeBuilder
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,4 @@ public class LongCounterMeterBuilderSdk: LongCounterBuilder, InstrumentBuilder {
-> OpenTelemetryApi.ObservableLongCounter {
registerLongAsynchronousInstrument(type: .observableCounter, updater: callback)
}

public func setDescription(_ description: String) -> LongCounterBuilder {
self.description = description
return self
}

public func setUnit(_ unit: String) -> LongCounterBuilder {
self.unit = unit
return self
}
}
10 changes: 0 additions & 10 deletions Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,4 @@ public class LongGaugeBuilderSdk : LongGaugeBuilder, InstrumentBuilder {
public func buildWithCallback(_ callback: @escaping (OpenTelemetryApi.ObservableLongMeasurement) -> Void) -> OpenTelemetryApi.ObservableLongGauge {
registerLongAsynchronousInstrument(type: type, updater: callback)
}

public func setDescription(_ description: String) -> LongGaugeBuilder {
self.description = description
return self
}

public func setUnit(_ unit: String) -> LongGaugeBuilder {
self.unit = unit
return self
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,12 @@ private class BatchWorker: Thread {
var queue: OperationQueue

private var queueSizeGauge: ObservableLongGauge?
private var processedSpansCounter: LongCounter
private var spanGaugeObserver: ObservableLongGauge?

private var processedSpansCounter: LongCounter?
private let droppedAttrs: [String: AttributeValue]
private let exportedAttrs: [String: AttributeValue]
private let spanGaugeBuilder: LongGaugeBuilder
init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
Expand All @@ -110,23 +113,33 @@ private class BatchWorker: Thread {
queue.maxConcurrentOperationCount = 1

let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build()
self.queueSizeGauge = meter.gaugeBuilder(name: "queueSize")
do {
self.queueSizeGauge = try meter.gaugeBuilder(name: "queueSize")
.ofLongs()
.asLongSdk()
.setDescription("The number of items queued")
.setUnit("1")
.buildWithCallback { result in
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}
} catch {}

self.spanGaugeBuilder = meter.gaugeBuilder(name: "spanSize")
.ofLongs()
.setDescription("The number of items queued")
.setUnit("1")
.buildWithCallback { result in
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

processedSpansCounter = meter.counterBuilder(name: "processedSpans")
.setUnit("1")
.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
.build()
do {
processedSpansCounter = try meter.counterBuilder(name: "processedSpans")
.asLongSdk()
.setUnit("1")
.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
.build()
} catch {}

droppedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
Expand All @@ -137,16 +150,35 @@ private class BatchWorker: Thread {
]
}

deinit {
// Cleanup all gauge observer
self.queueSizeGauge?.close()
self.spanGaugeObserver?.close()
}

func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }

if spanList.count == maxQueueSize {
processedSpansCounter.add(value: 1, attribute: droppedAttrs)
processedSpansCounter?.add(value: 1, attribute: droppedAttrs)
return
}
// TODO: Record a gauge for referenced spans.
spanList.append(span)

// If there is any observer before, let's close it
self.spanGaugeObserver?.close()
// Subscribe to new gauge observer
self.spanGaugeObserver = self.spanGaugeBuilder
.buildWithCallback { [count = spanList.count] result in
result.record(
value: count,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

// Notify the worker thread that at half of the queue is available. It will take
// time anyway for the thread to wake up.
if spanList.count >= halfMaxQueueSize {
Expand Down Expand Up @@ -211,9 +243,32 @@ private class BatchWorker: Thread {
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter.add(value: spanList.count, attribute: exportedAttrs)
processedSpansCounter?.add(value: spanList.count, attribute: exportedAttrs)
cond.unlock()
}
}
}
}

public enum GuageError: Error {
case invalidType
}

/// Helper function to handle
extension LongGaugeBuilder {
public func asLongSdk() throws -> LongGaugeBuilderSdk {
guard let sdkType = self as? LongGaugeBuilderSdk else {
throw GuageError.invalidType
}
return sdkType
}
}

extension LongCounterBuilder {
public func asLongSdk() throws -> LongCounterMeterBuilderSdk {
guard let sdkType = self as? LongCounterMeterBuilderSdk else {
throw GuageError.invalidType
}
return sdkType
}
}

0 comments on commit 6e92546

Please sign in to comment.