Skip to content

Commit

Permalink
Sync skywalking upstream version and reduce Cassandra storage size (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Nov 9, 2023
1 parent 726e377 commit 29a392f
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 104 deletions.
2 changes: 1 addition & 1 deletion skywalking
Submodule skywalking updated 156 files
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static void start() {
RunningMode.setMode(mode);

ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
ModuleManager manager = new ModuleManager();
ModuleManager manager = new ModuleManager("Zipkin Server");
try {
ApplicationConfiguration applicationConfiguration = configLoader.load();
manager.init(applicationConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class CassandraClient implements Client {
static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);

public static final String RECORD_UNIQUE_UUID_COLUMN = "uuid_unique";
public static final String ZIPKIN_SPAN_ANNOTATION_QUERY_COLUMN = "annotation_query";

private final CassandraConfig config;
private final DelegatedHealthChecker healthChecker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
package zipkin.server.storage.cassandra.dao;

import com.datastax.oss.driver.api.core.cql.Row;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.JDBCTableInstaller;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.TableHelper;
Expand All @@ -44,8 +44,10 @@
import java.util.stream.Stream;

import static org.apache.skywalking.oap.server.storage.plugin.jdbc.common.JDBCTableInstaller.ID_COLUMN;
import static zipkin2.internal.RecyclableBuffers.SHORT_STRING_LENGTH;

public class CassandraCqlExecutor {
private static final JsonObject EMPTY_JSON_OBJECT = new JsonObject();

protected <T extends StorageData> List<StorageData> getByIDs(CassandraClient client,
String modelName,
Expand Down Expand Up @@ -74,20 +76,6 @@ protected <T extends StorageData>CQLExecutor getInsertExecutor(Model model, T me
});
CQLExecutor sqlExecutor = buildInsertExecutor(
model, metrics, timeBucket, mainEntity, callback);
//build additional table cql
for (final SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension().getAdditionalTables().values()) {
Map<String, Object> additionalEntity = new HashMap<>();
additionalTable.getColumns().forEach(column -> {
additionalEntity.put(column.getColumnName().getName(), objectMap.get(column.getColumnName().getName()));
});

List<CQLExecutor> additionalSQLExecutors = buildAdditionalInsertExecutor(
model, additionalTable.getName(), additionalTable.getColumns(), metrics,
timeBucket, additionalEntity, callback
);
sqlExecutor.appendAdditionalCQLs(additionalSQLExecutors);
}

// extend the span tables used for querying
if (metrics instanceof ZipkinSpanRecord) {
sqlExecutor.appendAdditionalCQLs(CassandraTableExtension.buildExtensionsForSpan((ZipkinSpanRecord) metrics, callback));
Expand Down Expand Up @@ -125,64 +113,6 @@ protected <T extends StorageData> CQLExecutor getUpdateExecutor(Model model, T m
return new CQLExecutor(sqlBuilder.toString(), param, callback, null);
}

/**
 * Builds the INSERT CQL statements for one additional (secondary lookup) table of a model.
 * <p>
 * Exactly one column of {@code columns} may be List-typed; when it is, one CQLExecutor is
 * produced per element of that list, with the element substituted at the list column's
 * parameter position. Non-list column values are bound directly (complex objects are
 * serialized via {@link StorageDataComplexObject#toStorageData()}).
 *
 * @param model      storage model the table belongs to (currently unused beyond context)
 * @param tableName  name of the additional table to insert into
 * @param columns    column definitions of the additional table
 * @param metrics    the source record (unused here; kept for signature symmetry)
 * @param timeBucket time bucket of the record (unused here; kept for signature symmetry)
 * @param objectMap  column-name -> value map extracted from the record
 * @param callback   session-cache callback attached to every produced executor
 * @return one executor per list element, or a single executor when no list data exists;
 *         empty for the "zipkin_query" table when there is no list data to index
 */
private <T extends StorageData> List<CQLExecutor> buildAdditionalInsertExecutor(Model model, String tableName,
List<ModelColumn> columns,
T metrics,
long timeBucket,
Map<String, Object> objectMap,
SessionCacheCallback callback) {

List<CQLExecutor> sqlExecutors = new ArrayList<>();
List<String> columnNames = new ArrayList<>();
List<String> values = new ArrayList<>();
List<Object> param = new ArrayList<>();
final StringBuilder cqlBuilder = new StringBuilder("INSERT INTO ").append(tableName);

// 1-based position of the (single) list-typed column within the parameter list;
// 0 means no list column was seen. NOTE(review): raw List here — assumes at most
// one list-typed column per additional table; a second one would overwrite these.
int position = 0;
List valueList = new ArrayList();
for (int i = 0; i < columns.size(); i++) {
ModelColumn column = columns.get(i);
if (List.class.isAssignableFrom(column.getType())) {
valueList = (List) objectMap.get(column.getColumnName().getName());

columnNames.add(column.getColumnName().getStorageName());
values.add("?");
// placeholder; the real element is substituted per-row below via paramCopy.set(...)
param.add(null);

position = i + 1;
} else {
columnNames.add(column.getColumnName().getStorageName());
values.add("?");

Object value = objectMap.get(column.getColumnName().getName());
if (value instanceof StorageDataComplexObject) {
// complex objects are stored in their string storage form
param.add(((StorageDataComplexObject) value).toStorageData());
} else {
param.add(value);
}
}
}

cqlBuilder.append("(").append(columnNames.stream().collect(Collectors.joining(", "))).append(")")
.append(" VALUES (").append(values.stream().collect(Collectors.joining(", "))).append(")");
String sql = cqlBuilder.toString();
if (!CollectionUtils.isEmpty(valueList)) {
// fan out: one insert per list element, sharing all other bound values
for (Object object : valueList) {
List<Object> paramCopy = new ArrayList<>(param);
paramCopy.set(position - 1, object);
sqlExecutors.add(new CQLExecutor(sql, paramCopy, callback, null));
}
} else {
// zipkin_query rows exist only to index list data; without it there is nothing to insert
if ("zipkin_query".equals(tableName)) {
return sqlExecutors;
}
sqlExecutors.add(new CQLExecutor(sql, param, callback, null));
}

return sqlExecutors;
}

private <T extends StorageData> CQLExecutor buildInsertExecutor(Model model,
T metrics,
long timeBucket,
Expand All @@ -199,8 +129,9 @@ private <T extends StorageData> CQLExecutor buildInsertExecutor(Model model,
.map(ModelColumn::getColumnName)
.map(ColumnName::getStorageName))
.collect(Collectors.toList());
if (model.isRecord()) {
if (metrics instanceof ZipkinSpanRecord) {
columnNames.add(CassandraClient.RECORD_UNIQUE_UUID_COLUMN);
columnNames.add(CassandraClient.ZIPKIN_SPAN_ANNOTATION_QUERY_COLUMN);
}
cqlBuilder.append(columnNames.stream().collect(Collectors.joining(",", "(", ")")));
cqlBuilder.append(" VALUES ");
Expand All @@ -220,8 +151,9 @@ private <T extends StorageData> CQLExecutor buildInsertExecutor(Model model,
return it;
}))
.collect(Collectors.toList());
if (model.isRecord()) {
if (metrics instanceof ZipkinSpanRecord) {
params.add(UUID.randomUUID().toString());
params.add(annotationQuery((ZipkinSpanRecord) metrics));
}

return new CQLExecutor(cqlBuilder.toString(), params, onCompleteCallback, null);
Expand All @@ -242,4 +174,35 @@ private static String getModelTables(CassandraClient client, String modelName) {
final Model model = TableMetaInfo.get(modelName);
return TableHelper.getTableName(model);
}

/**
 * Builds the value stored in the {@code annotation_query} column: all annotation values,
 * tag keys, and {@code key=value} tag pairs of the span, joined by the '░' delimiter so a
 * SASI DelimiterAnalyzer can tokenize them for LIKE queries.
 *
 * @param span the span whose annotations and tags are indexed
 * @return the delimited index string, or {@code null} when there is nothing to index
 */
private String annotationQuery(ZipkinSpanRecord span) {
final JsonObject annotations = jsonCheck(span.getAnnotations());
final JsonObject spanTags = jsonCheck(span.getTags());
if (annotations.size() == 0 && spanTags.size() == 0) return null;

final char sep = '░'; // as very unlikely to be in the query
final StringBuilder indexed = new StringBuilder().append(sep);

for (Map.Entry<String, JsonElement> entry : annotations.entrySet()) {
final String value = entry.getValue().getAsString();
// overly long values are not worth indexing
if (value.length() <= SHORT_STRING_LENGTH) {
indexed.append(value).append(sep);
}
}

for (Map.Entry<String, JsonElement> entry : spanTags.entrySet()) {
final String value = entry.getValue().getAsString();
if (value.length() > SHORT_STRING_LENGTH) continue;

// index the key alone (search by key) and the full key=value pair
indexed.append(entry.getKey()).append(sep);
indexed.append(entry.getKey()).append('=').append(value).append(sep);
}
return indexed.length() == 1 ? null : indexed.toString();
}

/**
 * Null-safe accessor: returns the given JSON object, or the shared empty object when null.
 *
 * @param json a possibly-null JSON object
 * @return {@code json} itself, or {@code EMPTY_JSON_OBJECT} when {@code json} is null
 */
private JsonObject jsonCheck(JsonObject json) {
return json == null ? EMPTY_JSON_OBJECT : json;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public List<List<Span>> getTraces(QueryRequest request, Duration duration) throw
if (CollectionUtils.isNotEmpty(request.annotationQuery())) {
for (Map.Entry<String, String> entry : request.annotationQuery().entrySet()) {
completionTraceIds.add(traceIDByAnnotationQueryExecutor.asyncGet(
entry.getValue().isEmpty() ? entry.getKey() : entry.getKey() + "=" + entry.getValue(),
duration.getStartTimeBucket(), duration.getEndTimeBucket()
request.serviceName(), entry.getValue().isEmpty() ? entry.getKey() : entry.getKey() + "=" + entry.getValue(),
duration.getStartTimestamp() * 1000, duration.getEndTimestamp() * 1000, request.limit()
));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package zipkin.server.storage.cassandra.dao.executor;

import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import zipkin.server.storage.cassandra.CassandraClient;
import zipkin.server.storage.cassandra.CassandraTableHelper;

Expand All @@ -23,19 +24,28 @@

public class TraceIDByAnnotationQueryExecutor extends BaseQueryExecutor {
private final Query<String> query;
private final Query<String> queryWithService;
public TraceIDByAnnotationQueryExecutor(CassandraClient client, CassandraTableHelper tableHelper) {
super(client, tableHelper);
this.query = buildQuery(
() -> "select " + ZipkinSpanRecord.TRACE_ID +
" from " + ZipkinSpanRecord.ADDITIONAL_QUERY_TABLE +
" where " + ZipkinSpanRecord.QUERY + " = ?" +
" and " + ZipkinSpanRecord.TIME_BUCKET + " >= ?" +
" and " + ZipkinSpanRecord.TIME_BUCKET + " <= ?",
String querySuffix = "annotation_query LIKE ?"
+ " AND " + ZipkinSpanRecord.TIMESTAMP + ">=?"
+ " AND " + ZipkinSpanRecord.TIMESTAMP + "<=?"
+ " LIMIT ?"
+ " ALLOW FILTERING";

this.query = buildQuery(() -> "select trace_id from " + ZipkinSpanRecord.INDEX_NAME + " where " + querySuffix,
row -> row.getString(ZipkinSpanRecord.TRACE_ID)
);
this.queryWithService = buildQuery(() -> "select trace_id from " + ZipkinSpanRecord.INDEX_NAME + " where " +
ZipkinSpanRecord.LOCAL_ENDPOINT_SERVICE_NAME + " = ? and " + querySuffix,
row -> row.getString(ZipkinSpanRecord.TRACE_ID)
);
}

public CompletionStage<List<String>> asyncGet(String query, long startTimeBucket, long endTimeBucket) {
return executeAsync(this.query, query, startTimeBucket, endTimeBucket);
public CompletionStage<List<String>> asyncGet(String serviceName, String query, long startTimeBucket, long endTimeBucket, int size) {
if (StringUtil.isNotEmpty(serviceName)) {
return executeAsync(this.queryWithService, serviceName, query, startTimeBucket, endTimeBucket, size);
}
return executeAsync(this.query, query, startTimeBucket, endTimeBucket, size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ CREATE TABLE IF NOT EXISTS zipkin_span (
debug INT,
shared INT,
uuid_unique text,
annotation_query text,
PRIMARY KEY(trace_id, uuid_unique)
)
WITH CLUSTERING ORDER BY (uuid_unique DESC)
Expand All @@ -34,6 +35,15 @@ CREATE TABLE IF NOT EXISTS zipkin_span (
AND speculative_retry = '95percentile'
AND comment = 'Primary table for holding trace data';

CREATE CUSTOM INDEX IF NOT EXISTS ON zipkin_span (local_endpoint_service_name) USING 'org.apache.cassandra.index.sasi.SASIIndex'
WITH OPTIONS = {'mode': 'PREFIX'};
CREATE CUSTOM INDEX IF NOT EXISTS ON zipkin_span (annotation_query) USING 'org.apache.cassandra.index.sasi.SASIIndex'
WITH OPTIONS = {
'mode': 'PREFIX',
'analyzed': 'true',
'analyzer_class':'org.apache.cassandra.index.sasi.analyzer.DelimiterAnalyzer',
'delimiter': '░'};

CREATE TABLE IF NOT EXISTS zipkin_service_relation_traffic (
id text,
table_name text,
Expand Down Expand Up @@ -83,21 +93,6 @@ CREATE TABLE IF NOT EXISTS zipkin_service_traffic (
AND speculative_retry = '95percentile'
AND comment = 'Secondary table for looking up all services';

CREATE TABLE IF NOT EXISTS zipkin_query (
trace_id text,
query text,
time_bucket BIGINT,
PRIMARY KEY(query, time_bucket, trace_id)
)
WITH CLUSTERING ORDER BY (time_bucket DESC, trace_id DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}
AND default_time_to_live = 604800
AND gc_grace_seconds = 3600
AND read_repair_chance = 0
AND dclocal_read_repair_chance = 0
AND speculative_retry = '95percentile'
AND comment = 'Secondary table for looking up traces by annotation query';

CREATE TABLE IF NOT EXISTS tag_autocomplete (
id text,
table_name text,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ITCassandraStorage {
@RegisterExtension
CassandraExtension cassandra = new CassandraExtension();

private final ModuleManager moduleManager = new ModuleManager();
private final ModuleManager moduleManager = new ModuleManager("Zipkin Server");
private SpanForwardService forward;
private ITagAutoCompleteQueryDAO tagAutoCompleteQueryDAO;
private IZipkinQueryDAO zipkinQueryDAO;
Expand Down Expand Up @@ -92,6 +92,7 @@ public void test() throws InterruptedException, IOException {
// search traces
final QueryRequest query = QueryRequest.newBuilder()
.lookback(86400000L)
.serviceName("frontend")
.endTs(System.currentTimeMillis())
.minDuration(1000L)
.annotationQuery(Collections.singletonMap("http.path", "/api"))
Expand Down

0 comments on commit 29a392f

Please sign in to comment.