From 137ab8dc9f0bc0e961e5148e6313c055d9097b48 Mon Sep 17 00:00:00 2001
From: Mrproliu <741550557@qq.com>
Date: Wed, 25 Oct 2023 12:27:24 +0800
Subject: [PATCH] Add dependency module related table create and query
---
zipkin-server/server-starter/pom.xml | 5 ++
.../src/main/resources/zipkin-schemas.cql | 15 ++++
zipkin-server/zipkin-storage-ext/pom.xml | 1 +
.../pom.xml | 22 +++++
.../ZipkinDependencyCassandraQueryDAO.java | 56 ++++++++++++
...kinDependencyCassandraStorageProvider.java | 90 +++++++++++++++++++
...g.oap.server.library.module.ModuleProvider | 19 ++++
7 files changed, 208 insertions(+)
create mode 100644 zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/pom.xml
create mode 100644 zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/java/zipkin/server/dependency/storage/cassandra/ZipkinDependencyCassandraQueryDAO.java
create mode 100644 zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/java/zipkin/server/dependency/storage/cassandra/ZipkinDependencyCassandraStorageProvider.java
create mode 100644 zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
diff --git a/zipkin-server/server-starter/pom.xml b/zipkin-server/server-starter/pom.xml
index 4bcb4288ff9..3aa26c46441 100644
--- a/zipkin-server/server-starter/pom.xml
+++ b/zipkin-server/server-starter/pom.xml
@@ -77,6 +77,11 @@
zipkin-dependency-storage-banyandb
${project.version}
+
+ io.zipkin
+ zipkin-dependency-storage-cassandra
+ ${project.version}
+
diff --git a/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql b/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql
index b3074ee7fa5..508ced3c89f 100644
--- a/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql
+++ b/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql
@@ -159,3 +159,18 @@ CREATE TABLE IF NOT EXISTS zipkin_trace_by_service_remote_service (
AND dclocal_read_repair_chance = 0
AND speculative_retry = '95percentile'
AND comment = 'Secondary table for looking up a trace by a remote service. bucket column adds time bucketing to the partition key, values are microseconds rounded to a pre-configured interval (typically one day). ts column is start timestamp of the span as time-uuid, truncated to millisecond precision.';
+
+CREATE TABLE IF NOT EXISTS zipkin_dependency (
+ analyze_day date,
+ parent text,
+ child text,
+ error_count bigint,
+ call_count bigint,
+ PRIMARY KEY (analyze_day, parent, child)
+)
+ WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'}
+ AND default_time_to_live = 259200
+ AND gc_grace_seconds = 3600
+ AND read_repair_chance = 0
+ AND dclocal_read_repair_chance = 0
+ AND comment = 'Holder for each days generation of zipkin2.DependencyLink';
\ No newline at end of file
diff --git a/zipkin-server/zipkin-storage-ext/pom.xml b/zipkin-server/zipkin-storage-ext/pom.xml
index 66c0033b42c..2a28a6d637e 100644
--- a/zipkin-server/zipkin-storage-ext/pom.xml
+++ b/zipkin-server/zipkin-storage-ext/pom.xml
@@ -17,6 +17,7 @@
zipkin-dependency-storage-jdbc
zipkin-dependency-storage-elasticsearch
zipkin-dependency-storage-banyandb
+ zipkin-dependency-storage-cassandra
diff --git a/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/pom.xml b/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/pom.xml
new file mode 100644
index 00000000000..608e090ce3d
--- /dev/null
+++ b/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/pom.xml
@@ -0,0 +1,22 @@
+
+
+
+ zipkin-storage-ext
+ io.zipkin
+ 2.24.4-SNAPSHOT
+
+ 4.0.0
+
+ zipkin-dependency-storage-cassandra
+ Zipkin Dependency Cassandra Extension
+
+
+
+ io.zipkin
+ storage-cassandra
+ ${project.version}
+
+
+
\ No newline at end of file
diff --git a/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/java/zipkin/server/dependency/storage/cassandra/ZipkinDependencyCassandraQueryDAO.java b/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/java/zipkin/server/dependency/storage/cassandra/ZipkinDependencyCassandraQueryDAO.java
new file mode 100644
index 00000000000..2bade9c37b6
--- /dev/null
+++ b/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/java/zipkin/server/dependency/storage/cassandra/ZipkinDependencyCassandraQueryDAO.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.dependency.storage.cassandra;
+
+import org.apache.skywalking.zipkin.dependency.entity.ZipkinDependency;
+import zipkin.server.dependency.IZipkinDependencyQueryDAO;
+import zipkin.server.storage.cassandra.CassandraClient;
+import zipkin2.DependencyLink;
+import zipkin2.internal.DateUtil;
+import zipkin2.internal.DependencyLinker;
+import zipkin2.internal.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ZipkinDependencyCassandraQueryDAO implements IZipkinDependencyQueryDAO {
+ private CassandraClient client;
+ @Override
+ public List getDependencies(long endTs, long lookback) throws IOException {
+ return DependencyLinker.merge(client.executeQuery("SELECT parent,child,call_count,error_count from " + ZipkinDependency.INDEX_NAME +
+ " where " + ZipkinDependency.DAY + " in ? ", r -> DependencyLink.newBuilder()
+ .parent(r.getString(ZipkinDependency.PARENT))
+ .child(r.getString(ZipkinDependency.CHILD))
+ .callCount(r.getLong(ZipkinDependency.CALL_COUNT))
+ .errorCount(r.getLong(ZipkinDependency.ERROR_COUNT)).build(), getDays(endTs, lookback)));
+ }
+
+ public void setClient(CassandraClient client) {
+ this.client = client;
+ }
+
+ List getDays(long endTs, @Nullable Long lookback) {
+ List result = new ArrayList<>();
+ for (long epochMillis : DateUtil.epochDays(endTs, lookback)) {
+ result.add(Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate());
+ }
+ return result;
+ }
+
+}
diff --git a/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/java/zipkin/server/dependency/storage/cassandra/ZipkinDependencyCassandraStorageProvider.java b/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/java/zipkin/server/dependency/storage/cassandra/ZipkinDependencyCassandraStorageProvider.java
new file mode 100644
index 00000000000..f5d3ab56cad
--- /dev/null
+++ b/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/java/zipkin/server/dependency/storage/cassandra/ZipkinDependencyCassandraStorageProvider.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.dependency.storage.cassandra;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import zipkin.server.dependency.IZipkinDependencyQueryDAO;
+import zipkin.server.dependency.ZipkinDependencyModule;
+import zipkin.server.storage.cassandra.CassandraClient;
+import zipkin.server.storage.cassandra.CassandraConfig;
+import zipkin.server.storage.cassandra.CassandraProvider;
+
+import java.lang.reflect.Field;
+
+public class ZipkinDependencyCassandraStorageProvider extends ModuleProvider {
+ private CassandraConfig config;
+ private ZipkinDependencyCassandraQueryDAO queryDAO;
+
+ @Override
+ public String name() {
+ return "cassandra";
+ }
+
+ @Override
+ public Class extends ModuleDefine> module() {
+ return ZipkinDependencyModule.class;
+ }
+
+ @Override
+ public ConfigCreator extends ModuleConfig> newConfigCreator() {
+ return new ConfigCreator() {
+
+ @Override
+ public Class type() {
+ return CassandraConfig.class;
+ }
+
+ @Override
+ public void onInitialized(CassandraConfig initialized) {
+ config = initialized;
+ }
+ };
+ }
+
+ @Override
+ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+ this.queryDAO = new ZipkinDependencyCassandraQueryDAO();
+ this.registerServiceImplementation(IZipkinDependencyQueryDAO.class, queryDAO);
+ }
+
+ @Override
+ public void start() throws ServiceNotProvidedException, ModuleStartException {
+ }
+
+ @Override
+ public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+ CassandraProvider provider =
+ (CassandraProvider) getManager().find(StorageModule.NAME).provider();
+ try {
+ Field field = CassandraProvider.class.getDeclaredField("client");
+ field.setAccessible(true);
+ CassandraClient client = (CassandraClient) field.get(provider);
+ queryDAO.setClient(client);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new ModuleStartException("Failed to get CassandraStorageClient.", e);
+ }
+ }
+
+ @Override
+ public String[] requiredModules() {
+ return new String[] {CoreModule.NAME, StorageModule.NAME};
+ }
+}
diff --git a/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 00000000000..fd4f7427cc9
--- /dev/null
+++ b/zipkin-server/zipkin-storage-ext/zipkin-dependency-storage-cassandra/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+zipkin.server.dependency.storage.cassandra.ZipkinDependencyCassandraStorageProvider
\ No newline at end of file