Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added BatchSpanProcessor metrics on drop and export span #635

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,15 @@ public class DefaultStableMeter : StableMeter {
}
}

private class NoopLongGaugeBuilder : LongGaugeBuilder {
private class NoopLongGaugeBuilder : LongGaugeBuilder {
func setUnit(_ unit: String) -> any LongGaugeBuilder {
self
}

func setDescription(_ description: String) -> any LongGaugeBuilder {
self
}

func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge {
NoopObservableLongGauge()
}
Expand Down Expand Up @@ -123,7 +131,15 @@ public class DefaultStableMeter : StableMeter {
func add(value: Int, attribute: [String : AttributeValue]) {}
}

private class NoopLongCounterBuilder : LongCounterBuilder {
private class NoopLongCounterBuilder : LongCounterBuilder {
mamunto marked this conversation as resolved.
Show resolved Hide resolved
func setUnit(_ unit: String) -> any LongCounterBuilder {
self
}

func setDescription(_ description: String) -> any LongCounterBuilder {
self
}

func ofDoubles() -> DoubleCounterBuilder {
NoopDoubleCounterBuilder()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import Foundation
public class DefaultStableMeterProvider: StableMeterProvider {
static let noopMeterBuilder = NoopMeterBuilder()

public init() {}

public static func noop() -> MeterBuilder {
noopMeterBuilder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import Foundation

public protocol LongCounterBuilder : AnyObject {
func setUnit(_ unit: String) -> LongCounterBuilder
mamunto marked this conversation as resolved.
Show resolved Hide resolved
func setDescription(_ description: String) -> LongCounterBuilder
func ofDoubles() -> DoubleCounterBuilder
func build() -> LongCounter
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@
import Foundation

public protocol LongGaugeBuilder : AnyObject {
func setUnit(_ unit: String) -> LongGaugeBuilder
mamunto marked this conversation as resolved.
Show resolved Hide resolved
func setDescription(_ description: String) -> LongGaugeBuilder
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,14 @@ public class LongCounterMeterBuilderSdk: LongCounterBuilder, InstrumentBuilder {
-> OpenTelemetryApi.ObservableLongCounter {
registerLongAsynchronousInstrument(type: .observableCounter, updater: callback)
}

public func setDescription(_ description: String) -> LongCounterBuilder {
mamunto marked this conversation as resolved.
Show resolved Hide resolved
self.description = description
return self
}

public func setUnit(_ unit: String) -> LongCounterBuilder {
self.unit = unit
return self
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ public class LongGaugeBuilderSdk : LongGaugeBuilder, InstrumentBuilder {
registerLongAsynchronousInstrument(type: type, updater: callback)
}



public func setDescription(_ description: String) -> LongGaugeBuilder {
mamunto marked this conversation as resolved.
Show resolved Hide resolved
self.description = description
return self
}

public func setUnit(_ unit: String) -> LongGaugeBuilder {
self.unit = unit
return self
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,36 @@ import OpenTelemetryApi
/// exports the spans to wake up and start a new export cycle.
/// This batchSpanProcessor can cause high contention in a very high traffic service.
public struct BatchSpanProcessor: SpanProcessor {
fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType"
fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped"
fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name

fileprivate var worker: BatchWorker


fileprivate var worker: BatchWorker

public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil)
{
worker = BatchWorker(spanExporter: spanExporter,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback)
worker.start()
}
public static var name: String {
String(describing: Self.self)
}

public init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval = 5,
exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048,
maxExportBatchSize: Int = 512,
willExportCallback: ((inout [SpanData]) -> Void)? = nil
) {
worker = BatchWorker(
spanExporter: spanExporter,
meterProvider: meterProvider,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback
)
worker.start()
}

public let isStartRequired = false
public let isEndRequired = true
Expand Down Expand Up @@ -57,36 +72,77 @@ public struct BatchSpanProcessor: SpanProcessor {
/// the data.
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
private class BatchWorker: Thread {
let spanExporter: SpanExporter
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) {
self.spanExporter = spanExporter
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1
}
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

private var queueSizeGauge: ObservableLongGauge?
private var processedSpansCounter: LongCounter
private let droppedAttrs: [String: AttributeValue]
private let exportedAttrs: [String: AttributeValue]
init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [SpanData]) -> Void)?
) {
self.spanExporter = spanExporter
self.meterProvider = meterProvider
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1

let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build()
self.queueSizeGauge = meter.gaugeBuilder(name: "queueSize")
mamunto marked this conversation as resolved.
Show resolved Hide resolved
.ofLongs()
.setDescription("The number of items queued")
.setUnit("1")
.buildWithCallback { result in
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}
mamunto marked this conversation as resolved.
Show resolved Hide resolved

processedSpansCounter = meter.counterBuilder(name: "processedSpans")
.setUnit("1")
.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
.build()
droppedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
]
exportedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)
]
}

func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }

if spanList.count == maxQueueSize {
// TODO: Record a counter for dropped spans.
processedSpansCounter.add(value: 1, attribute: droppedAttrs)
return
}
// TODO: Record a gauge for referenced spans.
mamunto marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -148,11 +204,16 @@ private class BatchWorker: Thread {
timeoutTimer.cancel()
}

private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter.add(value: spanList.count, attribute: exportedAttrs)
cond.unlock()
mamunto marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,22 @@ class BatchSpansProcessorTests: XCTestCase {
}

func testStartEndRequirements() {
let spansProcessor = BatchSpanProcessor(spanExporter: WaitingSpanExporter(numberToWaitFor: 0))
let spansProcessor = BatchSpanProcessor(
spanExporter: WaitingSpanExporter(numberToWaitFor: 0),
meterProvider: DefaultStableMeterProvider()
)
XCTAssertFalse(spansProcessor.isStartRequired)
XCTAssertTrue(spansProcessor.isEndRequired)
}

func testExportDifferentSampledSpans() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: maxScheduleDelay)
)
let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName2)
let exported = waitingSpanExporter.waitForExport()
Expand All @@ -63,7 +70,12 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportMoreSpansThanTheBufferSize() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 6)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay, maxQueueSize: 6, maxExportBatchSize: 2))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: maxScheduleDelay,
maxQueueSize: 6, maxExportBatchSize: 2)
)

let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName1)
Expand All @@ -82,7 +94,13 @@ class BatchSpansProcessorTests: XCTestCase {

func testForceExport() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)
let batchSpansProcessor = BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 10, maxQueueSize: 10000, maxExportBatchSize: 2000)
let batchSpansProcessor = BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: 10,
maxQueueSize: 10000,
maxExportBatchSize: 2000
)
tracerSdkFactory.addSpanProcessor(batchSpansProcessor)

for _ in 0 ..< 100 {
Expand All @@ -96,7 +114,10 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportSpansToMultipleServices() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2)
let waitingSpanExporter2 = WaitingSpanExporter(numberToWaitFor: 2)
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]), scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]),
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: maxScheduleDelay))

let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName2)
Expand All @@ -110,7 +131,13 @@ class BatchSpansProcessorTests: XCTestCase {
let maxQueuedSpans = 8
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: maxQueuedSpans)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]), scheduleDelay: maxScheduleDelay, maxQueueSize: maxQueuedSpans, maxExportBatchSize: maxQueuedSpans / 2))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]),
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: maxScheduleDelay,
maxQueueSize: maxQueuedSpans,
maxExportBatchSize: maxQueuedSpans / 2)
)

var spansToExport = [SpanData]()
// Wait to block the worker thread in the BatchSampledSpansProcessor. This ensures that no items
Expand Down Expand Up @@ -162,7 +189,11 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportNotSampledSpans() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: maxScheduleDelay)
)

createNotSampledEndedSpan(spanName: spanName1)
createNotSampledEndedSpan(spanName: spanName2)
Expand All @@ -181,7 +212,11 @@ class BatchSpansProcessorTests: XCTestCase {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)

// Set the export delay to zero, for no timeout, in order to confirm the #flush() below works
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 0.1))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider(),
scheduleDelay: 0.1)
)

let span2 = createSampledEndedSpan(spanName: spanName2)

Expand Down
Loading