Skip to content

Commit

Permalink
Adds Cassandra server integration
Browse files Browse the repository at this point in the history
This contains a tracing implementation for [Cassandra](https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/tracing/Tracing.java).

`brave.cassandra.Tracing` extracts trace state from the custom payload
of incoming requests. How long each request takes, each suboperation,
and relevant tags like the session ID are reported to Zipkin.

Clients must enable tracing. If they want to continue an existing trace,
they must add propagation fields to the custom payload of a statement.

```java
// minimally, you need to prepare a statement and enable tracing
preparedStatement.enableTracing();

// By default, B3 style is used, so instrumented clients do something like this
Map<String, ByteBuffer> payload = new LinkedHashMap<>();
payload.set("X-B3-TraceId", byteBuffer("463ac35c9f6413ad"));
payload.set("X-B3-ParentSpanId", byteBuffer("463ac35c9f6413ad"));
payload.set("X-B3-SpanId", byteBuffer("72485a3953bb6124"));
payload.set("X-B3-Sampled", byteBuffer("1"));
preparedStatement.setOutgoingPayload(payload);
```

For this to work, you must add classes to your cassandra classpath. It
then needs the system property "cassandra.custom_tracing_class" set to
"brave.cassandra.Tracing".

The easiest way to get started is to place this project's "all" jar in
cassandra's lib directory and start cassandra with java options like this:

```bash
$ JVM_OPTS='-Dzipkin.http_endpoint=http://localhost:9411/api/v1/spans -Dcassandra.custom_tracing_class=brave.cassandra.Tracing' cassandra
```

Note this jar is about 200KiB and does not depend on any classes besides
Brave and Zipkin. It has limited configuration to the below:

System property | Default | Description
--- | --- | ---
zipkin.http_endpoint | none | The url to Zipkin's POST endpoint. Ex. http://myhost:9411/api/v1/spans
zipkin.service_name | cassandra | The name that shows up in Zipkin's search and dependency graph

Some may want to make a custom jar that includes explicit configuration
for their site's Zipkin service. Others may want to relegate that to yaml
or configuration. This plugin can use either an implicit `brave.Tracing`
component setup elsewhere in the JVM, or one explicitly called via the
constructor.
  • Loading branch information
Adrian Cole committed May 12, 2017
1 parent ca6503c commit e79581a
Show file tree
Hide file tree
Showing 9 changed files with 1,157 additions and 0 deletions.
52 changes: 52 additions & 0 deletions instrumentation/cassandra/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# brave-instrumentation-cassandra
This contains a tracing implementation for [Cassandra](https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/tracing/Tracing.java).

`Tracing` extracts trace state from the custom payload of incoming
requests. How long each request takes, along with relevant tags like the
session ID are reported to Zipkin.

## Client side requirements
Clients must enable tracing. If they want to continue an existing trace,
they must add propagation fields to the custom payload of a statement.

```java
// minimally, you need to prepare a statement and enable tracing
preparedStatement.enableTracing();

// By default, B3 style is used, so instrumented clients do something like this
Map<String, ByteBuffer> payload = new LinkedHashMap<>();
payload.set("X-B3-TraceId", byteBuffer("463ac35c9f6413ad"));
payload.set("X-B3-ParentSpanId", byteBuffer("463ac35c9f6413ad"));
payload.set("X-B3-SpanId", byteBuffer("72485a3953bb6124"));
payload.set("X-B3-Sampled", byteBuffer("1"));
preparedStatement.setOutgoingPayload(payload);
```

## Server Integration
For this to work, you must add classes to your cassandra classpath. It
then needs the system property "cassandra.custom_tracing_class" set to
"brave.cassandra.Tracing".

### Http
The easiest way to get started is to place this project's "all" jar in
cassandra's lib directory and start cassandra with java options like this:

```bash
$ JVM_OPTS='-Dzipkin.http_endpoint=http://localhost:9411/api/v1/spans -Dcassandra.custom_tracing_class=brave.cassandra.Tracing' cassandra
```

Note this jar is about 200KiB and does not depend on any classes besides
Brave and Zipkin. It has limited configuration to the below:

System property | Default | Description
--- | --- | ---
zipkin.http_endpoint | none | The url to Zipkin's POST endpoint. Ex. http://myhost:9411/api/v1/spans
zipkin.service_name | cassandra | The name that shows up in Zipkin's search and dependency graph

### Custom

Some may want to make a custom jar that includes explicit configuration
for their site's Zipkin service. Others may want to relegate that to yaml
or configuration. This plugin can use either an implicit `brave.Tracing`
component setup elsewhere in the JVM, or one explicitly called via the
constructor.
69 changes: 69 additions & 0 deletions instrumentation/cassandra/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-instrumentation-parent</artifactId>
<version>4.3.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>brave-instrumentation-cassandra</artifactId>
<name>Brave Instrumentation: Cassandra</name>

<properties>
<main.basedir>${project.basedir}/../..</main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>3.10</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.zipkin.reporter</groupId>
<artifactId>zipkin-sender-urlconnection</artifactId>
<version>${zipkin-reporter.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<minimizeJar>true</minimizeJar>
<shadedClassifierName>all</shadedClassifierName>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
123 changes: 123 additions & 0 deletions instrumentation/cassandra/src/main/java/brave/cassandra/Tracing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package brave.cassandra;

import brave.Span;
import brave.Tracer;
import brave.propagation.SamplingFlags;
import brave.propagation.TraceContextOrSamplingFlags;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.utils.FBUtilities;
import zipkin.Endpoint;
import zipkin.reporter.AsyncReporter;
import zipkin.reporter.urlconnection.URLConnectionSender;

/**
* This creates Zipkin server spans for incoming cassandra requests. Spans are created when there's
* a tracing component available, and the incoming payload is not explicitly unsampled.
*
* <h3>Configuring a tracing component</h3>
* If the system property "zipkin.http_endpoint" is set, a basic tracing component is setup.
*
* <p>Otherwise, {@link brave.Tracing#current()} is used. This relies on external bootstrapping of
* {@link brave.Tracing}.
*
* <p>Alternatively, you can subclass this and fix configuration to your favorite mechanism.
*/
public class Tracing extends org.apache.cassandra.tracing.Tracing {
final InetAddress coordinator = FBUtilities.getLocalAddress();
final TracingComponent component;

public Tracing(brave.Tracing tracing) { // subclassable to pin configuration
component = new TracingComponent.Explicit(tracing);
}

public Tracing() {
String endpoint = System.getProperty("zipkin.http_endpoint");
if (endpoint == null) {
component = new TracingComponent.Current();
return;
}
brave.Tracing tracing = brave.Tracing.newBuilder()
.localServiceName(System.getProperty("zipkin.service_name", "cassandra"))
.reporter(AsyncReporter.create(URLConnectionSender.create(endpoint)))
.build();
component = new TracingComponent.Explicit(tracing);
}

/**
* When tracing is enabled and available, this tries to extract trace keys from the custom
* payload. If that's possible, it re-uses the trace identifiers and starts a server span.
* Otherwise, a new trace is created.
*/
@Override protected final UUID newSession(UUID sessionId, TraceType traceType,
Map<String, ByteBuffer> customPayload) {
Tracer tracer = component.tracer();
if (tracer == null || traceType == TraceType.NONE) {
return super.newSession(sessionId, traceType, customPayload);
}
Span span = spanFromPayload(tracer, customPayload);

// override instead of call from super as otherwise we cannot store a reference to the span
assert get() == null;
TraceState state = new ZipkinTraceState(coordinator, sessionId, traceType, span);
set(state);
sessions.put(sessionId, state);
return sessionId;
}

/** This extracts the RPC span encoded in the custom payload, or starts a new trace */
Span spanFromPayload(Tracer tracer, Map<String, ByteBuffer> payload) {
TraceContextOrSamplingFlags contextOrFlags = payload == null
? TraceContextOrSamplingFlags.create(SamplingFlags.EMPTY)
: component.extractor().extract(payload);
return contextOrFlags.context() != null
? tracer.joinSpan(contextOrFlags.context())
: tracer.newTrace(contextOrFlags.samplingFlags());
}

@Override protected final void stopSessionImpl() {
ZipkinTraceState state = (ZipkinTraceState) get();
if (state != null) state.incoming.finish();
}

@Override public final TraceState begin(String request, InetAddress client,
Map<String, String> parameters) {
ZipkinTraceState state = ((ZipkinTraceState) get());
Span span = state.incoming;
if (span.isNoop()) return state;

span.name(request).kind(Span.Kind.SERVER);
span.tag("cassandra.session_id", state.sessionId.toString());
Endpoint.Builder remoteEndpoint = Endpoint.builder().serviceName("");
remoteEndpoint.parseIp(client);
span.remoteEndpoint(remoteEndpoint.build());
span.start();
return state;
}

@Override protected final TraceState newTraceState(InetAddress coordinator, UUID sessionId,
TraceType traceType) {
throw new AssertionError();
}

@Override public final void trace(ByteBuffer sessionId, String message, int ttl) {
// not current tracing outbound messages
}

static final class ZipkinTraceState extends TraceState {
final Span incoming;

ZipkinTraceState(InetAddress coordinator, UUID sessionId, TraceType traceType,
Span incoming) {
super(coordinator, sessionId, traceType);
this.incoming = incoming;
}

@Override protected void traceImpl(String message) {
incoming.annotate(message); // skip creating local spans for now
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package brave.cassandra;

import brave.Tracer;
import brave.propagation.Propagation;
import brave.propagation.TraceContext;
import java.nio.ByteBuffer;
import java.util.Map;
import zipkin.internal.Util;

abstract class TracingComponent {
/** Getter that pulls trace fields from ascii values */
static final Propagation.Getter<Map<String, ByteBuffer>, String> GETTER = (carrier, key) -> {
ByteBuffer buf = carrier.get(key);
return buf != null ? Util.UTF_8.decode(buf).toString() : null;
};

abstract Tracer tracer();

abstract TraceContext.Extractor<Map<String, ByteBuffer>> extractor();

static final class Current extends TracingComponent {
@Override Tracer tracer() {
return brave.Tracing.currentTracer();
}

@Override TraceContext.Extractor<Map<String, ByteBuffer>> extractor() {
brave.Tracing tracing = brave.Tracing.current();
return tracing != null ? tracing.propagation().extractor(GETTER) : null;
}
}

static final class Explicit extends TracingComponent {
final Tracer tracer;
final TraceContext.Extractor<Map<String, ByteBuffer>> extractor;

Explicit(brave.Tracing tracing) {
if (tracing == null) throw new NullPointerException("tracing == null");
this.tracer = tracing.tracer();
this.extractor = tracing.propagation().extractor(GETTER);
}

@Override Tracer tracer() {
return tracer;
}

@Override TraceContext.Extractor<Map<String, ByteBuffer>> extractor() {
return extractor;
}
}
}
Loading

0 comments on commit e79581a

Please sign in to comment.