Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Cassandra Storage in v3 #3570

Merged
merged 11 commits into from
Oct 30, 2023
1 change: 1 addition & 0 deletions .github/workflows/test-v3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
- name: receiver-zipkin-kafka
- name: receiver-zipkin-rabbitmq
- name: receiver-zipkin-scribe
- name: storage-cassandra
steps:
- name: Checkout Repository
uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
<module>receiver-zipkin-activemq</module>
<module>receiver-zipkin-rabbitmq</module>
<module>receiver-zipkin-scribe</module>
<module>storage-cassandra</module>
</modules>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -64,6 +64,16 @@ public class CoreModuleConfig extends ModuleConfig {
*/
private int traceSampleRate = 10000;

/**
* The number of threads used to prepare metrics data to the storage.
*/
private int prepareThreads = 2;

/**
* The period of doing data persistence. Unit is second.
*/
private int persistentPeriod = 25;

private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join(
Const.COMMA,
"http.method"
Expand All @@ -72,6 +82,8 @@ public class CoreModuleConfig extends ModuleConfig {
public org.apache.skywalking.oap.server.core.CoreModuleConfig toSkyWalkingConfig() {
final org.apache.skywalking.oap.server.core.CoreModuleConfig result = new org.apache.skywalking.oap.server.core.CoreModuleConfig();
result.setServiceCacheRefreshInterval(serviceCacheRefreshInterval);
result.setPrepareThreads(prepareThreads);
result.setPersistentPeriod(persistentPeriod);
return result;
}

Expand Down Expand Up @@ -154,4 +166,20 @@ public int getTraceSampleRate() {
public void setTraceSampleRate(int traceSampleRate) {
this.traceSampleRate = traceSampleRate;
}

public int getPrepareThreads() {
return prepareThreads;
}

public void setPrepareThreads(int prepareThreads) {
this.prepareThreads = prepareThreads;
}

public int getPersistentPeriod() {
return persistentPeriod;
}

public void setPersistentPeriod(int persistentPeriod) {
this.persistentPeriod = persistentPeriod;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -16,6 +16,7 @@
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.command.CommandService;
Expand Down Expand Up @@ -50,13 +51,17 @@
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.status.ServerStatusService;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
Expand All @@ -69,19 +74,23 @@
import zipkin.server.core.services.EmptyGRPCHandlerRegister;
import zipkin.server.core.services.EmptyHTTPHandlerRegister;
import zipkin.server.core.services.EmptyNetworkAddressAliasCache;
import zipkin.server.core.services.SelfSenderService;
import zipkin.server.core.services.ZipkinConfigService;

import java.io.IOException;
import java.util.Collections;

public class CoreModuleProvider extends ModuleProvider {
private CoreModuleConfig moduleConfig;

private EndpointNameGrouping endpointNameGrouping;
private final SourceReceiverImpl receiver;
private final ZipkinSourceReceiverImpl receiver;
private final AnnotationScan annotationScan;
private final StorageModels storageModels;

public CoreModuleProvider() {
this.receiver = new SourceReceiverImpl();
this.annotationScan = new AnnotationScan();
this.receiver = new ZipkinSourceReceiverImpl();
this.storageModels = new StorageModels();
}

Expand Down Expand Up @@ -121,6 +130,16 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
);
this.registerServiceImplementation(NamingControl.class, namingControl);

annotationScan.registerListener(new ZipkinStreamAnnotationListener(getManager()));

AnnotationScan scopeScan = new AnnotationScan();
scopeScan.registerListener(new DefaultScopeDefine.Listener());
try {
scopeScan.scan();
} catch (Exception e) {
throw new ModuleStartException(e.getMessage(), e);
}

final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig();
this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager()));
this.registerServiceImplementation(ConfigService.class, new ZipkinConfigService(moduleConfig, this));
Expand All @@ -133,8 +152,8 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
final WorkerInstancesService instancesService = new WorkerInstancesService();
this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
// no cluster mode for zipkin, for sending the streaming data to the local
this.registerServiceImplementation(RemoteSenderService.class, new SelfSenderService(getManager()));
mrproliu marked this conversation as resolved.
Show resolved Hide resolved
this.registerServiceImplementation(ModelCreator.class, storageModels);
this.registerServiceImplementation(IModelManager.class, storageModels);
this.registerServiceImplementation(ModelManipulator.class, storageModels);
Expand Down Expand Up @@ -182,12 +201,19 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {

try {
receiver.scan();
annotationScan.scan();
} catch (IOException | IllegalAccessException | InstantiationException | StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}

@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {

final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig();
PersistenceTimer.INSTANCE.start(getManager(), swConfig);
DataTTLKeeperTimer.INSTANCE.start(getManager(), swConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.core;

import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteDispatcher;

public class ZipkinDispatcherManager extends DispatcherManager {

@Override
public void addIfAsSourceDispatcher(Class aClass) throws IllegalAccessException, InstantiationException {
if (aClass.getSimpleName().startsWith("Zipkin") || aClass.equals(TagAutocompleteDispatcher.class)) {
super.addIfAsSourceDispatcher(aClass);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.core;

import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;

import java.io.IOException;

public class ZipkinSourceReceiverImpl implements SourceReceiver {
private final ZipkinDispatcherManager mgr;

public ZipkinSourceReceiverImpl() {
mgr = new ZipkinDispatcherManager();
}

@Override
public void receive(ISource source) {
mgr.forward(source);
}

@Override
public DispatcherDetectorListener getDispatcherDetectorListener() {
return mgr;
}

public void scan() throws IOException, IllegalAccessException, InstantiationException {
mgr.scan();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.core;

import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

public class ZipkinStreamAnnotationListener extends StreamAnnotationListener {

public ZipkinStreamAnnotationListener(ModuleDefineHolder moduleDefineHolder) {
super(moduleDefineHolder);
}

@Override
public void notify(Class aClass) throws StorageException {
// only including all zipkin streaming
if (aClass.getSimpleName().startsWith("Zipkin") || aClass.equals(TagAutocompleteData.class)) {
super.notify(aClass);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.core.services;

import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.SelfRemoteClient;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.library.module.ModuleManager;

public class SelfSenderService extends RemoteSenderService {
private final ModuleManager moduleManager;
private SelfRemoteClient self;

public SelfSenderService(ModuleManager moduleManager) {
super(moduleManager);
this.moduleManager = moduleManager;
}

private SelfRemoteClient getSelf() {
if (self == null) {
self = new SelfRemoteClient(moduleManager, new Address("127.0.0.1", 0, true));
}
return self;
}

@Override
public void send(String nextWorkName, StreamData streamData, Selector selector) {
getSelf().push(nextWorkName, streamData);
}
}
5 changes: 5 additions & 0 deletions zipkin-server/server-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${skywalking.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin</groupId>
<artifactId>storage-cassandra</artifactId>
<version>${project.version}</version>
</dependency>

<!-- zipkin receiver -->
<dependency>
Expand Down
19 changes: 19 additions & 0 deletions zipkin-server/server-starter/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ core:
searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
# The trace sample rate precision is 1/10000, should be between 0 and 10000
traceSampleRate: ${ZIPKIN_SAMPLE_RATE:10000}
# The number of threads used to prepare metrics data to the storage.
prepareThreads: ${ZIPKIN_PREPARE_THREADS:2}
# The period of doing data persistence. Unit is second.Default value is 25s
persistentPeriod: ${ZIPKIN_PERSISTENT_PERIOD:25}

storage:
selector: ${ZIPKIN_STORAGE:h2}
Expand Down Expand Up @@ -125,6 +129,21 @@ storage:
superDatasetBlockIntervalHours: ${ZIPKIN_STORAGE_BANYANDB_SUPER_DATASET_BLOCK_INTERVAL_HOURS:4} # Unit is hour
superDatasetSegmentIntervalDays: ${ZIPKIN_STORAGE_BANYANDB_SUPER_DATASET_SEGMENT_INTERVAL_DAYS:1} # Unit is day
specificGroupSettings: ${ZIPKIN_STORAGE_BANYANDB_SPECIFIC_GROUP_SETTINGS:""} # For example, {"group1": {"blockIntervalHours": 4, "segmentIntervalDays": 1}}
cassandra:
keyspace: ${ZIPKIN_STORAGE_CASSANDRA_KEYSPACE:zipkin}
# Comma separated list of host addresses part of Cassandra cluster. Ports default to 9042 but you can also specify a custom port with 'host:port'.
contactPoints: ${ZIPKIN_STORAGE_CASSANDRA_CONTACT_POINTS:localhost}
# Name of the datacenter that will be considered "local" for load balancing.
localDc: ${ZIPKIN_STORAGE_CASSANDRA_LOCAL_DC:datacenter1}
# Will throw an exception on startup if authentication fails.
username: ${ZIPKIN_STORAGE_CASSANDRA_USERNAME:}
password: ${ZIPKIN_STORAGE_CASSANDRA_PASSWORD:}
# Max pooled connections per datacenter-local host.
maxConnections: ${ZIPKIN_STORAGE_CASSANDRA_MAX_CONNECTIONS:8}
# Using ssl for connection, rely on Keystore
use-ssl: ${ZIPKIN_STORAGE_CASSANDRA_USE_SSL:false}
maxSizeOfBatchCql: ${ZIPKIN_STORAGE_CASSANDRA_MAX_SIZE_OF_BATCH_CQL:2000}
asyncBatchPersistentPoolSize: ${ZIPKIN_STORAGE_CASSANDRA_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}

receiver-zipkin-http:
selector: ${ZIPKIN_RECEIVER_ZIPKIN_HTTP:default}
Expand Down
Loading