Skip to content

Commit

Permalink
Enhance the performance and storage spze of the cassandra storage (#3584
Browse files Browse the repository at this point in the history
)
  • Loading branch information
mrproliu authored Nov 7, 2023
1 parent 3c23205 commit 44b8c06
Show file tree
Hide file tree
Showing 15 changed files with 582 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.internal.core.auth.ProgrammaticPlainTextAuthProvider;
Expand Down Expand Up @@ -77,11 +79,23 @@ public int getDefaultTtl(String table) {
}

public <T> List<T> executeQuery(String cql, ResultHandler<T> resultHandler, Object... params) {
return executeQuery(cqlSession.prepare(cql), resultHandler, params);
}

public <T> CompletionStage<List<T>> executeAsyncQuery(String cql, ResultHandler<T> resultHandler, Object... params) {
return executeAsyncQuery(cqlSession.prepare(cql), resultHandler, params);
}

public PreparedStatement prepare(String cql) {
return cqlSession.prepare(cql);
}

public <T> List<T> executeQuery(PreparedStatement statement, ResultHandler<T> resultHandler, Object... params) {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing CQL: {}", cql);
LOG.debug("Executing CQL: {}", statement.getQuery());
LOG.debug("CQL parameters: {}", Arrays.toString(params));
}
final BoundStatement stmt = cqlSession.prepare(cql).bind(params);
final BoundStatement stmt = statement.bind(params);
final ResultSet resultSet = cqlSession.execute(stmt);
healthChecker.health();
if (resultHandler != null) {
Expand All @@ -91,12 +105,23 @@ public <T> List<T> executeQuery(String cql, ResultHandler<T> resultHandler, Obje
return null;
}

public <T> CompletionStage<List<T>> executeAsyncQuery(String cql, ResultHandler<T> resultHandler, Object... params) {
public <T> CompletionStage<List<T>> executeAsyncQuery(PreparedStatement statement, ResultHandler<T> resultHandler, Object... params) {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing CQL: {}", cql);
LOG.debug("Executing CQL: {}", statement.getQuery());
LOG.debug("CQL parameters: {}", Arrays.toString(params));
}
final BoundStatement stmt = cqlSession.prepare(cql).bind(params);
final BoundStatement stmt = statement.bind(params);
return executeAsyncQuery0(stmt, resultHandler);
}

public <T> CompletionStage<List<T>> executeAsyncQueryWithCustomBind(PreparedStatement original, Statement statement, ResultHandler<T> resultHandler) {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing Custom Bind CQL: {}", original.getQuery());
}
return executeAsyncQuery0(statement, resultHandler);
}

private <T> CompletionStage<List<T>> executeAsyncQuery0(Statement stmt, ResultHandler<T> resultHandler) {
final CompletionStage<AsyncResultSet> resultSet = cqlSession.executeAsync(stmt);
healthChecker.health();
if (resultHandler != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

package zipkin.server.storage.cassandra;

import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.JDBCTableInstaller;

public class CassandraTableInstaller implements ModelCreator.CreatingListener {
private final CassandraClient client;
Expand All @@ -43,6 +46,17 @@ public void start() {

@Override
public void whenCreating(Model model) throws StorageException {
if (ZipkinSpanRecord.INDEX_NAME.equals(model.getName())) {
// remove unnecessary columns
for (int i = model.getColumns().size() - 1; i >= 0; i--) {
final String columnName = model.getColumns().get(i).getColumnName().getStorageName();
if (StorageData.TIME_BUCKET.equals(columnName) ||
ZipkinSpanRecord.TIMESTAMP_MILLIS.equals(columnName) ||
JDBCTableInstaller.TABLE_COLUMN.equals(columnName)) {
model.getColumns().remove(i);
}
}
}
TableMetaInfo.addModel(model);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package zipkin.server.storage.cassandra.dao;

import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
Expand All @@ -29,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CassandraBatchDAO implements IBatchDAO {
static final Logger LOG = LoggerFactory.getLogger(CassandraBatchDAO.class);
Expand All @@ -37,6 +39,8 @@ public class CassandraBatchDAO implements IBatchDAO {
private final DataCarrier<PrepareRequest> dataCarrier;
private final int maxBatchSqlSize;

private static final ConcurrentHashMap<String, PreparedStatement> PREPARED_STATEMENT_MAP = new ConcurrentHashMap<>();

public CassandraBatchDAO(CassandraClient client, int maxBatchCqlSize, int asyncBatchPersistentPoolSize) {
this.client = client;
String name = "CASSANDRA_ASYNCHRONOUS_BATCH_PERSISTENT";
Expand Down Expand Up @@ -76,7 +80,7 @@ public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
}

final long start = System.currentTimeMillis();
boolean success = true;
final boolean isInsert = cqls.get(0) instanceof InsertRequest;
try {
for (PrepareRequest cql : cqls) {
final CQLExecutor executor = (CQLExecutor) cql;
Expand All @@ -85,28 +89,26 @@ public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
LOG.debug("CQL parameters: {}", executor.getParams());
}

final BoundStatement stmt = client.getSession().prepare(executor.getCql())
.bind(((CQLExecutor) cql).getParams().toArray());
client.getSession().execute(stmt);
final PreparedStatement stmt = PREPARED_STATEMENT_MAP.computeIfAbsent(executor.getCql(), e -> client.getSession().prepare(e));

final BoundStatement boundStatement = stmt.bind(((CQLExecutor) cql).getParams().toArray());
client.getSession().executeAsync(boundStatement).thenAccept(result -> {
if (isInsert) {
((InsertRequest) executor).onInsertCompleted();
} else if (!result.wasApplied()) {
((UpdateRequest) executor).onUpdateFailure();
};
});
}
} catch (Exception e) {
// Just to avoid one execution failure makes the rest of batch failure.
LOG.error(e.getMessage(), e);
success = false;
}

final boolean isInsert = cqls.get(0) instanceof InsertRequest;
for (PrepareRequest executor : cqls) {
if (isInsert) {
((InsertRequest) executor).onInsertCompleted();
} else if (!success) {
((UpdateRequest) executor).onUpdateFailure();
}
}
if (LOG.isDebugEnabled()) {
long end = System.currentTimeMillis();
long cost = end - start;
LOG.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}, cost:{}ms", prepareRequests.size(), maxBatchSqlSize, cost);
LOG.debug("execute sync sql statements done, data size: {}, maxBatchSqlSize: {}, cost:{}ms", prepareRequests.size(), maxBatchSqlSize, cost);
}
return CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package zipkin.server.storage.cassandra.dao;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.cql.Row;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.StorageData;
Expand All @@ -35,6 +34,8 @@
import zipkin.server.storage.cassandra.CassandraClient;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -137,10 +138,6 @@ private <T extends StorageData> List<CQLExecutor> buildAdditionalInsertExecutor(
List<Object> param = new ArrayList<>();
final StringBuilder cqlBuilder = new StringBuilder("INSERT INTO ").append(tableName);

columnNames.add(ID_COLUMN);
values.add("?");
param.add(TableHelper.generateId(model, metrics.id().build()));

int position = 0;
List valueList = new ArrayList();
for (int i = 0; i < columns.size(); i++) {
Expand Down Expand Up @@ -172,7 +169,7 @@ private <T extends StorageData> List<CQLExecutor> buildAdditionalInsertExecutor(
if (!CollectionUtils.isEmpty(valueList)) {
for (Object object : valueList) {
List<Object> paramCopy = new ArrayList<>(param);
paramCopy.set(position, object);
paramCopy.set(position - 1, object);
sqlExecutors.add(new CQLExecutor(sql, paramCopy, callback, null));
}
} else {
Expand All @@ -196,7 +193,7 @@ private <T extends StorageData> CQLExecutor buildInsertExecutor(Model model,
final List<ModelColumn> columns = model.getColumns();
final List<String> columnNames =
Stream.concat(
Stream.of(ID_COLUMN, JDBCTableInstaller.TABLE_COLUMN),
(model.isRecord() ? Collections.<String>emptyList() : Arrays.asList(ID_COLUMN, JDBCTableInstaller.TABLE_COLUMN)).stream(),
columns
.stream()
.map(ModelColumn::getColumnName)
Expand All @@ -210,7 +207,7 @@ private <T extends StorageData> CQLExecutor buildInsertExecutor(Model model,
cqlBuilder.append(columnNames.stream().map(it -> "?").collect(Collectors.joining(",", "(", ")")));

final List<Object> params = Stream.concat(
Stream.of(TableHelper.generateId(model, metrics.id().build()), model.getName()),
(model.isRecord() ? Collections.<String>emptyList() : Arrays.asList(TableHelper.generateId(model, metrics.id().build()), model.getName())).stream(),
columns
.stream()
.map(ModelColumn::getColumnName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static List<CQLExecutor> buildExtensionsForSpan(ZipkinSpanRecord span, Se

public static int durationIndexBucket(long ts_milli) {
// if the window constant has microsecond precision, the division produces negative getValues
return (int) (ts_milli / (DURATION_INDEX_BUCKET_WINDOW_SECONDS));
return (int) (ts_milli / (DURATION_INDEX_BUCKET_WINDOW_SECONDS)) / 1000;
}

private static CQLExecutor buildServiceSpan(String service, String span, int bucket, UUID ts, String trace_id, long durationMillis,
Expand Down
Loading

0 comments on commit 44b8c06

Please sign in to comment.