Skip to content

Commit

Permalink
Fix issues
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu committed Oct 17, 2023
1 parent e4b59c9 commit 6da0976
Show file tree
Hide file tree
Showing 21 changed files with 2,506 additions and 703 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void test() throws Exception {
}

int responseCode = connection.getResponseCode();
if (responseCode != HttpURLConnection.HTTP_OK) { // success
if (responseCode != HttpURLConnection.HTTP_ACCEPTED) { // success
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String inputLine;
StringBuffer response = new StringBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ storage:
# Will throw an exception on startup if authentication fails.
username: ${ZIPKIN_STORAGE_CASSANDRA_USERNAME:}
password: ${ZIPKIN_STORAGE_CASSANDRA_PASSWORD:}
# Ensure the schema exists: when enabled, attempts to execute the script /zipkin-server/storage-cassandra/resources/zipkin-schemas.cql on startup.
ensureSchema: ${ZIPKIN_STORAGE_CASSANDRA_ENSURE_SCHEMA:true}
# Max pooled connections per datacenter-local host.
maxConnections: ${ZIPKIN_STORAGE_CASSANDRA_MAX_CONNECTIONS:8}
# Using ssl for connection, rely on Keystore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package zipkin.server.storage.cassandra;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.auth.AuthProvider;
import com.datastax.oss.driver.api.core.config.DriverOption;
Expand All @@ -22,6 +23,7 @@
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.internal.core.auth.ProgrammaticPlainTextAuthProvider;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
Expand All @@ -36,6 +38,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -66,6 +69,13 @@ public CqlSession getSession() {
return cqlSession;
}

/**
 * Returns the {@code default_time_to_live} table option (in seconds) for the given table.
 *
 * @param table the table name to look up in the current keyspace
 * @return the default TTL in seconds, or {@code 0} when the keyspace metadata,
 *         the table, or the option is unavailable
 */
public int getDefaultTtl(String table) {
// getMetadata() can legitimately return null before the session/keyspace is ready
// (callers elsewhere guard with `metadata != null`), so wrap it in Optional
// instead of dereferencing it directly and risking an NPE.
return (int) Optional.ofNullable(getMetadata())
.flatMap(metadata -> metadata.getTable(table))
.map(TableMetadata::getOptions)
.flatMap(o -> Optional.ofNullable(o.get(CqlIdentifier.fromCql("default_time_to_live"))))
.orElse(0);
}

public <T> List<T> executeQuery(String cql, ResultHandler<T> resultHandler, Object... params) {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing CQL: {}", cql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class CassandraConfig extends ModuleConfig {
private String username;
private String password;

private boolean ensureSchema = true;

/**
* The maximum size of batch size of CQL execution
*/
Expand Down Expand Up @@ -106,4 +108,12 @@ public int getAsyncBatchPersistentPoolSize() {
// Sets the pool size used for asynchronous batch persistence
// (presumably a thread-pool size — confirm against the consumer of this config).
public void setAsyncBatchPersistentPoolSize(int asyncBatchPersistentPoolSize) {
this.asyncBatchPersistentPoolSize = asyncBatchPersistentPoolSize;
}

// Whether the storage module should try to create the Cassandra schema on startup.
// NOTE: kept as get* (not is*) to match the other accessors in this config class.
public boolean getEnsureSchema() {
return ensureSchema;
}

// Enables/disables executing the schema-creation script (zipkin-schemas.cql) on startup.
public void setEnsureSchema(boolean ensureSchema) {
this.ensureSchema = ensureSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@
import zipkin.server.storage.cassandra.dao.CassandraZipkinQueryDAO;
import zipkin.server.storage.cassandra.dao.EmptyDAO;

import java.time.Clock;

public class CassandraProvider extends ModuleProvider {
private CassandraConfig moduleConfig;
private CassandraClient client;
Expand Down Expand Up @@ -98,7 +96,7 @@ public void onInitialized(CassandraConfig initialized) {
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
client = new CassandraClient(moduleConfig);
modelInstaller = new CassandraTableInstaller(client, getManager());
modelInstaller = new CassandraTableInstaller(client, moduleConfig);
tableHelper = new CassandraTableHelper(getManager(), client);

this.registerServiceImplementation(
Expand Down Expand Up @@ -137,9 +135,9 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
this.registerServiceImplementation(IServiceLabelDAO.class, emptyDAO);
this.registerServiceImplementation(ISpanAttachedEventQueryDAO.class, new CassandraSpanAttachedEventRecordDAO());

this.registerServiceImplementation(IHistoryDeleteDAO.class, new CassandraHistoryDeleteDAO(client, tableHelper, modelInstaller, Clock.systemDefaultZone()));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new CassandraHistoryDeleteDAO());
this.registerServiceImplementation(IZipkinQueryDAO.class, new CassandraZipkinQueryDAO(client, tableHelper));
this.registerServiceImplementation(ITagAutoCompleteQueryDAO.class, new CassandraTagAutocompleteDAO(client, tableHelper));
this.registerServiceImplementation(ITagAutoCompleteQueryDAO.class, new CassandraTagAutocompleteDAO(client, tableHelper, getManager()));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,101 +14,26 @@

package zipkin.server.storage.cassandra;

import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.TableHelper;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;

import static java.util.stream.Collectors.toList;

public class CassandraTableHelper extends TableHelper {
private ModuleManager moduleManager;
private final CassandraClient client;

// Caches whether a physical table exists in the keyspace, expiring entries after
// 10 minutes, so the read path does not hit cluster metadata on every table lookup.
private final LoadingCache<String, Boolean> tableExistence =
CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofMinutes(10))
.build(new CacheLoader<String, Boolean>() {
@Override
public Boolean load(String tableName) throws Exception {
// Metadata may be null before the session is fully initialized;
// in that case treat the table as missing rather than failing.
final KeyspaceMetadata metadata = client.getMetadata();
return metadata != null && metadata.getTable(tableName).isPresent();
}
});

// Builds a table helper backed by Cassandra keyspace metadata.
// The second super() argument is passed as null — NOTE(review): confirm the parent
// TableHelper tolerates a null there for all code paths this subclass exercises.
public CassandraTableHelper(ModuleManager moduleManager, CassandraClient client) {
super(moduleManager, null);
this.moduleManager = moduleManager;
this.client = client;
}

/**
 * Resolves the physical table names to query for {@code modelName} within the given
 * time-bucket range (presumably inclusive on both ends — confirm against
 * getTablesInTimeBucketRange).
 *
 * Non-time-series models map to a single raw table. Time-series models are expanded
 * into per-bucket candidates, restricted to tables still within TTL, then filtered
 * to tables that actually exist in the keyspace (via the cached existence lookup).
 *
 * @param modelName logical model name registered in TableMetaInfo
 * @param timeBucketStart start time bucket of the query window
 * @param timeBucketEnd end time bucket of the query window
 * @return the existing table names to read from (possibly empty)
 */
public List<String> getTablesForRead(String modelName, long timeBucketStart, long timeBucketEnd) {
final Model model = TableMetaInfo.get(modelName);
final String rawTableName = getTableName(model);

if (!model.isTimeSeries()) {
// Non-time-series data lives in exactly one table.
return Collections.singletonList(rawTableName);
}

// Candidate tables limited to those whose time bucket is still within TTL.
final List<String> ttlTables = getTablesWithinTTL(modelName);
return getTablesInTimeBucketRange(modelName, timeBucketStart, timeBucketEnd)
.stream()
.filter(ttlTables::contains)
.filter(table -> {
try {
// tableExistence caches keyspace-metadata lookups for 10 minutes.
return tableExistence.get(table);
} catch (Exception e) {
// LoadingCache wraps loader failures in a checked exception; surface as unchecked.
throw new RuntimeException(e);
}
})
.collect(toList());
}

public List<String> getTablesWithinTTL(String modelName) {
public String getTableForRead(String modelName) {
final Model model = TableMetaInfo.get(modelName);
final String rawTableName = getTableName(model);

if (!model.isTimeSeries()) {
return Collections.singletonList(rawTableName);
}

final List<Long> ttlTimeBuckets = getTTLTimeBuckets(model);
return ttlTimeBuckets
.stream()
.map(it -> getTable(rawTableName, it))
.filter(table -> {
try {
return tableExistence.get(table);
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.collect(toList());
}

List<Long> getTTLTimeBuckets(Model model) {
final int ttl = model.isRecord() ?
getConfigService().getRecordDataTTL() :
getConfigService().getMetricsDataTTL();
return LongStream
.rangeClosed(0, ttl)
.mapToObj(it -> TimeBucket.getTimeBucket(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(it), DownSampling.Day))
.distinct()
.collect(toList());
return getTableName(model);
}

ConfigService getConfigService() {
Expand Down
Loading

0 comments on commit 6da0976

Please sign in to comment.