From 7a47c2b80e03dca7bd6efe8fc411c373d9df6de9 Mon Sep 17 00:00:00 2001 From: Mrproliu <741550557@qq.com> Date: Wed, 1 Nov 2023 10:12:12 +0800 Subject: [PATCH] Enhance the performance and storage spze of the cassandra storage --- .../storage/cassandra/CassandraClient.java | 35 +++- .../cassandra/CassandraTableInstaller.java | 14 ++ .../cassandra/dao/CassandraBatchDAO.java | 30 ++-- .../cassandra/dao/CassandraCqlExecutor.java | 13 +- .../dao/CassandraTableExtension.java | 2 +- .../dao/CassandraZipkinQueryDAO.java | 155 +++++------------- .../dao/executor/BaseQueryExecutor.java | 146 +++++++++++++++++ .../executor/MultipleTraceQueryExecutor.java | 43 +++++ .../RemoteServiceNameQueryExecutor.java | 40 +++++ .../executor/ServiceNameQueryExecutor.java | 38 +++++ .../executor/SingleTraceQueryExecutor.java | 39 +++++ .../dao/executor/SpanNameQueryExecutor.java | 40 +++++ .../TraceIDByAnnotationQueryExecutor.java | 41 +++++ .../TraceIDByServiceQueryExecutor.java | 90 ++++++++++ .../src/main/resources/zipkin-schemas.cql | 5 - 15 files changed, 582 insertions(+), 149 deletions(-) create mode 100644 zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/BaseQueryExecutor.java create mode 100644 zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/MultipleTraceQueryExecutor.java create mode 100644 zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/RemoteServiceNameQueryExecutor.java create mode 100644 zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/ServiceNameQueryExecutor.java create mode 100644 zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/SingleTraceQueryExecutor.java create mode 100644 zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/SpanNameQueryExecutor.java create mode 100644 zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByAnnotationQueryExecutor.java create mode 100644 zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByServiceQueryExecutor.java diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraClient.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraClient.java index 11f7c881611..6cc2cd1738d 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraClient.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraClient.java @@ -20,8 +20,10 @@ import com.datastax.oss.driver.api.core.config.DriverOption; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.datastax.oss.driver.internal.core.auth.ProgrammaticPlainTextAuthProvider; @@ -77,11 +79,23 @@ public int getDefaultTtl(String table) { } public List executeQuery(String cql, ResultHandler resultHandler, Object... params) { + return executeQuery(cqlSession.prepare(cql), resultHandler, params); + } + + public CompletionStage> executeAsyncQuery(String cql, ResultHandler resultHandler, Object... params) { + return executeAsyncQuery(cqlSession.prepare(cql), resultHandler, params); + } + + public PreparedStatement prepare(String cql) { + return cqlSession.prepare(cql); + } + + public List executeQuery(PreparedStatement statement, ResultHandler resultHandler, Object... params) { if (LOG.isDebugEnabled()) { - LOG.debug("Executing CQL: {}", cql); + LOG.debug("Executing CQL: {}", statement.getQuery()); LOG.debug("CQL parameters: {}", Arrays.toString(params)); } - final BoundStatement stmt = cqlSession.prepare(cql).bind(params); + final BoundStatement stmt = statement.bind(params); final ResultSet resultSet = cqlSession.execute(stmt); healthChecker.health(); if (resultHandler != null) { @@ -91,12 +105,23 @@ public List executeQuery(String cql, ResultHandler resultHandler, Obje return null; } - public CompletionStage> executeAsyncQuery(String cql, ResultHandler resultHandler, Object... params) { + public CompletionStage> executeAsyncQuery(PreparedStatement statement, ResultHandler resultHandler, Object... params) { if (LOG.isDebugEnabled()) { - LOG.debug("Executing CQL: {}", cql); + LOG.debug("Executing CQL: {}", statement.getQuery()); LOG.debug("CQL parameters: {}", Arrays.toString(params)); } - final BoundStatement stmt = cqlSession.prepare(cql).bind(params); + final BoundStatement stmt = statement.bind(params); + return executeAsyncQuery0(stmt, resultHandler); + } + + public CompletionStage> executeAsyncQueryWithCustomBind(PreparedStatement original, Statement statement, ResultHandler resultHandler) { + if (LOG.isDebugEnabled()) { + LOG.debug("Executing Custom Bind CQL: {}", original.getQuery()); + } + return executeAsyncQuery0(statement, resultHandler); + } + + private CompletionStage> executeAsyncQuery0(Statement stmt, ResultHandler resultHandler) { final CompletionStage resultSet = cqlSession.executeAsync(stmt); healthChecker.health(); if (resultHandler != null) { diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraTableInstaller.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraTableInstaller.java index 3d64bcfc78a..a3ade9597d2 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraTableInstaller.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraTableInstaller.java @@ -14,10 +14,13 @@ package zipkin.server.storage.cassandra; +import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; +import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord; import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo; +import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.JDBCTableInstaller; public class CassandraTableInstaller implements ModelCreator.CreatingListener { private final CassandraClient client; @@ -43,6 +46,17 @@ public void start() { @Override public void whenCreating(Model model) throws StorageException { + if (ZipkinSpanRecord.INDEX_NAME.equals(model.getName())) { + // remove unnecessary columns + for (int i = model.getColumns().size() - 1; i >= 0; i--) { + final String columnName = model.getColumns().get(i).getColumnName().getStorageName(); + if (StorageData.TIME_BUCKET.equals(columnName) || + ZipkinSpanRecord.TIMESTAMP_MILLIS.equals(columnName) || + JDBCTableInstaller.TABLE_COLUMN.equals(columnName)) { + model.getColumns().remove(i); + } + } + } TableMetaInfo.addModel(model); } } diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraBatchDAO.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraBatchDAO.java index c82216fe269..45d5d4ab8ba 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraBatchDAO.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraBatchDAO.java @@ -14,6 +14,7 @@ package zipkin.server.storage.cassandra.dao; import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; import org.apache.skywalking.oap.server.core.storage.IBatchDAO; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; @@ -29,6 +30,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; public class CassandraBatchDAO implements IBatchDAO { static final Logger LOG = LoggerFactory.getLogger(CassandraBatchDAO.class); @@ -37,6 +39,8 @@ public class CassandraBatchDAO implements IBatchDAO { private final DataCarrier dataCarrier; private final int maxBatchSqlSize; + private static final ConcurrentHashMap PREPARED_STATEMENT_MAP = new ConcurrentHashMap<>(); + public CassandraBatchDAO(CassandraClient client, int maxBatchCqlSize, int asyncBatchPersistentPoolSize) { this.client = client; String name = "CASSANDRA_ASYNCHRONOUS_BATCH_PERSISTENT"; @@ -76,7 +80,7 @@ public CompletableFuture flush(List prepareRequests) { } final long start = System.currentTimeMillis(); - boolean success = true; + final boolean isInsert = cqls.get(0) instanceof InsertRequest; try { for (PrepareRequest cql : cqls) { final CQLExecutor executor = (CQLExecutor) cql; @@ -85,28 +89,26 @@ public CompletableFuture flush(List prepareRequests) { LOG.debug("CQL parameters: {}", executor.getParams()); } - final BoundStatement stmt = client.getSession().prepare(executor.getCql()) - .bind(((CQLExecutor) cql).getParams().toArray()); - client.getSession().execute(stmt); + final PreparedStatement stmt = PREPARED_STATEMENT_MAP.computeIfAbsent(executor.getCql(), e -> client.getSession().prepare(e)); + + final BoundStatement boundStatement = stmt.bind(((CQLExecutor) cql).getParams().toArray()); + client.getSession().executeAsync(boundStatement).thenAccept(result -> { + if (isInsert) { + ((InsertRequest) executor).onInsertCompleted(); + } else if (!result.wasApplied()) { + ((UpdateRequest) executor).onUpdateFailure(); + }; + }); } } catch (Exception e) { // Just to avoid one execution failure makes the rest of batch failure. LOG.error(e.getMessage(), e); - success = false; } - final boolean isInsert = cqls.get(0) instanceof InsertRequest; - for (PrepareRequest executor : cqls) { - if (isInsert) { - ((InsertRequest) executor).onInsertCompleted(); - } else if (!success) { - ((UpdateRequest) executor).onUpdateFailure(); - } - } if (LOG.isDebugEnabled()) { long end = System.currentTimeMillis(); long cost = end - start; - LOG.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}, cost:{}ms", prepareRequests.size(), maxBatchSqlSize, cost); + LOG.debug("execute sync sql statements done, data size: {}, maxBatchSqlSize: {}, cost:{}ms", prepareRequests.size(), maxBatchSqlSize, cost); } return CompletableFuture.completedFuture(null); } diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraCqlExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraCqlExecutor.java index 2ff081b5dce..041a167677c 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraCqlExecutor.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraCqlExecutor.java @@ -14,7 +14,6 @@ package zipkin.server.storage.cassandra.dao; -import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.cql.Row; import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.core.storage.StorageData; @@ -35,6 +34,8 @@ import zipkin.server.storage.cassandra.CassandraClient; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -137,10 +138,6 @@ private List buildAdditionalInsertExecutor( List param = new ArrayList<>(); final StringBuilder cqlBuilder = new StringBuilder("INSERT INTO ").append(tableName); - columnNames.add(ID_COLUMN); - values.add("?"); - param.add(TableHelper.generateId(model, metrics.id().build())); - int position = 0; List valueList = new ArrayList(); for (int i = 0; i < columns.size(); i++) { @@ -172,7 +169,7 @@ private List buildAdditionalInsertExecutor( if (!CollectionUtils.isEmpty(valueList)) { for (Object object : valueList) { List paramCopy = new ArrayList<>(param); - paramCopy.set(position, object); + paramCopy.set(position - 1, object); sqlExecutors.add(new CQLExecutor(sql, paramCopy, callback, null)); } } else { @@ -196,7 +193,7 @@ private CQLExecutor buildInsertExecutor(Model model, final List columns = model.getColumns(); final List columnNames = Stream.concat( - Stream.of(ID_COLUMN, JDBCTableInstaller.TABLE_COLUMN), + (model.isRecord() ? Collections.emptyList() : Arrays.asList(ID_COLUMN, JDBCTableInstaller.TABLE_COLUMN)).stream(), columns .stream() .map(ModelColumn::getColumnName) @@ -210,7 +207,7 @@ private CQLExecutor buildInsertExecutor(Model model, cqlBuilder.append(columnNames.stream().map(it -> "?").collect(Collectors.joining(",", "(", ")"))); final List params = Stream.concat( - Stream.of(TableHelper.generateId(model, metrics.id().build()), model.getName()), + (model.isRecord() ? Collections.emptyList() : Arrays.asList(TableHelper.generateId(model, metrics.id().build()), model.getName())).stream(), columns .stream() .map(ModelColumn::getColumnName) diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTableExtension.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTableExtension.java index eec4d8afa63..e398da98200 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTableExtension.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraTableExtension.java @@ -65,7 +65,7 @@ public static List buildExtensionsForSpan(ZipkinSpanRecord span, Se public static int durationIndexBucket(long ts_milli) { // if the window constant has microsecond precision, the division produces negative getValues - return (int) (ts_milli / (DURATION_INDEX_BUCKET_WINDOW_SECONDS)); + return (int) (ts_milli / (DURATION_INDEX_BUCKET_WINDOW_SECONDS)) / 1000; } private static CQLExecutor buildServiceSpan(String service, String span, int bucket, UUID ts, String trace_id, long durationMillis, diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraZipkinQueryDAO.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraZipkinQueryDAO.java index ce041621e48..f27018aa2a7 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraZipkinQueryDAO.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraZipkinQueryDAO.java @@ -14,22 +14,21 @@ package zipkin.server.storage.cassandra.dao; -import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.uuid.Uuids; -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; import org.apache.skywalking.oap.server.core.query.input.Duration; import org.apache.skywalking.oap.server.core.storage.query.IZipkinQueryDAO; -import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceRelationTraffic; -import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceSpanTraffic; -import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceTraffic; import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.StringUtil; import zipkin.server.storage.cassandra.CassandraClient; import zipkin.server.storage.cassandra.CassandraTableHelper; -import zipkin2.Endpoint; +import zipkin.server.storage.cassandra.dao.executor.MultipleTraceQueryExecutor; +import zipkin.server.storage.cassandra.dao.executor.RemoteServiceNameQueryExecutor; +import zipkin.server.storage.cassandra.dao.executor.ServiceNameQueryExecutor; +import zipkin.server.storage.cassandra.dao.executor.SingleTraceQueryExecutor; +import zipkin.server.storage.cassandra.dao.executor.SpanNameQueryExecutor; +import zipkin.server.storage.cassandra.dao.executor.TraceIDByAnnotationQueryExecutor; +import zipkin.server.storage.cassandra.dao.executor.TraceIDByServiceQueryExecutor; import zipkin2.Span; import zipkin2.storage.QueryRequest; @@ -51,16 +50,29 @@ import static zipkin.server.storage.cassandra.dao.CassandraTableExtension.durationIndexBucket; public class CassandraZipkinQueryDAO implements IZipkinQueryDAO { - private final static int NAME_QUERY_MAX_SIZE = Integer.MAX_VALUE; - private static final Gson GSON = new Gson(); - private final CassandraClient client; private final CassandraTableHelper tableHelper; private int indexTtl; + private final ServiceNameQueryExecutor serviceNameQueryExecutor; + private final RemoteServiceNameQueryExecutor remoteServiceNameQueryExecutor; + private final SpanNameQueryExecutor spanNameQueryExecutor; + private final SingleTraceQueryExecutor singleTraceQueryExecutor; + private final MultipleTraceQueryExecutor multipleTraceQueryExecutor; + private final TraceIDByAnnotationQueryExecutor traceIDByAnnotationQueryExecutor; + private final TraceIDByServiceQueryExecutor traceIDByServiceQueryExecutor; + public CassandraZipkinQueryDAO(CassandraClient client, CassandraTableHelper tableHelper) { this.client = client; this.tableHelper = tableHelper; + + this.serviceNameQueryExecutor = new ServiceNameQueryExecutor(client, tableHelper); + this.remoteServiceNameQueryExecutor = new RemoteServiceNameQueryExecutor(client, tableHelper); + this.spanNameQueryExecutor = new SpanNameQueryExecutor(client, tableHelper); + this.singleTraceQueryExecutor = new SingleTraceQueryExecutor(client, tableHelper); + this.multipleTraceQueryExecutor = new MultipleTraceQueryExecutor(client, tableHelper); + this.traceIDByAnnotationQueryExecutor = new TraceIDByAnnotationQueryExecutor(client, tableHelper); + this.traceIDByServiceQueryExecutor = new TraceIDByServiceQueryExecutor(client, tableHelper); } private int getIndexTtl() { @@ -73,37 +85,22 @@ private int getIndexTtl() { @Override public List getServiceNames() throws IOException { - return client.executeQuery("select " + ZipkinServiceTraffic.SERVICE_NAME + " from " + - tableHelper.getTableForRead(ZipkinServiceTraffic.INDEX_NAME) + " limit " + NAME_QUERY_MAX_SIZE, - row -> row.getString(ZipkinServiceTraffic.SERVICE_NAME)); + return serviceNameQueryExecutor.get(); } @Override public List getRemoteServiceNames(String serviceName) throws IOException { - return client.executeQuery("select " + ZipkinServiceRelationTraffic.REMOTE_SERVICE_NAME + - " from " + tableHelper.getTableForRead(ZipkinServiceRelationTraffic.INDEX_NAME) + - " where " + ZipkinServiceRelationTraffic.SERVICE_NAME + " = ?" + - " limit " + NAME_QUERY_MAX_SIZE, - row -> row.getString(ZipkinServiceRelationTraffic.REMOTE_SERVICE_NAME), - serviceName); + return remoteServiceNameQueryExecutor.get(serviceName); } @Override public List getSpanNames(String serviceName) throws IOException { - return client.executeQuery("select " + ZipkinServiceSpanTraffic.SPAN_NAME + - " from " + tableHelper.getTableForRead(ZipkinServiceSpanTraffic.INDEX_NAME) + - " where " + ZipkinServiceSpanTraffic.SERVICE_NAME + " = ?" + - " limit " + NAME_QUERY_MAX_SIZE, - row -> row.getString(ZipkinServiceSpanTraffic.SPAN_NAME), - serviceName); + return spanNameQueryExecutor.get(serviceName); } @Override public List getTrace(String traceId) { - return client.executeQuery("select * from " + tableHelper.getTableForRead(ZipkinSpanRecord.INDEX_NAME) + - " where " + ZipkinSpanRecord.TRACE_ID + " = ?" + - " limit " + NAME_QUERY_MAX_SIZE, - this::buildSpan, traceId); + return singleTraceQueryExecutor.get(traceId); } @Override @@ -111,14 +108,10 @@ public List> getTraces(QueryRequest request, Duration duration) throw List>> completionTraceIds = new ArrayList<>(); if (CollectionUtils.isNotEmpty(request.annotationQuery())) { for (Map.Entry entry : request.annotationQuery().entrySet()) { - completionTraceIds.add(client.executeAsyncQuery("select " + ZipkinSpanRecord.TRACE_ID + - " from " + ZipkinSpanRecord.ADDITIONAL_QUERY_TABLE + - " where " + ZipkinSpanRecord.QUERY + " = ?" + - " and " + ZipkinSpanRecord.TIME_BUCKET + " >= ?" + - " and " + ZipkinSpanRecord.TIME_BUCKET + " <= ?", - row -> row.getString(ZipkinSpanRecord.TRACE_ID), + completionTraceIds.add(traceIDByAnnotationQueryExecutor.asyncGet( entry.getValue().isEmpty() ? entry.getKey() : entry.getKey() + "=" + entry.getValue(), - duration.getStartTimeBucket(), duration.getEndTimeBucket())); + duration.getStartTimeBucket(), duration.getEndTimeBucket() + )); } } @@ -156,31 +149,26 @@ private CompletionStage> newBucketedTraceIdCall(QueryRequest reques end_duration = maxDuration != null ? maxDuration / 1000L : Long.MAX_VALUE; } - String traceByServiceSpanBaseCql = "select trace_id from " + CassandraTableExtension.TABLE_TRACE_BY_SERVICE_SPAN - + " where service=? and span=? and bucket=? and ts>=? and ts<=?"; // each service names for (String serviceName : serviceNames) { for (int bucket = endBucket; bucket >= startBucket; bucket--) { boolean addSpanQuery = true; if (remoteService != null) { - result.add(client.executeAsyncQuery("select trace_id from " + CassandraTableExtension.TABLE_TRACE_BY_SERVICE_REMOTE_SERVICE - + " where service=? and remote_service=? and bucket=? and ts>=? and ts<=?", - resultSet -> resultSet.getString(0), - serviceName, remoteService, bucket, timestampRange.startUUID, timestampRange.endUUID)); + result.add(traceIDByServiceQueryExecutor.asyncWithRemoteService( + serviceName, remoteService, bucket, timestampRange.startUUID, timestampRange.endUUID) + ); // If the remote service query can satisfy the request, don't make a redundant span query addSpanQuery = !spanName.isEmpty() || minDuration != null; } if (!addSpanQuery) continue; if (start_duration != null) { - result.add(client.executeAsyncQuery(traceByServiceSpanBaseCql + " and duration>=? and duration<=?", - resultSet -> resultSet.getString(0), - serviceName, spanName, bucket, timestampRange.startUUID, timestampRange.endUUID, start_duration, end_duration) - ); + result.add(traceIDByServiceQueryExecutor.asyncWithSpanAndDuration( + serviceName, spanName, bucket, timestampRange.startUUID, timestampRange.endUUID, start_duration, end_duration, + request.limit())); } else { - result.add(client.executeAsyncQuery(traceByServiceSpanBaseCql, - resultSet -> resultSet.getString(0), - serviceName, spanName, bucket, timestampRange.startUUID, timestampRange.endUUID)); + result.add(traceIDByServiceQueryExecutor.asyncWithSpan( + serviceName, spanName, bucket, timestampRange.startUUID, timestampRange.endUUID, request.limit())); } } } @@ -208,74 +196,9 @@ private Set retainTraceIdList(List>> comple @Override public List> getTraces(Set traceIds) throws IOException { - if (CollectionUtils.isEmpty(traceIds)) { - return Collections.emptyList(); - } - - String table = tableHelper.getTableForRead(ZipkinSpanRecord.INDEX_NAME); - return traceIds.stream().map(traceId -> - client.executeAsyncQuery("select * from " + table + " where " + ZipkinSpanRecord.TRACE_ID + " = ?", this::buildSpan, traceId) - ).map(CompletionStage::toCompletableFuture).map(CompletableFuture::join).collect(toList()); + return multipleTraceQueryExecutor.get(traceIds); } - private Span buildSpan(Row row) { - Span.Builder span = Span.newBuilder(); - span.traceId(row.getString(ZipkinSpanRecord.TRACE_ID)); - span.id(row.getString(ZipkinSpanRecord.SPAN_ID)); - span.parentId(row.getString(ZipkinSpanRecord.PARENT_ID)); - String kind = row.getString(ZipkinSpanRecord.KIND); - if (!StringUtil.isEmpty(kind)) { - span.kind(Span.Kind.valueOf(kind)); - } - span.timestamp(row.getLong(ZipkinSpanRecord.TIMESTAMP)); - span.duration(row.getLong(ZipkinSpanRecord.DURATION)); - span.name(row.getString(ZipkinSpanRecord.NAME)); - - if (row.getInt(ZipkinSpanRecord.DEBUG) > 0) { - span.debug(Boolean.TRUE); - } - if (row.getInt(ZipkinSpanRecord.SHARED) > 0) { - span.shared(Boolean.TRUE); - } - //Build localEndpoint - Endpoint.Builder localEndpoint = Endpoint.newBuilder(); - localEndpoint.serviceName(row.getString(ZipkinSpanRecord.LOCAL_ENDPOINT_SERVICE_NAME)); - if (!StringUtil.isEmpty(row.getString(ZipkinSpanRecord.LOCAL_ENDPOINT_IPV4))) { - localEndpoint.parseIp(row.getString(ZipkinSpanRecord.LOCAL_ENDPOINT_IPV4)); - } else { - localEndpoint.parseIp(row.getString(ZipkinSpanRecord.LOCAL_ENDPOINT_IPV6)); - } - localEndpoint.port(row.getInt(ZipkinSpanRecord.LOCAL_ENDPOINT_PORT)); - span.localEndpoint(localEndpoint.build()); - //Build remoteEndpoint - Endpoint.Builder remoteEndpoint = Endpoint.newBuilder(); - remoteEndpoint.serviceName(row.getString(ZipkinSpanRecord.REMOTE_ENDPOINT_SERVICE_NAME)); - if (!StringUtil.isEmpty(row.getString(ZipkinSpanRecord.REMOTE_ENDPOINT_IPV4))) { - remoteEndpoint.parseIp(row.getString(ZipkinSpanRecord.REMOTE_ENDPOINT_IPV4)); - } else { - remoteEndpoint.parseIp(row.getString(ZipkinSpanRecord.REMOTE_ENDPOINT_IPV6)); - } - remoteEndpoint.port(row.getInt(ZipkinSpanRecord.REMOTE_ENDPOINT_PORT)); - span.remoteEndpoint(remoteEndpoint.build()); - - //Build tags - String tagsString = row.getString(ZipkinSpanRecord.TAGS); - if (!StringUtil.isEmpty(tagsString)) { - JsonObject tagsJson = GSON.fromJson(tagsString, JsonObject.class); - for (Map.Entry tag : tagsJson.entrySet()) { - span.putTag(tag.getKey(), tag.getValue().getAsString()); - } - } - //Build annotation - String annotationString = row.getString(ZipkinSpanRecord.ANNOTATIONS); - if (!StringUtil.isEmpty(annotationString)) { - JsonObject annotationJson = GSON.fromJson(annotationString, JsonObject.class); - for (Map.Entry annotation : annotationJson.entrySet()) { - span.addAnnotation(Long.parseLong(annotation.getKey()), annotation.getValue().getAsString()); - } - } - return span.build(); - } static final class TimestampRange { long startMillis; diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/BaseQueryExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/BaseQueryExecutor.java new file mode 100644 index 00000000000..e9ff550a857 --- /dev/null +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/BaseQueryExecutor.java @@ -0,0 +1,146 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.storage.cassandra.dao.executor; + +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.Statement; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord; +import org.apache.skywalking.oap.server.library.util.StringUtil; +import zipkin.server.storage.cassandra.CassandraClient; +import zipkin.server.storage.cassandra.CassandraTableHelper; +import zipkin2.Endpoint; +import zipkin2.Span; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; +import java.util.function.Supplier; + +public abstract class BaseQueryExecutor { + public static final int NAME_QUERY_MAX_SIZE = Integer.MAX_VALUE; + private static final Gson GSON = new Gson(); + + private final CassandraClient client; + + public BaseQueryExecutor(CassandraClient client, CassandraTableHelper tableHelper) { + this.client = client; + } + + protected Query buildQuery(Supplier cql, CassandraClient.ResultHandler handler) { + return new Query<>(client, cql, handler); + } + + protected List executeSync(Query query, Object... params) { + return this.client.executeQuery(query.getStatement(), query.handler, params); + } + + public CompletionStage> executeAsync(Query query, Object... params) { + return this.client.executeAsyncQuery(query.getStatement(), query.handler, params); + } + + public CompletionStage> executeAsyncWithCustomizeStatement(Query query, Function statement) { + final PreparedStatement original = query.getStatement(); + final Statement stmt = statement.apply(original); + return this.client.executeAsyncQueryWithCustomBind(original, stmt, query.handler); + } + + protected Span buildSpan(Row row) { + Span.Builder span = Span.newBuilder(); + span.traceId(row.getString(ZipkinSpanRecord.TRACE_ID)); + span.id(row.getString(ZipkinSpanRecord.SPAN_ID)); + span.parentId(row.getString(ZipkinSpanRecord.PARENT_ID)); + String kind = row.getString(ZipkinSpanRecord.KIND); + if (!StringUtil.isEmpty(kind)) { + span.kind(Span.Kind.valueOf(kind)); + } + span.timestamp(row.getLong(ZipkinSpanRecord.TIMESTAMP)); + span.duration(row.getLong(ZipkinSpanRecord.DURATION)); + span.name(row.getString(ZipkinSpanRecord.NAME)); + + if (row.getInt(ZipkinSpanRecord.DEBUG) > 0) { + span.debug(Boolean.TRUE); + } + if (row.getInt(ZipkinSpanRecord.SHARED) > 0) { + span.shared(Boolean.TRUE); + } + //Build localEndpoint + Endpoint.Builder localEndpoint = Endpoint.newBuilder(); + localEndpoint.serviceName(row.getString(ZipkinSpanRecord.LOCAL_ENDPOINT_SERVICE_NAME)); + if (!StringUtil.isEmpty(row.getString(ZipkinSpanRecord.LOCAL_ENDPOINT_IPV4))) { + localEndpoint.parseIp(row.getString(ZipkinSpanRecord.LOCAL_ENDPOINT_IPV4)); + } else { + localEndpoint.parseIp(row.getString(ZipkinSpanRecord.LOCAL_ENDPOINT_IPV6)); + } + localEndpoint.port(row.getInt(ZipkinSpanRecord.LOCAL_ENDPOINT_PORT)); + span.localEndpoint(localEndpoint.build()); + //Build remoteEndpoint + Endpoint.Builder remoteEndpoint = Endpoint.newBuilder(); + remoteEndpoint.serviceName(row.getString(ZipkinSpanRecord.REMOTE_ENDPOINT_SERVICE_NAME)); + if (!StringUtil.isEmpty(row.getString(ZipkinSpanRecord.REMOTE_ENDPOINT_IPV4))) { + remoteEndpoint.parseIp(row.getString(ZipkinSpanRecord.REMOTE_ENDPOINT_IPV4)); + } else { + remoteEndpoint.parseIp(row.getString(ZipkinSpanRecord.REMOTE_ENDPOINT_IPV6)); + } + remoteEndpoint.port(row.getInt(ZipkinSpanRecord.REMOTE_ENDPOINT_PORT)); + span.remoteEndpoint(remoteEndpoint.build()); + + //Build tags + String tagsString = row.getString(ZipkinSpanRecord.TAGS); + if (!StringUtil.isEmpty(tagsString)) { + JsonObject tagsJson = GSON.fromJson(tagsString, JsonObject.class); + for (Map.Entry tag : tagsJson.entrySet()) { + span.putTag(tag.getKey(), tag.getValue().getAsString()); + } + } + //Build annotation + String annotationString = row.getString(ZipkinSpanRecord.ANNOTATIONS); + if (!StringUtil.isEmpty(annotationString)) { + JsonObject annotationJson = GSON.fromJson(annotationString, JsonObject.class); + for (Map.Entry annotation : annotationJson.entrySet()) { + span.addAnnotation(Long.parseLong(annotation.getKey()), annotation.getValue().getAsString()); + } + } + return span.build(); + } + + + protected class Query { + private final Supplier statementSupplier; + private volatile PreparedStatement statement; + private final CassandraClient.ResultHandler handler; + + public Query(CassandraClient client, Supplier query, CassandraClient.ResultHandler handler) { + this.statementSupplier = () -> client.prepare(query.get()); + this.handler = handler; + } + + public PreparedStatement getStatement() { + if (statement == null) { + synchronized (this) { + if (statement == null) { + statement = statementSupplier.get(); + } + } + } + return statement; + } + } + +} diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/MultipleTraceQueryExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/MultipleTraceQueryExecutor.java new file mode 100644 index 00000000000..0d68ff4804c --- /dev/null +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/MultipleTraceQueryExecutor.java @@ -0,0 +1,43 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.storage.cassandra.dao.executor; + +import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord; +import zipkin.server.storage.cassandra.CassandraClient; +import zipkin.server.storage.cassandra.CassandraTableHelper; +import zipkin2.Span; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import static java.util.stream.Collectors.toList; + +public class MultipleTraceQueryExecutor extends BaseQueryExecutor { + private final Query query; + public MultipleTraceQueryExecutor(CassandraClient client, CassandraTableHelper tableHelper) { + super(client, tableHelper); + this.query = buildQuery( + () -> "select * from " + tableHelper.getTableForRead(ZipkinSpanRecord.INDEX_NAME) + " where " + ZipkinSpanRecord.TRACE_ID + " = ?", + this::buildSpan + ); + } + + public List> get(Set traceIds) { + return traceIds.stream().map(s -> executeAsync(query, s)) + .map(CompletionStage::toCompletableFuture).map(CompletableFuture::join).collect(toList()); + } +} diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/RemoteServiceNameQueryExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/RemoteServiceNameQueryExecutor.java new file mode 100644 index 00000000000..5b80de7fa17 --- /dev/null +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/RemoteServiceNameQueryExecutor.java @@ -0,0 +1,40 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.storage.cassandra.dao.executor; + +import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceRelationTraffic; +import zipkin.server.storage.cassandra.CassandraClient; +import zipkin.server.storage.cassandra.CassandraTableHelper; + +import java.util.List; + +public class RemoteServiceNameQueryExecutor extends BaseQueryExecutor { + private final Query query; + + public RemoteServiceNameQueryExecutor(CassandraClient client, CassandraTableHelper tableHelper) { + super(client, tableHelper); + this.query = buildQuery( + () -> "select " + ZipkinServiceRelationTraffic.REMOTE_SERVICE_NAME + + " from " + tableHelper.getTableForRead(ZipkinServiceRelationTraffic.INDEX_NAME) + + " where " + ZipkinServiceRelationTraffic.SERVICE_NAME + " = ?" + + " limit " + NAME_QUERY_MAX_SIZE, + row -> row.getString(ZipkinServiceRelationTraffic.REMOTE_SERVICE_NAME) + ); + } + + public List get(String serviceName) { + return executeSync(query, serviceName); + } +} diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/ServiceNameQueryExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/ServiceNameQueryExecutor.java new file mode 100644 index 00000000000..00e6db80bf5 --- /dev/null +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/ServiceNameQueryExecutor.java @@ -0,0 +1,38 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.storage.cassandra.dao.executor; + +import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceTraffic; +import zipkin.server.storage.cassandra.CassandraClient; +import zipkin.server.storage.cassandra.CassandraTableHelper; + +import java.util.List; + +public class ServiceNameQueryExecutor extends BaseQueryExecutor { + private final Query query; + + public ServiceNameQueryExecutor(CassandraClient client, CassandraTableHelper tableHelper) { + super(client, tableHelper); + query = buildQuery( + () -> "select " + ZipkinServiceTraffic.SERVICE_NAME + " from " + + tableHelper.getTableForRead(ZipkinServiceTraffic.INDEX_NAME) + " limit " + NAME_QUERY_MAX_SIZE, + resultSet -> resultSet.getString(ZipkinServiceTraffic.SERVICE_NAME) + ); + } + + public List get() { + return executeSync(query); + } +} diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/SingleTraceQueryExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/SingleTraceQueryExecutor.java new file mode 100644 index 00000000000..52fe3b8f49d --- /dev/null +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/SingleTraceQueryExecutor.java @@ -0,0 +1,39 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.storage.cassandra.dao.executor; + +import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord; +import zipkin.server.storage.cassandra.CassandraClient; +import zipkin.server.storage.cassandra.CassandraTableHelper; +import zipkin2.Span; + +import java.util.List; + +public class SingleTraceQueryExecutor extends BaseQueryExecutor { + private final Query query; + public SingleTraceQueryExecutor(CassandraClient client, CassandraTableHelper tableHelper) { + super(client, tableHelper); + this.query = buildQuery( + () -> "select * from " + tableHelper.getTableForRead(ZipkinSpanRecord.INDEX_NAME) + + " where " + ZipkinSpanRecord.TRACE_ID + " = ?" + + " limit " + NAME_QUERY_MAX_SIZE, + this::buildSpan + ); + } + + public List get(String traceId) { + return executeSync(query, traceId); + } +} diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/SpanNameQueryExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/SpanNameQueryExecutor.java new file mode 100644 index 00000000000..f3ed7c1d8b0 --- /dev/null +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/SpanNameQueryExecutor.java @@ -0,0 +1,40 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.storage.cassandra.dao.executor; + +import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceSpanTraffic; +import zipkin.server.storage.cassandra.CassandraClient; +import zipkin.server.storage.cassandra.CassandraTableHelper; + +import java.util.List; + +public class SpanNameQueryExecutor extends BaseQueryExecutor { + private final Query query; + + public SpanNameQueryExecutor(CassandraClient client, CassandraTableHelper tableHelper) { + super(client, tableHelper); + this.query = buildQuery( + () -> "select " + ZipkinServiceSpanTraffic.SPAN_NAME + + " from " + tableHelper.getTableForRead(ZipkinServiceSpanTraffic.INDEX_NAME) + + " where " + ZipkinServiceSpanTraffic.SERVICE_NAME + " = ?" + + " limit " + NAME_QUERY_MAX_SIZE, + row -> row.getString(ZipkinServiceSpanTraffic.SPAN_NAME) + ); + } + + public List get(String serviceName) { + return executeSync(query, serviceName); + } +} diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByAnnotationQueryExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByAnnotationQueryExecutor.java new file mode 100644 index 00000000000..a3cf00c0fb5 --- /dev/null +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByAnnotationQueryExecutor.java @@ -0,0 +1,41 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.storage.cassandra.dao.executor; + +import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord; +import zipkin.server.storage.cassandra.CassandraClient; +import zipkin.server.storage.cassandra.CassandraTableHelper; + +import java.util.List; +import java.util.concurrent.CompletionStage; + +public class TraceIDByAnnotationQueryExecutor extends BaseQueryExecutor { + private final Query query; + public TraceIDByAnnotationQueryExecutor(CassandraClient client, CassandraTableHelper tableHelper) { + super(client, tableHelper); + this.query = buildQuery( + () -> "select " + ZipkinSpanRecord.TRACE_ID + + " from " + ZipkinSpanRecord.ADDITIONAL_QUERY_TABLE + + " where " + ZipkinSpanRecord.QUERY + " = ?" + + " and " + ZipkinSpanRecord.TIME_BUCKET + " >= ?" + + " and " + ZipkinSpanRecord.TIME_BUCKET + " <= ?", + row -> row.getString(ZipkinSpanRecord.TRACE_ID) + ); + } + + public CompletionStage> asyncGet(String query, long startTimeBucket, long endTimeBucket) { + return executeAsync(this.query, query, startTimeBucket, endTimeBucket); + } +} diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByServiceQueryExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByServiceQueryExecutor.java new file mode 100644 index 00000000000..264ff35b110 --- /dev/null +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByServiceQueryExecutor.java @@ -0,0 +1,90 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.storage.cassandra.dao.executor; + +import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; +import zipkin.server.storage.cassandra.CassandraClient; +import zipkin.server.storage.cassandra.CassandraTableHelper; +import zipkin.server.storage.cassandra.dao.CassandraTableExtension; + +import javax.annotation.CheckReturnValue; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletionStage; + +public class TraceIDByServiceQueryExecutor extends BaseQueryExecutor { + private final Query queryWithRemoteService; + private final Query queryWithSpan; + private final Query queryWithSpanAndDuration; + + public TraceIDByServiceQueryExecutor(CassandraClient client, CassandraTableHelper tableHelper) { + super(client, tableHelper); + String baseCql = "select trace_id from " + CassandraTableExtension.TABLE_TRACE_BY_SERVICE_SPAN + + " where service=? and span=? and bucket=? and ts>=? and ts<=? "; + this.queryWithRemoteService = buildQuery( + () -> "select trace_id from " + CassandraTableExtension.TABLE_TRACE_BY_SERVICE_REMOTE_SERVICE + + " where service=? and remote_service=? and bucket=? and ts>=? and ts<=?", + row -> row.getString(0) + ); + this.queryWithSpan = buildQuery( + () -> baseCql + " limit ?", + row -> row.getString(0) + ); + this.queryWithSpanAndDuration = buildQuery( + () -> baseCql + " and duration>=? and duration<=? limit ?", + row -> row.getString(0) + ); + } + + public CompletionStage> asyncWithRemoteService( + String serviceName, String remoteService, int bucket, UUID tsStart, UUID tsEnd) { + return executeAsync(queryWithRemoteService, serviceName, remoteService, bucket, tsStart, tsEnd); + } + + public CompletionStage> asyncWithSpan( + String serviceName, String spanName, int bucket, UUID tsStart, UUID tsEnd, int limit) { + return asyncSpan0(queryWithSpan, serviceName, spanName, bucket, tsStart, tsEnd, null, null, limit); + } + + public CompletionStage> asyncWithSpanAndDuration( + String serviceName, String spanName, int bucket, UUID tsStart, UUID tsEnd, long durationStart, long durationEnd, int limit) { + return asyncSpan0(queryWithSpanAndDuration, serviceName, spanName, bucket, tsStart, tsEnd, durationStart, durationEnd, limit); + } + + @SuppressWarnings("CheckReturnValue") + private CompletionStage> asyncSpan0( + Query query, String serviceName, String spanName, int bucket, UUID tsStart, UUID tsEnd, Long durationStart, Long durationEnd, int limit) { + return this.executeAsyncWithCustomizeStatement(query, stmt -> { + int i = 0; + final BoundStatementBuilder bound = stmt.boundStatementBuilder() + .setString(i++, serviceName) + .setString(i++, spanName) + .setInt(i++, bucket) + .setUuid(i++, tsStart) + .setUuid(i++, tsEnd); + + if (durationStart != null) { + bound.setLong(i++, durationStart) + .setLong(i++, durationEnd); + } + + bound.setInt(i, limit) + .setPageSize(limit); + + return bound.build(); + }); + } + +} diff --git a/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql b/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql index 508ced3c89f..7b1a699c2db 100644 --- a/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql +++ b/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql @@ -3,15 +3,12 @@ CREATE KEYSPACE IF NOT EXISTS zipkin2 AND durable_writes = false; CREATE TABLE IF NOT EXISTS zipkin_span ( - id text, - table_name text, trace_id text, span_id text, parent_id text, name text, duration BIGINT, kind text, - timestamp_millis BIGINT, TIMESTAMP BIGINT, local_endpoint_service_name text, local_endpoint_ipv4 text, @@ -25,7 +22,6 @@ CREATE TABLE IF NOT EXISTS zipkin_span ( tags text, debug INT, shared INT, - time_bucket BIGINT, uuid_unique text, PRIMARY KEY(trace_id, uuid_unique) ) @@ -88,7 +84,6 @@ CREATE TABLE IF NOT EXISTS zipkin_service_traffic ( AND comment = 'Secondary table for looking up all services'; CREATE TABLE IF NOT EXISTS zipkin_query ( - id text, trace_id text, query text, time_bucket BIGINT,