Skip to content

Commit

Permalink
Add dependency module related table create and query
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu committed Oct 25, 2023
1 parent a04c190 commit 137ab8d
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 0 deletions.
5 changes: 5 additions & 0 deletions zipkin-server/server-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@
<artifactId>zipkin-dependency-storage-banyandb</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin</groupId>
<artifactId>zipkin-dependency-storage-cassandra</artifactId>
<version>${project.version}</version>
</dependency>

<!-- zipkin receiver -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,18 @@ CREATE TABLE IF NOT EXISTS zipkin_trace_by_service_remote_service (
AND dclocal_read_repair_chance = 0
AND speculative_retry = '95percentile'
AND comment = 'Secondary table for looking up a trace by a remote service. bucket column adds time bucketing to the partition key, values are microseconds rounded to a pre-configured interval (typically one day). ts column is start timestamp of the span as time-uuid, truncated to millisecond precision.';

CREATE TABLE IF NOT EXISTS zipkin_dependency (
analyze_day date,
parent text,
child text,
error_count bigint,
call_count bigint,
PRIMARY KEY (analyze_day, parent, child)
)
WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'}
AND default_time_to_live = 259200
AND gc_grace_seconds = 3600
AND read_repair_chance = 0
AND dclocal_read_repair_chance = 0
AND comment = 'Holder for each days generation of zipkin2.DependencyLink';
1 change: 1 addition & 0 deletions zipkin-server/zipkin-storage-ext/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<module>zipkin-dependency-storage-jdbc</module>
<module>zipkin-dependency-storage-elasticsearch</module>
<module>zipkin-dependency-storage-banyandb</module>
<module>zipkin-dependency-storage-cassandra</module>
</modules>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>zipkin-storage-ext</artifactId>
<groupId>io.zipkin</groupId>
<version>2.24.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>zipkin-dependency-storage-cassandra</artifactId>
<name>Zipkin Dependency Cassandra Extension</name>

<dependencies>
<dependency>
<groupId>io.zipkin</groupId>
<artifactId>storage-cassandra</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.dependency.storage.cassandra;

import org.apache.skywalking.zipkin.dependency.entity.ZipkinDependency;
import zipkin.server.dependency.IZipkinDependencyQueryDAO;
import zipkin.server.storage.cassandra.CassandraClient;
import zipkin2.DependencyLink;
import zipkin2.internal.DateUtil;
import zipkin2.internal.DependencyLinker;
import zipkin2.internal.Nullable;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

public class ZipkinDependencyCassandraQueryDAO implements IZipkinDependencyQueryDAO {
private CassandraClient client;
@Override
public List<DependencyLink> getDependencies(long endTs, long lookback) throws IOException {
return DependencyLinker.merge(client.executeQuery("SELECT parent,child,call_count,error_count from " + ZipkinDependency.INDEX_NAME +
" where " + ZipkinDependency.DAY + " in ? ", r -> DependencyLink.newBuilder()
.parent(r.getString(ZipkinDependency.PARENT))
.child(r.getString(ZipkinDependency.CHILD))
.callCount(r.getLong(ZipkinDependency.CALL_COUNT))
.errorCount(r.getLong(ZipkinDependency.ERROR_COUNT)).build(), getDays(endTs, lookback)));
}

public void setClient(CassandraClient client) {
this.client = client;
}

List<LocalDate> getDays(long endTs, @Nullable Long lookback) {
List<LocalDate> result = new ArrayList<>();
for (long epochMillis : DateUtil.epochDays(endTs, lookback)) {
result.add(Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate());
}
return result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.dependency.storage.cassandra;

import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import zipkin.server.dependency.IZipkinDependencyQueryDAO;
import zipkin.server.dependency.ZipkinDependencyModule;
import zipkin.server.storage.cassandra.CassandraClient;
import zipkin.server.storage.cassandra.CassandraConfig;
import zipkin.server.storage.cassandra.CassandraProvider;

import java.lang.reflect.Field;

public class ZipkinDependencyCassandraStorageProvider extends ModuleProvider {
private CassandraConfig config;
private ZipkinDependencyCassandraQueryDAO queryDAO;

@Override
public String name() {
return "cassandra";
}

@Override
public Class<? extends ModuleDefine> module() {
return ZipkinDependencyModule.class;
}

@Override
public ConfigCreator<? extends ModuleConfig> newConfigCreator() {
return new ConfigCreator<CassandraConfig>() {

@Override
public Class<CassandraConfig> type() {
return CassandraConfig.class;
}

@Override
public void onInitialized(CassandraConfig initialized) {
config = initialized;
}
};
}

@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
this.queryDAO = new ZipkinDependencyCassandraQueryDAO();
this.registerServiceImplementation(IZipkinDependencyQueryDAO.class, queryDAO);
}

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
}

@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
CassandraProvider provider =
(CassandraProvider) getManager().find(StorageModule.NAME).provider();
try {
Field field = CassandraProvider.class.getDeclaredField("client");
field.setAccessible(true);
CassandraClient client = (CassandraClient) field.get(provider);
queryDAO.setClient(client);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new ModuleStartException("Failed to get CassandraStorageClient.", e);
}
}

@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME, StorageModule.NAME};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#

zipkin.server.dependency.storage.cassandra.ZipkinDependencyCassandraStorageProvider

0 comments on commit 137ab8d

Please sign in to comment.