diff --git a/skywalking b/skywalking index 4621e5191fc..7b01a4ab91e 160000 --- a/skywalking +++ b/skywalking @@ -1 +1 @@ -Subproject commit 4621e5191fcc2fe8271a3049b788fd33c694f703 +Subproject commit 7b01a4ab91eb91f4f7786928d831207a71393ad6 diff --git a/zipkin-server/server-starter/src/main/java/zipkin/server/ZipkinServerBootstrap.java b/zipkin-server/server-starter/src/main/java/zipkin/server/ZipkinServerBootstrap.java index add0c962f01..921263e0cf6 100644 --- a/zipkin-server/server-starter/src/main/java/zipkin/server/ZipkinServerBootstrap.java +++ b/zipkin-server/server-starter/src/main/java/zipkin/server/ZipkinServerBootstrap.java @@ -34,7 +34,7 @@ public static void start() { RunningMode.setMode(mode); ApplicationConfigLoader configLoader = new ApplicationConfigLoader(); - ModuleManager manager = new ModuleManager(); + ModuleManager manager = new ModuleManager("Zipkin Server"); try { ApplicationConfiguration applicationConfiguration = configLoader.load(); manager.init(applicationConfiguration); diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraClient.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraClient.java index 6cc2cd1738d..1a97db76b5e 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraClient.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CassandraClient.java @@ -52,6 +52,7 @@ public class CassandraClient implements Client { static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class); public static final String RECORD_UNIQUE_UUID_COLUMN = "uuid_unique"; + public static final String ZIPKIN_SPAN_ANNOTATION_QUERY_COLUMN = "annotation_query"; private final CassandraConfig config; private final DelegatedHealthChecker healthChecker; diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraCqlExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraCqlExecutor.java index 041a167677c..88786026392 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraCqlExecutor.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraCqlExecutor.java @@ -15,18 +15,18 @@ package zipkin.server.storage.cassandra.dao; import com.datastax.oss.driver.api.core.cql.Row; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.storage.model.ColumnName; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; -import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension; import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject; import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord; -import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo; import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.JDBCTableInstaller; import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.TableHelper; @@ -44,8 +44,10 @@ import java.util.stream.Stream; import static org.apache.skywalking.oap.server.storage.plugin.jdbc.common.JDBCTableInstaller.ID_COLUMN; +import static zipkin2.internal.RecyclableBuffers.SHORT_STRING_LENGTH; public class CassandraCqlExecutor { + private static final JsonObject EMPTY_JSON_OBJECT = new JsonObject(); protected List getByIDs(CassandraClient client, String modelName, @@ -74,20 +76,6 @@ protected CQLExecutor getInsertExecutor(Model model, T me }); CQLExecutor sqlExecutor = buildInsertExecutor( model, metrics, timeBucket, mainEntity, callback); - //build additional table cql - for (final SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension().getAdditionalTables().values()) { - Map additionalEntity = new HashMap<>(); - additionalTable.getColumns().forEach(column -> { - additionalEntity.put(column.getColumnName().getName(), objectMap.get(column.getColumnName().getName())); - }); - - List additionalSQLExecutors = buildAdditionalInsertExecutor( - model, additionalTable.getName(), additionalTable.getColumns(), metrics, - timeBucket, additionalEntity, callback - ); - sqlExecutor.appendAdditionalCQLs(additionalSQLExecutors); - } - // extension the span tables for query if (metrics instanceof ZipkinSpanRecord) { sqlExecutor.appendAdditionalCQLs(CassandraTableExtension.buildExtensionsForSpan((ZipkinSpanRecord) metrics, callback)); @@ -125,64 +113,6 @@ protected CQLExecutor getUpdateExecutor(Model model, T m return new CQLExecutor(sqlBuilder.toString(), param, callback, null); } - private List buildAdditionalInsertExecutor(Model model, String tableName, - List columns, - T metrics, - long timeBucket, - Map objectMap, - SessionCacheCallback callback) { - - List sqlExecutors = new ArrayList<>(); - List columnNames = new ArrayList<>(); - List values = new ArrayList<>(); - List param = new ArrayList<>(); - final StringBuilder cqlBuilder = new StringBuilder("INSERT INTO ").append(tableName); - - int position = 0; - List valueList = new ArrayList(); - for (int i = 0; i < columns.size(); i++) { - ModelColumn column = columns.get(i); - if (List.class.isAssignableFrom(column.getType())) { - valueList = (List) objectMap.get(column.getColumnName().getName()); - - columnNames.add(column.getColumnName().getStorageName()); - values.add("?"); - param.add(null); - - position = i + 1; - } else { - columnNames.add(column.getColumnName().getStorageName()); - values.add("?"); - - Object value = objectMap.get(column.getColumnName().getName()); - if (value instanceof StorageDataComplexObject) { - param.add(((StorageDataComplexObject) value).toStorageData()); - } else { - param.add(value); - } - } - } - - cqlBuilder.append("(").append(columnNames.stream().collect(Collectors.joining(", "))).append(")") - .append(" VALUES (").append(values.stream().collect(Collectors.joining(", "))).append(")"); - String sql = cqlBuilder.toString(); - if (!CollectionUtils.isEmpty(valueList)) { - for (Object object : valueList) { - List paramCopy = new ArrayList<>(param); - paramCopy.set(position - 1, object); - sqlExecutors.add(new CQLExecutor(sql, paramCopy, callback, null)); - } - } else { - // if not query data, then ignore the data insert - if ("zipkin_query".equals(tableName)) { - return sqlExecutors; - } - sqlExecutors.add(new CQLExecutor(sql, param, callback, null)); - } - - return sqlExecutors; - } - private CQLExecutor buildInsertExecutor(Model model, T metrics, long timeBucket, @@ -199,8 +129,9 @@ private CQLExecutor buildInsertExecutor(Model model, .map(ModelColumn::getColumnName) .map(ColumnName::getStorageName)) .collect(Collectors.toList()); - if (model.isRecord()) { + if (metrics instanceof ZipkinSpanRecord) { columnNames.add(CassandraClient.RECORD_UNIQUE_UUID_COLUMN); + columnNames.add(CassandraClient.ZIPKIN_SPAN_ANNOTATION_QUERY_COLUMN); } cqlBuilder.append(columnNames.stream().collect(Collectors.joining(",", "(", ")"))); cqlBuilder.append(" VALUES "); @@ -220,8 +151,9 @@ private CQLExecutor buildInsertExecutor(Model model, return it; })) .collect(Collectors.toList()); - if (model.isRecord()) { + if (metrics instanceof ZipkinSpanRecord) { params.add(UUID.randomUUID().toString()); + params.add(annotationQuery((ZipkinSpanRecord) metrics)); } return new CQLExecutor(cqlBuilder.toString(), params, onCompleteCallback, null); @@ -242,4 +174,35 @@ private static String getModelTables(CassandraClient client, String modelName) { final Model model = TableMetaInfo.get(modelName); return TableHelper.getTableName(model); } + + private String annotationQuery(ZipkinSpanRecord span) { + final JsonObject annotation = jsonCheck(span.getAnnotations()); + final JsonObject tags = jsonCheck(span.getTags()); + if (annotation.size() == 0 && tags.size() == 0) return null; + + char delimiter = '░'; // as very unlikely to be in the query + StringBuilder result = new StringBuilder().append(delimiter); + for (Map.Entry annotationEntry : annotation.entrySet()) { + final String annotationValue = annotationEntry.getValue().getAsString(); + if (annotationValue.length() > SHORT_STRING_LENGTH) continue; + + result.append(annotationValue).append(delimiter); + } + + for (Map.Entry tagEntry : tags.entrySet()) { + final String tagValue = tagEntry.getValue().getAsString(); + if (tagValue.length() > SHORT_STRING_LENGTH) continue; + + result.append(tagEntry.getKey()).append(delimiter); // search is possible by key alone + result.append(tagEntry.getKey()).append('=').append(tagValue).append(delimiter); + } + return result.length() == 1 ? null : result.toString(); + } + + private JsonObject jsonCheck(JsonObject json) { + if (json == null) { + json = EMPTY_JSON_OBJECT; + } + return json; + } } diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraZipkinQueryDAO.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraZipkinQueryDAO.java index f27018aa2a7..a0714a13e6c 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraZipkinQueryDAO.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/CassandraZipkinQueryDAO.java @@ -109,8 +109,8 @@ public List> getTraces(QueryRequest request, Duration duration) throw if (CollectionUtils.isNotEmpty(request.annotationQuery())) { for (Map.Entry entry : request.annotationQuery().entrySet()) { completionTraceIds.add(traceIDByAnnotationQueryExecutor.asyncGet( - entry.getValue().isEmpty() ? entry.getKey() : entry.getKey() + "=" + entry.getValue(), - duration.getStartTimeBucket(), duration.getEndTimeBucket() + request.serviceName(), entry.getValue().isEmpty() ? entry.getKey() : entry.getKey() + "=" + entry.getValue(), + duration.getStartTimestamp() * 1000, duration.getEndTimestamp() * 1000, request.limit() )); } } diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByAnnotationQueryExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByAnnotationQueryExecutor.java index a3cf00c0fb5..9463d144e62 100644 --- a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByAnnotationQueryExecutor.java +++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/dao/executor/TraceIDByAnnotationQueryExecutor.java @@ -15,6 +15,7 @@ package zipkin.server.storage.cassandra.dao.executor; import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord; +import org.apache.skywalking.oap.server.library.util.StringUtil; import zipkin.server.storage.cassandra.CassandraClient; import zipkin.server.storage.cassandra.CassandraTableHelper; @@ -23,19 +24,28 @@ public class TraceIDByAnnotationQueryExecutor extends BaseQueryExecutor { private final Query query; + private final Query queryWithService; public TraceIDByAnnotationQueryExecutor(CassandraClient client, CassandraTableHelper tableHelper) { super(client, tableHelper); - this.query = buildQuery( - () -> "select " + ZipkinSpanRecord.TRACE_ID + - " from " + ZipkinSpanRecord.ADDITIONAL_QUERY_TABLE + - " where " + ZipkinSpanRecord.QUERY + " = ?" + - " and " + ZipkinSpanRecord.TIME_BUCKET + " >= ?" + - " and " + ZipkinSpanRecord.TIME_BUCKET + " <= ?", + String querySuffix = "annotation_query LIKE ?" + + " AND " + ZipkinSpanRecord.TIMESTAMP + ">=?" + + " AND " + ZipkinSpanRecord.TIMESTAMP + "<=?" + + " LIMIT ?" + + " ALLOW FILTERING"; + + this.query = buildQuery(() -> "select trace_id from " + ZipkinSpanRecord.INDEX_NAME + " where " + querySuffix, + row -> row.getString(ZipkinSpanRecord.TRACE_ID) + ); + this.queryWithService = buildQuery(() -> "select trace_id from " + ZipkinSpanRecord.INDEX_NAME + " where " + + ZipkinSpanRecord.LOCAL_ENDPOINT_SERVICE_NAME + " = ? and " + querySuffix, row -> row.getString(ZipkinSpanRecord.TRACE_ID) ); } - public CompletionStage> asyncGet(String query, long startTimeBucket, long endTimeBucket) { - return executeAsync(this.query, query, startTimeBucket, endTimeBucket); + public CompletionStage> asyncGet(String serviceName, String query, long startTimeBucket, long endTimeBucket, int size) { + if (StringUtil.isNotEmpty(serviceName)) { + return executeAsync(this.queryWithService, serviceName, query, startTimeBucket, endTimeBucket, size); + } + return executeAsync(this.query, query, startTimeBucket, endTimeBucket, size); } } diff --git a/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql b/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql index 7b1a699c2db..b1c8d8cdf8e 100644 --- a/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql +++ b/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql @@ -23,6 +23,7 @@ CREATE TABLE IF NOT EXISTS zipkin_span ( debug INT, shared INT, uuid_unique text, + annotation_query text, PRIMARY KEY(trace_id, uuid_unique) ) WITH CLUSTERING ORDER BY (uuid_unique DESC) @@ -34,6 +35,15 @@ CREATE TABLE IF NOT EXISTS zipkin_span ( AND speculative_retry = '95percentile' AND comment = 'Primary table for holding trace data'; +CREATE CUSTOM INDEX IF NOT EXISTS ON zipkin_span (local_endpoint_service_name) USING 'org.apache.cassandra.index.sasi.SASIIndex' + WITH OPTIONS = {'mode': 'PREFIX'}; +CREATE CUSTOM INDEX IF NOT EXISTS ON zipkin_span (annotation_query) USING 'org.apache.cassandra.index.sasi.SASIIndex' + WITH OPTIONS = { + 'mode': 'PREFIX', + 'analyzed': 'true', + 'analyzer_class':'org.apache.cassandra.index.sasi.analyzer.DelimiterAnalyzer', + 'delimiter': '░'}; + CREATE TABLE IF NOT EXISTS zipkin_service_relation_traffic ( id text, table_name text, @@ -83,21 +93,6 @@ CREATE TABLE IF NOT EXISTS zipkin_service_traffic ( AND speculative_retry = '95percentile' AND comment = 'Secondary table for looking up all services'; -CREATE TABLE IF NOT EXISTS zipkin_query ( - trace_id text, - query text, - time_bucket BIGINT, - PRIMARY KEY(query, time_bucket, trace_id) -) - WITH CLUSTERING ORDER BY (time_bucket DESC, trace_id DESC) - AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'} - AND default_time_to_live = 604800 - AND gc_grace_seconds = 3600 - AND read_repair_chance = 0 - AND dclocal_read_repair_chance = 0 - AND speculative_retry = '95percentile' - AND comment = 'Secondary table for looking up traces by annotation query'; - CREATE TABLE IF NOT EXISTS tag_autocomplete ( id text, table_name text, diff --git a/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java b/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java index b33ab735712..8930060e89c 100644 --- a/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java +++ b/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ITCassandraStorage.java @@ -53,7 +53,7 @@ public class ITCassandraStorage { @RegisterExtension CassandraExtension cassandra = new CassandraExtension(); - private final ModuleManager moduleManager = new ModuleManager(); + private final ModuleManager moduleManager = new ModuleManager("Zipkin Server"); private SpanForwardService forward; private ITagAutoCompleteQueryDAO tagAutoCompleteQueryDAO; private IZipkinQueryDAO zipkinQueryDAO; @@ -92,6 +92,7 @@ public void test() throws InterruptedException, IOException { // search traces final QueryRequest query = QueryRequest.newBuilder() .lookback(86400000L) + .serviceName("frontend") .endTs(System.currentTimeMillis()) .minDuration(1000L) .annotationQuery(Collections.singletonMap("http.path", "/api"))