Skip to content

Commit

Permalink
Support Cassandra Storage in v3 (#3570)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Oct 30, 2023
1 parent b34a84e commit 04feb90
Show file tree
Hide file tree
Showing 39 changed files with 5,354 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-v3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
- name: receiver-zipkin-kafka
- name: receiver-zipkin-rabbitmq
- name: receiver-zipkin-scribe
- name: storage-cassandra
steps:
- name: Checkout Repository
uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
<module>receiver-zipkin-activemq</module>
<module>receiver-zipkin-rabbitmq</module>
<module>receiver-zipkin-scribe</module>
<module>storage-cassandra</module>
<module>http-query-plugin</module>
<module>health-query-plugin</module>
<module>telemetry-zipkin</module>
Expand Down
10 changes: 10 additions & 0 deletions zipkin-server/server-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${skywalking.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin</groupId>
<artifactId>storage-cassandra</artifactId>
<version>${project.version}</version>
</dependency>

<!-- storage ext -->
<dependency>
Expand All @@ -72,6 +77,11 @@
<artifactId>zipkin-dependency-storage-banyandb</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin</groupId>
<artifactId>zipkin-dependency-storage-cassandra</artifactId>
<version>${project.version}</version>
</dependency>

<!-- zipkin receiver -->
<dependency>
Expand Down
17 changes: 17 additions & 0 deletions zipkin-server/server-starter/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,23 @@ storage: &storage
superDatasetBlockIntervalHours: ${ZIPKIN_STORAGE_BANYANDB_SUPER_DATASET_BLOCK_INTERVAL_HOURS:4} # Unit is hour
superDatasetSegmentIntervalDays: ${ZIPKIN_STORAGE_BANYANDB_SUPER_DATASET_SEGMENT_INTERVAL_DAYS:1} # Unit is day
specificGroupSettings: ${ZIPKIN_STORAGE_BANYANDB_SPECIFIC_GROUP_SETTINGS:""} # For example, {"group1": {"blockIntervalHours": 4, "segmentIntervalDays": 1}}
cassandra:
keyspace: ${ZIPKIN_STORAGE_CASSANDRA_KEYSPACE:zipkin}
# Comma separated list of host addresses part of Cassandra cluster. Ports default to 9042 but you can also specify a custom port with 'host:port'.
contactPoints: ${ZIPKIN_STORAGE_CASSANDRA_CONTACT_POINTS:localhost}
# Name of the datacenter that will be considered "local" for load balancing.
localDc: ${ZIPKIN_STORAGE_CASSANDRA_LOCAL_DC:datacenter1}
# Will throw an exception on startup if authentication fails.
username: ${ZIPKIN_STORAGE_CASSANDRA_USERNAME:}
password: ${ZIPKIN_STORAGE_CASSANDRA_PASSWORD:}
# Ensuring that schema exists, if enabled tries to execute script /zipkin-server/storage-cassandra/resources/zipkin-schemas.cql.
ensureSchema: ${ZIPKIN_STORAGE_CASSANDRA_ENSURE_SCHEMA:true}
# Max pooled connections per datacenter-local host.
maxConnections: ${ZIPKIN_STORAGE_CASSANDRA_MAX_CONNECTIONS:8}
# Using ssl for connection, rely on Keystore
use-ssl: ${ZIPKIN_STORAGE_CASSANDRA_USE_SSL:false}
maxSizeOfBatchCql: ${ZIPKIN_STORAGE_CASSANDRA_MAX_SIZE_OF_BATCH_CQL:2000}
asyncBatchPersistentPoolSize: ${ZIPKIN_STORAGE_CASSANDRA_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}

zipkin-dependency-storage-ext: *storage

Expand Down
73 changes: 73 additions & 0 deletions zipkin-server/storage-cassandra/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>zipkin-server-parent</artifactId>
<groupId>io.zipkin</groupId>
<version>2.24.4-SNAPSHOT</version>
</parent>

<artifactId>storage-cassandra</artifactId>
<name>Storage: Cassandra</name>

<dependencies>
<dependency>
<groupId>io.zipkin</groupId>
<artifactId>zipkin-server-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-jdbc-hikaricp-plugin</artifactId>
<version>${skywalking.version}</version>
</dependency>

<dependency>
<groupId>io.zipkin</groupId>
<artifactId>zipkin-server-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
<version>${auto-value.version}</version>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>${auto-value.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>${java-driver.version}</version>
<!-- Exclude unused graph and geo dependencies -->
<exclusions>
<exclusion>
<groupId>com.esri.geometry</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>*</artifactId>
</exclusion>
<!-- We retain reactivestreams even though we don't use it because
Mockito dies trying to mock CqlSession without it. -->
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-standalone-plugin</artifactId>
<version>${skywalking.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package zipkin.server.storage.cassandra;

import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;

import java.util.ArrayList;
import java.util.List;

public class CQLExecutor implements InsertRequest, UpdateRequest {
private final String cql;
private final List<Object> params;
private final SessionCacheCallback callback;
private List<CQLExecutor> additionalCQLs;

public CQLExecutor(String cql, List<Object> params, SessionCacheCallback callback, List<CQLExecutor> additionalCQLs) {
this.cql = cql;
this.params = params;
this.callback = callback;
this.additionalCQLs = additionalCQLs;
}

public void appendAdditionalCQLs(List<CQLExecutor> cqlExecutors) {
if (additionalCQLs == null) {
additionalCQLs = new ArrayList<>();
}
additionalCQLs.addAll(cqlExecutors);
}

@Override
public String toString() {
return cql;
}

@Override
public void onInsertCompleted() {
if (callback != null)
callback.onInsertCompleted();
}

@Override
public void onUpdateFailure() {
if (callback != null)
callback.onUpdateFailure();
}

public List<CQLExecutor> getAdditionalCQLs() {
return additionalCQLs;
}

public String getCql() {
return cql;
}

public List<Object> getParams() {
return params;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.storage.cassandra;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.auth.AuthProvider;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.internal.core.auth.ProgrammaticPlainTextAuthProvider;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
import org.apache.skywalking.oap.server.library.util.HealthChecker;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.server.storage.cassandra.internal.SessionBuilder;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONNECTION_MAX_REQUESTS;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE;

public class CassandraClient implements Client {
static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);

public static final String RECORD_UNIQUE_UUID_COLUMN = "uuid_unique";

private final CassandraConfig config;
private final DelegatedHealthChecker healthChecker;

private volatile CqlSession cqlSession;

public CassandraClient(CassandraConfig config) {
this.config = config;
this.healthChecker = new DelegatedHealthChecker();
}

public KeyspaceMetadata getMetadata() {
return cqlSession.getMetadata().getKeyspace(config.getKeyspace()).orElse(null);
}

public CqlSession getSession() {
return cqlSession;
}

public int getDefaultTtl(String table) {
return (int) getMetadata().getTable(table)
.map(TableMetadata::getOptions)
.flatMap(o -> Optional.ofNullable(o.get(CqlIdentifier.fromCql("default_time_to_live"))))
.orElse(0);
}

public <T> List<T> executeQuery(String cql, ResultHandler<T> resultHandler, Object... params) {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing CQL: {}", cql);
LOG.debug("CQL parameters: {}", Arrays.toString(params));
}
final BoundStatement stmt = cqlSession.prepare(cql).bind(params);
final ResultSet resultSet = cqlSession.execute(stmt);
healthChecker.health();
if (resultHandler != null) {
return StreamSupport.stream(resultSet.spliterator(), false)
.map(resultHandler::handle).collect(Collectors.toList());
}
return null;
}

public <T> CompletionStage<List<T>> executeAsyncQuery(String cql, ResultHandler<T> resultHandler, Object... params) {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing CQL: {}", cql);
LOG.debug("CQL parameters: {}", Arrays.toString(params));
}
final BoundStatement stmt = cqlSession.prepare(cql).bind(params);
final CompletionStage<AsyncResultSet> resultSet = cqlSession.executeAsync(stmt);
healthChecker.health();
if (resultHandler != null) {
return resultSet.thenApply(s -> StreamSupport.stream(s.currentPage().spliterator(), false)
.map(resultHandler::handle).collect(Collectors.toList()));
}
return null;
}

public void execute(String cql) {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing CQL: {}", cql);
}
cqlSession.execute(cql);
healthChecker.health();
}

public void registerChecker(HealthChecker healthChecker) {
this.healthChecker.register(healthChecker);
}

@Override
public void connect() throws Exception {
AuthProvider authProvider = null;
if (StringUtil.isNotEmpty(config.getUsername())) {
authProvider = new ProgrammaticPlainTextAuthProvider(config.getUsername(), config.getPassword());
}
this.cqlSession = SessionBuilder.buildSession(config.getContactPoints(),
config.getLocalDc(),
poolingOptions(),
authProvider,
config.getUseSsl());

// create keyspace if needs
final String keyspace = config.getKeyspace();
KeyspaceMetadata keyspaceMetadata = this.cqlSession.getMetadata().getKeyspace(keyspace).orElse(null);
if (keyspaceMetadata == null) {
String createKeyspaceCql = String.format(
"CREATE KEYSPACE IF NOT EXISTS %s " +
"WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} " +
"AND durable_writes = false;",
keyspace);
this.cqlSession.execute(createKeyspaceCql);
}

this.cqlSession.execute("USE " + keyspace);
}

@Override
public void shutdown() throws IOException {
}

private Map<DriverOption, Integer> poolingOptions() {
Map<DriverOption, Integer> result = new LinkedHashMap<>();
result.put(CONNECTION_POOL_LOCAL_SIZE, config.getMaxConnections());
result.put(CONNECTION_MAX_REQUESTS, 40960 / config.getMaxConnections());
return result;
}

@FunctionalInterface
public interface ResultHandler<T> {
T handle(Row resultSet);
}
}
Loading

0 comments on commit 04feb90

Please sign in to comment.