-
Notifications
You must be signed in to change notification settings - Fork 13
Description
Overview
This task is to complete the implementation of trace emission.
This work is started in the improve-observability branch, and is not yet available on develop. Base your work off of the improve-observability branch.
In the improve-observability branch, we have introduced a new design for emitting tracing / OTEL-style events when Orbital interacts with services (databases , kafka topics, etc).
We're not removing the old approach (yet), but adding new trace event emission alongside.
The general pattern is as follows:
First, create a span:
// collectionName is a meaningful resource name - ie., topic name for kafka, collection name for mongo.
// It's not always possible to use one here (eg., it turned out to be unreasonably difficult in SQL, as an operation may interact with multiple tables)
val traceContext = eventDispatcher.createOperationTraceSpan(service, operation, collectionName)
Emit a request event before performing I/O
Before invoking a call, emit an event. If working in a reactive context, this might be doOnSubscribe { ... }
When building a request event, the payload in the exhcnage metadata should be the thing we're sending.
eg: The body of a request in HTTP, the raw SQL when interacting with a database, etc.
Some requests don't have a body (eg., subscribing to a topic on Kafka), and that's OK, just use null here.
traceContext.emitEvent(
kind = TracingEventKind.OK,
spanState = SpanState.ACTIVE,
// payload type is used to control how payloads are exposed when publishing / storing an event.
// set this to the Vyne type instance that describes a payload being sent or received in this span
payloadType = null,
// There are specific TracingEventExchangeMetadata implementations for each of the types of operations we perform.
// See TracingEventExchangeMetadata for the available implementations
exchangeMetadata = traceRequestMetadata,
verb = "Upsert"
)
Note that payload metadata is sent in a lambda, rather than as a value on the the request, as this can be expensive to compute, and we defer computation unless we know we're going to capture the event.
Emit a request capturing the response.
Similarly, one a response is returned, capture the response on a trace event:
val recordEvent = traceSpan.emitEvent(
TracingEventKind.OK,
SpanState.COMPLETE, // responses generally update the span to be complete, but not alwasy (eg., a streaming response stays open)
vyneType,
DatabaseResponseRecord() { objectMapper.writeValueAsString(mapValue) },
"Record received"
)
When capturing a value returned from a read operation (including reading from a Kafka stream etc), then store a reference to the emitted event of the result being returned.
Then, when constructing a typed instance, set the eventIds on the data source.
// previously:
source = operationResult.asOperationReferenceDataSource()
// now:
source = operationResult.asOperationReferenceDataSource(recordEvent.idSet)
Capture error scenarios
If either the request or the response resulted in an error, this should also be captured and an event emitted
traceContext.emitEvent(
TracingEventKind.ERROR,
SpanState.COMPLETE,
null,
DatabaseResponse(-1) { error.message },
"Read error"
)
Examples
This pattern has been implemented across a few of our invokers already. See:
- JdbcInvoker (and it's collaborators
JdbcUpsertInvokerandJdbcQueryInvoker MongoDbInvoker(and it's collaborators,MongoReadOnlyQueryInvokerandMongoMutatingQueryInvokerRestTemplateInvokerKafkaInvoker(including updates toKafkaStreamManager, which ultimately handles stream subscription, etc.
Testing
There are also tests implemented for these.
You can get a sense of the test coverage we're aiming for here by diffing the relavnt tests against develop.
Generally, the pattern is:
// Get a dedicated event sink using the test-helper function:
val (eventBroker, eventSink) = QueryContextEventBroker.withTestTraceSpan()
// other stuff
// pass the eventBroker into the query:
val result = vyne.query(
"""
given { Flight = {
objectId: null,
code : 'LHR-AKL',
cost: 200.15
}
}
call FlightsDb::insertFlight
""", eventBroker = eventBroker
)
// other stuff
// then assert that events were collected into the event sink:
eventSink.collectedEvents.shouldHaveSize(2)
val writeRequestEvent = eventSink.collectedEvents.first()
val databaseRequestMetadata = writeRequestEvent.exchangeMetadata.shouldBeInstanceOf<DatabaseRequest>()
val upsertRequest = databaseRequestMetadata.payload()
upsertRequest.shouldNotBeNull().shouldStartWith("""{"code":"LHR-AKL","cost":200.15"""")
val upsertResponseEvent = eventSink.collectedEvents.last()
upsertResponseEvent.spanState.shouldBe(SpanState.COMPLETE)
Requirement
Implement the updated tracing approach, and associated tests for the following invokers:
- DynamoInvoker
- HazelcastInvoker
- LamdaInvoker
- S3Invoker
- ServiceBusInvoker
- SOAPInvoker
- SqsInvoker
- StoreInvoker