result = new ArrayList<>();
+ for (String contactPoint : contactPoints.split(",", 100)) {
+ HostAndPort parsed = HostAndPort.fromString(contactPoint, 9042);
+ result.add(new InetSocketAddress(parsed.getHost(), parsed.getPort()));
+ }
+ return result;
+ }
+}
diff --git a/zipkin-server/storage-cassandra/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/zipkin-server/storage-cassandra/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100755
index 00000000000..38538a46d12
--- /dev/null
+++ b/zipkin-server/storage-cassandra/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+zipkin.server.storage.cassandra.CassandraProvider
\ No newline at end of file
diff --git a/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql b/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql
new file mode 100644
index 00000000000..508ced3c89f
--- /dev/null
+++ b/zipkin-server/storage-cassandra/src/main/resources/zipkin-schemas.cql
@@ -0,0 +1,176 @@
+CREATE KEYSPACE IF NOT EXISTS zipkin2
+ WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
+ AND durable_writes = false;
+
+CREATE TABLE IF NOT EXISTS zipkin_span (
+ id text,
+ table_name text,
+ trace_id text,
+ span_id text,
+ parent_id text,
+ name text,
+ duration BIGINT,
+ kind text,
+ timestamp_millis BIGINT,
+ TIMESTAMP BIGINT,
+ local_endpoint_service_name text,
+ local_endpoint_ipv4 text,
+ local_endpoint_ipv6 text,
+ local_endpoint_port INT,
+ remote_endpoint_service_name text,
+ remote_endpoint_ipv4 text,
+ remote_endpoint_ipv6 text,
+ remote_endpoint_port INT,
+ annotations text,
+ tags text,
+ debug INT,
+ shared INT,
+ time_bucket BIGINT,
+ uuid_unique text,
+ PRIMARY KEY(trace_id, uuid_unique)
+)
+ WITH CLUSTERING ORDER BY (uuid_unique DESC)
+ AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}
+ AND default_time_to_live = 604800
+ AND gc_grace_seconds = 3600
+ AND read_repair_chance = 0
+ AND dclocal_read_repair_chance = 0
+ AND speculative_retry = '95percentile'
+ AND comment = 'Primary table for holding trace data';
+
+CREATE TABLE IF NOT EXISTS zipkin_service_relation_traffic (
+ id text,
+ table_name text,
+ service_name text,
+ remote_service_name text,
+ time_bucket BIGINT,
+ PRIMARY KEY(service_name, remote_service_name)
+)
+ WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'}
+ AND caching = {'rows_per_partition': 'ALL'}
+ AND default_time_to_live = 259200
+ AND gc_grace_seconds = 3600
+ AND read_repair_chance = 0
+ AND dclocal_read_repair_chance = 0
+ AND speculative_retry = '95percentile'
+ AND comment = 'Secondary table for looking up remote service names by a service name.';
+
+CREATE TABLE IF NOT EXISTS zipkin_service_span_traffic (
+ id text,
+ table_name text,
+ service_name text,
+ span_name text,
+ time_bucket BIGINT,
+ PRIMARY KEY(service_name, span_name)
+)
+ WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'}
+ AND caching = {'rows_per_partition': 'ALL'}
+ AND default_time_to_live = 604800
+ AND gc_grace_seconds = 3600
+ AND read_repair_chance = 0
+ AND dclocal_read_repair_chance = 0
+ AND speculative_retry = '95percentile'
+ AND comment = 'Secondary table for looking up span names by a service name.';
+
+CREATE TABLE IF NOT EXISTS zipkin_service_traffic (
+ id text,
+ table_name text,
+ service_name text,
+ time_bucket BIGINT,
+ PRIMARY KEY(service_name, time_bucket)
+)
+ WITH compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}
+ AND default_time_to_live = 604800
+ AND gc_grace_seconds = 3600
+ AND read_repair_chance = 0
+ AND dclocal_read_repair_chance = 0
+ AND speculative_retry = '95percentile'
+ AND comment = 'Secondary table for looking up all services';
+
+CREATE TABLE IF NOT EXISTS zipkin_query (
+ id text,
+ trace_id text,
+ query text,
+ time_bucket BIGINT,
+ PRIMARY KEY(query, time_bucket, trace_id)
+)
+ WITH CLUSTERING ORDER BY (time_bucket DESC, trace_id DESC)
+ AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}
+ AND default_time_to_live = 604800
+ AND gc_grace_seconds = 3600
+ AND read_repair_chance = 0
+ AND dclocal_read_repair_chance = 0
+ AND speculative_retry = '95percentile'
+ AND comment = 'Secondary table for looking up traces by annotation query';
+
+CREATE TABLE IF NOT EXISTS tag_autocomplete (
+ id text,
+ table_name text,
+ tag_key text,
+ tag_value text,
+ tag_type text,
+ time_bucket BIGINT,
+ PRIMARY KEY(tag_key, time_bucket, tag_value)
+)
+ WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'}
+ AND caching = {'rows_per_partition': 'ALL'}
+ AND default_time_to_live = 259200
+ AND gc_grace_seconds = 3600
+ AND read_repair_chance = 0
+ AND dclocal_read_repair_chance = 0
+ AND speculative_retry = '95percentile'
+ AND comment = 'Secondary table for looking up span tag values for auto-complete purposes.';
+
+
+CREATE TABLE IF NOT EXISTS zipkin_trace_by_service_span (
+ service text, //-- service name
+ span text, //-- span name, or blank for queries without span name
+ bucket int, //-- time bucket, calculated as ts/interval (in microseconds), for some pre-configured interval like 1 day.
+ ts timeuuid, //-- start timestamp of the span, truncated to millisecond precision
+ trace_id text, //-- trace ID
+ duration bigint, //-- span duration, in milliseconds
+ PRIMARY KEY ((service, span, bucket), ts)
+)
+ WITH CLUSTERING ORDER BY (ts DESC)
+ AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}
+ AND default_time_to_live = 259200
+ AND gc_grace_seconds = 3600
+ AND read_repair_chance = 0
+ AND dclocal_read_repair_chance = 0
+ AND speculative_retry = '95percentile'
+ AND comment = 'Secondary table for looking up a trace by a service, or service and span. span column may be blank (when only looking up by service). bucket column adds time bucketing to the partition key, values are microseconds rounded to a pre-configured interval (typically one day). ts column is start timestamp of the span as time-uuid, truncated to millisecond precision. duration column is span duration, rounded up to tens of milliseconds (or hundredths of seconds)';
+
+CREATE CUSTOM INDEX IF NOT EXISTS ON zipkin_trace_by_service_span (duration) USING 'org.apache.cassandra.index.sasi.SASIIndex'
+ WITH OPTIONS = {'mode': 'PREFIX'};
+
+CREATE TABLE IF NOT EXISTS zipkin_trace_by_service_remote_service (
+ service text, //-- service name
+ remote_service text, //-- remote servie name
+ bucket int, //-- time bucket, calculated as ts/interval (in microseconds), for some pre-configured interval like 1 day.
+ ts timeuuid, //-- start timestamp of the span, truncated to millisecond precision
+ trace_id text, //-- trace ID
+ PRIMARY KEY ((service, remote_service, bucket), ts)
+)
+ WITH CLUSTERING ORDER BY (ts DESC)
+ AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}
+ AND default_time_to_live = 259200
+ AND gc_grace_seconds = 3600
+ AND read_repair_chance = 0
+ AND dclocal_read_repair_chance = 0
+ AND speculative_retry = '95percentile'
+ AND comment = 'Secondary table for looking up a trace by a remote service. bucket column adds time bucketing to the partition key, values are microseconds rounded to a pre-configured interval (typically one day). ts column is start timestamp of the span as time-uuid, truncated to millisecond precision.';
+
+CREATE TABLE IF NOT EXISTS zipkin_dependency (
+ analyze_day date,
+ parent text,
+ child text,
+ error_count bigint,
+ call_count bigint,
+ PRIMARY KEY (analyze_day, parent, child)
+)
+ WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'}
+ AND default_time_to_live = 259200
+ AND gc_grace_seconds = 3600
+ AND read_repair_chance = 0
+ AND dclocal_read_repair_chance = 0
+ AND comment = 'Holder for each days generation of zipkin2.DependencyLink';
\ No newline at end of file
diff --git a/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ApplicationConfigLoader.java b/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ApplicationConfigLoader.java
new file mode 100644
index 00000000000..fa536c3c099
--- /dev/null
+++ b/zipkin-server/storage-cassandra/src/test/java/zipkin/server/storage/cassandra/ApplicationConfigLoader.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.server.storage.cassandra;
+
+import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
+import org.apache.skywalking.oap.server.library.module.ProviderNotFoundException;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.PropertyPlaceholderHelper;
+import org.apache.skywalking.oap.server.library.util.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileNotFoundException;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Initialize collector settings with following sources. Use application.yml as primary setting, and fix missing setting
+ * by default settings in application-default.yml.
+ *
+ * At last, override setting by system.properties and system.envs if the key matches moduleName.provideName.settingKey.
+ */
+public class ApplicationConfigLoader {
+ static final Logger log = LoggerFactory.getLogger(ApplicationConfigLoader.class.getName());
+
+ private static final String DISABLE_SELECTOR = "-";
+ private static final String SELECTOR = "selector";
+
+ private final Yaml yaml = new Yaml();
+
+ public ApplicationConfiguration load() {
+ ApplicationConfiguration configuration = new ApplicationConfiguration();
+ this.loadConfig(configuration);
+ this.overrideConfigBySystemEnv(configuration);
+ return configuration;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void loadConfig(ApplicationConfiguration configuration) {
+ try {
+ Reader applicationReader = ResourceUtils.read("application.yml");
+ Map> moduleConfig = yaml.loadAs(applicationReader, Map.class);
+ if (CollectionUtils.isNotEmpty(moduleConfig)) {
+ selectConfig(moduleConfig);
+ moduleConfig.forEach((moduleName, providerConfig) -> {
+ if (providerConfig.size() > 0) {
+ log.info("Get a module define from application.yml, module name: {}", moduleName);
+ ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(
+ moduleName);
+ providerConfig.forEach((providerName, config) -> {
+ log.info(
+ "Get a provider define belong to {} module, provider name: {}", moduleName,
+ providerName
+ );
+ final Map propertiesConfig = (Map) config;
+ final Properties properties = new Properties();
+ if (propertiesConfig != null) {
+ propertiesConfig.forEach((propertyName, propertyValue) -> {
+ if (propertyValue instanceof Map) {
+ Properties subProperties = new Properties();
+ ((Map) propertyValue).forEach((key, value) -> {
+ subProperties.put(key, value);
+ replacePropertyAndLog(key, value, subProperties, providerName);
+ });
+ properties.put(propertyName, subProperties);
+ } else {
+ properties.put(propertyName, propertyValue);
+ replacePropertyAndLog(propertyName, propertyValue, properties, providerName);
+ }
+ });
+ }
+ moduleConfiguration.addProviderConfiguration(providerName, properties);
+ });
+ } else {
+ log.warn(
+ "Get a module define from application.yml, but no provider define, use default, module name: {}",
+ moduleName
+ );
+ }
+ });
+ }
+ } catch (FileNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void replacePropertyAndLog(final String propertyName, final Object propertyValue, final Properties target,
+ final Object providerName) {
+ final String valueString = PropertyPlaceholderHelper.INSTANCE
+ .replacePlaceholders(String.valueOf(propertyValue), target);
+ if (valueString.trim().length() == 0) {
+ target.replace(propertyName, valueString);
+ log.info("Provider={} config={} has been set as an empty string", providerName, propertyName);
+ } else {
+ // Use YAML to do data type conversion.
+ final Object replaceValue = convertValueString(valueString);
+ if (replaceValue != null) {
+ target.replace(propertyName, replaceValue);
+ }
+ }
+ }
+
+ private Object convertValueString(String valueString) {
+ try {
+ Object replaceValue = yaml.load(valueString);
+ if (replaceValue instanceof String || replaceValue instanceof Integer || replaceValue instanceof Long || replaceValue instanceof Boolean || replaceValue instanceof ArrayList) {
+ return replaceValue;
+ } else {
+ return valueString;
+ }
+ } catch (Exception e) {
+ log.warn("yaml convert value type error, use origin values string. valueString={}", valueString, e);
+ return valueString;
+ }
+ }
+
+ private void overrideConfigBySystemEnv(ApplicationConfiguration configuration) {
+ for (Map.Entry