Skip to content

Commit

Permalink
Adds Cassandra server integration
Browse files Browse the repository at this point in the history
This contains tracing instrumentation for [Cassandra](https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/tracing/Tracing.java).

`brave.cassandra.Tracing` extracts trace state from the custom payload
of incoming requests. How long each request takes, each suboperation,
and relevant tags like the session ID are reported to Zipkin.
  • Loading branch information
Adrian Cole committed May 12, 2017
1 parent ca6503c commit 1f18ddb
Show file tree
Hide file tree
Showing 9 changed files with 1,157 additions and 0 deletions.
52 changes: 52 additions & 0 deletions instrumentation/cassandra/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# brave-instrumentation-cassandra
This contains tracing instrumentation for [Cassandra](https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/tracing/Tracing.java).

`brave.cassandra.Tracing` extracts trace state from the custom payload
of incoming requests. How long each request takes, each suboperation,
and relevant tags like the session ID are reported to Zipkin.

## Client side requirements
Clients must enable tracing. If they want to continue an existing trace,
they must add propagation fields to the custom payload of a statement.

```java
// minimally, you need to prepare a statement and enable tracing
preparedStatement.enableTracing();

// By default, B3 style is used, so instrumented clients do something like this
Map<String, ByteBuffer> payload = new LinkedHashMap<>();
payload.set("X-B3-TraceId", byteBuffer("463ac35c9f6413ad"));
payload.set("X-B3-ParentSpanId", byteBuffer("463ac35c9f6413ad"));
payload.set("X-B3-SpanId", byteBuffer("72485a3953bb6124"));
payload.set("X-B3-Sampled", byteBuffer("1"));
preparedStatement.setOutgoingPayload(payload);
```

## Server Integration
For this to work, you must add classes to your cassandra classpath. It
then needs the system property "cassandra.custom_tracing_class" set to
"brave.cassandra.Tracing".

### Http
The easiest way to get started is to place this project's "all" jar in
cassandra's lib directory and start cassandra with java options like this:

```bash
$ JVM_OPTS='-Dzipkin.http_endpoint=http://localhost:9411/api/v1/spans -Dcassandra.custom_tracing_class=brave.cassandra.Tracing' cassandra
```

Note this jar is about 200KiB and does not depend on any classes besides
Brave and Zipkin. It has limited configuration to the below:

System property | Default | Description
--- | --- | ---
zipkin.http_endpoint | none | The url to Zipkin's POST endpoint. Ex. http://myhost:9411/api/v1/spans
zipkin.service_name | cassandra | The name that shows up in Zipkin's search and dependency graph

### Custom

Some may want to make a custom jar that includes explicit configuration
for their site's Zipkin service. Others may want to relegate that to yaml
or configuration. This plugin can use either an implicit `brave.Tracing`
component setup elsewhere in the JVM, or one explicitly called via the
constructor.
69 changes: 69 additions & 0 deletions instrumentation/cassandra/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-instrumentation-parent</artifactId>
<version>4.3.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>brave-instrumentation-cassandra</artifactId>
<name>Brave Instrumentation: Cassandra</name>

<properties>
<main.basedir>${project.basedir}/../..</main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>3.10</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.zipkin.reporter</groupId>
<artifactId>zipkin-sender-urlconnection</artifactId>
<version>${zipkin-reporter.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<minimizeJar>true</minimizeJar>
<shadedClassifierName>all</shadedClassifierName>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
123 changes: 123 additions & 0 deletions instrumentation/cassandra/src/main/java/brave/cassandra/Tracing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package brave.cassandra;

import brave.Span;
import brave.Tracer;
import brave.propagation.SamplingFlags;
import brave.propagation.TraceContextOrSamplingFlags;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.utils.FBUtilities;
import zipkin.Endpoint;
import zipkin.reporter.AsyncReporter;
import zipkin.reporter.urlconnection.URLConnectionSender;

/**
* This creates Zipkin server spans for incoming cassandra requests. Spans are created when there's
* a tracing component available, and the incoming payload is not explicitly unsampled.
*
* <h3>Configuring a tracing component</h3>
* If the system property "zipkin.http_endpoint" is set, a basic tracing component is setup.
*
* <p>Otherwise, {@link brave.Tracing#current()} is used. This relies on external bootstrapping of
* {@link brave.Tracing}.
*
* <p>Alternatively, you can subclass this and fix configuration to your favorite mechanism.
*/
public class Tracing extends org.apache.cassandra.tracing.Tracing {
final InetAddress coordinator = FBUtilities.getLocalAddress();
final TracingComponent component;

public Tracing(brave.Tracing tracing) { // subclassable to pin configuration
component = new TracingComponent.Explicit(tracing);
}

public Tracing() {
String endpoint = System.getProperty("zipkin.http_endpoint");
if (endpoint == null) {
component = new TracingComponent.Current();
return;
}
brave.Tracing tracing = brave.Tracing.newBuilder()
.localServiceName(System.getProperty("zipkin.service_name", "cassandra"))
.reporter(AsyncReporter.create(URLConnectionSender.create(endpoint)))
.build();
component = new TracingComponent.Explicit(tracing);
}

/**
* When tracing is enabled and available, this tries to extract trace keys from the custom
* payload. If that's possible, it re-uses the trace identifiers and starts a server span.
* Otherwise, a new trace is created.
*/
@Override protected final UUID newSession(UUID sessionId, TraceType traceType,
Map<String, ByteBuffer> customPayload) {
Tracer tracer = component.tracer();
if (tracer == null || traceType == TraceType.NONE) {
return super.newSession(sessionId, traceType, customPayload);
}
Span span = spanFromPayload(tracer, customPayload);

// override instead of call from super as otherwise we cannot store a reference to the span
assert get() == null;
TraceState state = new ZipkinTraceState(coordinator, sessionId, traceType, span);
set(state);
sessions.put(sessionId, state);
return sessionId;
}

/** This extracts the RPC span encoded in the custom payload, or starts a new trace */
Span spanFromPayload(Tracer tracer, Map<String, ByteBuffer> payload) {
TraceContextOrSamplingFlags contextOrFlags = payload == null
? TraceContextOrSamplingFlags.create(SamplingFlags.EMPTY)
: component.extractor().extract(payload);
return contextOrFlags.context() != null
? tracer.joinSpan(contextOrFlags.context())
: tracer.newTrace(contextOrFlags.samplingFlags());
}

@Override protected final void stopSessionImpl() {
ZipkinTraceState state = (ZipkinTraceState) get();
if (state != null) state.incoming.finish();
}

@Override public final TraceState begin(String request, InetAddress client,
Map<String, String> parameters) {
ZipkinTraceState state = ((ZipkinTraceState) get());
Span span = state.incoming;
if (span.isNoop()) return state;

span.name(request).kind(Span.Kind.SERVER);
span.tag("cassandra.session_id", state.sessionId.toString());
Endpoint.Builder remoteEndpoint = Endpoint.builder().serviceName("");
remoteEndpoint.parseIp(client);
span.remoteEndpoint(remoteEndpoint.build());
span.start();
return state;
}

@Override protected final TraceState newTraceState(InetAddress coordinator, UUID sessionId,
TraceType traceType) {
throw new AssertionError();
}

@Override public final void trace(ByteBuffer sessionId, String message, int ttl) {
// not current tracing outbound messages
}

static final class ZipkinTraceState extends TraceState {
final Span incoming;

ZipkinTraceState(InetAddress coordinator, UUID sessionId, TraceType traceType,
Span incoming) {
super(coordinator, sessionId, traceType);
this.incoming = incoming;
}

@Override protected void traceImpl(String message) {
incoming.annotate(message); // skip creating local spans for now
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package brave.cassandra;

import brave.Tracer;
import brave.propagation.Propagation;
import brave.propagation.TraceContext;
import java.nio.ByteBuffer;
import java.util.Map;
import zipkin.internal.Util;

abstract class TracingComponent {
/** Getter that pulls trace fields from ascii values */
static final Propagation.Getter<Map<String, ByteBuffer>, String> GETTER = (carrier, key) -> {
ByteBuffer buf = carrier.get(key);
return buf != null ? Util.UTF_8.decode(buf).toString() : null;
};

abstract Tracer tracer();

abstract TraceContext.Extractor<Map<String, ByteBuffer>> extractor();

static final class Current extends TracingComponent {
@Override Tracer tracer() {
return brave.Tracing.currentTracer();
}

@Override TraceContext.Extractor<Map<String, ByteBuffer>> extractor() {
brave.Tracing tracing = brave.Tracing.current();
return tracing != null ? tracing.propagation().extractor(GETTER) : null;
}
}

static final class Explicit extends TracingComponent {
final Tracer tracer;
final TraceContext.Extractor<Map<String, ByteBuffer>> extractor;

Explicit(brave.Tracing tracing) {
if (tracing == null) throw new NullPointerException("tracing == null");
this.tracer = tracing.tracer();
this.extractor = tracing.propagation().extractor(GETTER);
}

@Override Tracer tracer() {
return tracer;
}

@Override TraceContext.Extractor<Map<String, ByteBuffer>> extractor() {
return extractor;
}
}
}
Loading

0 comments on commit 1f18ddb

Please sign in to comment.