Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support transactions only on given partitions #299

Merged
merged 2 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions benchmarks/src/com/aerospike/benchmarks/Arguments.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.aerospike.benchmarks;

import java.util.Set;

import com.aerospike.client.Bin;
import com.aerospike.client.Value;
import com.aerospike.client.policy.BatchPolicy;
Expand All @@ -40,6 +42,7 @@ public class Arguments {
public int readMultiBinPct;
public int writeMultiBinPct;
public int throughput;
public Set<Integer> partitionIds;
public long transactionLimit;
public boolean reportNotFound;
public boolean debug;
Expand Down
21 changes: 19 additions & 2 deletions benchmarks/src/com/aerospike/benchmarks/InsertTaskSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
*/
package com.aerospike.benchmarks;

import java.util.HashMap;
import java.util.Map;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.cluster.Partition;
import com.aerospike.client.util.RandomShift;
import com.aerospike.client.util.Util;

Expand Down Expand Up @@ -83,14 +87,27 @@ private void runCommand(long keyCurrent, RandomShift random) {
private void put(Key key, Bin[] bins) {
if (counters.write.latency != null) {
long begin = System.nanoTime();
client.put(args.writePolicy, key, bins);

if (! skipKey(key)) {
client.put(args.writePolicy, key, bins);
}

long elapsed = System.nanoTime() - begin;
counters.write.count.getAndIncrement();
counters.write.latency.add(elapsed);
}
else {
client.put(args.writePolicy, key, bins);
if (! skipKey(key)) {
client.put(args.writePolicy, key, bins);
}
counters.write.count.getAndIncrement();
}
}

private boolean skipKey(Key key) {
if (args.partitionIds != null && !args.partitionIds.contains(Partition.getPartitionId(key.digest))) {
return true;
}
return false;
}
}
34 changes: 33 additions & 1 deletion benchmarks/src/com/aerospike/benchmarks/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -47,6 +50,7 @@
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.NettyEventLoops;
import com.aerospike.client.async.NioEventLoops;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.command.BatchNode;
import com.aerospike.client.command.BatchNodeList;
import com.aerospike.client.command.BatchStatus;
Expand Down Expand Up @@ -351,6 +355,8 @@ public Main(String[] commandLineArgs) throws Exception {
options.addOption("ufn", "udfFunctionName", true, "Specify the udf function name that must be used in the udf benchmarks");
options.addOption("ufv","udfFunctionValues",true, "The udf argument values comma separated");
options.addOption("sendKey", false, "Send key to server");

options.addOption("pids", "partitionIds", true, "Specify the list of comma seperated partition IDs the primary keys must belong to");

// parse the command line arguments
CommandLineParser parser = new DefaultParser();
Expand Down Expand Up @@ -984,6 +990,31 @@ else if (! level.equals("all")) {
if (line.hasOption("sendKey")) {
args.writePolicy.sendKey = true;
}

if (line.hasOption("partitionIds")) {
String[] pids = line.getOptionValue("partitionIds").split(",");

Set<Integer> partitionIds = new HashSet<>();

for (String pid : pids) {
int partitionId = -1;

try {
partitionId = Integer.parseInt(pid);
}
catch(NumberFormatException nfe) {
throw new Exception("Partition ID has to be an integer");
}

if (partitionId < 0 || partitionId >= Node.PARTITIONS) {
throw new Exception("Partition ID has to be a value between 0 and " + Node.PARTITIONS);
}

partitionIds.add(partitionId);
}

args.partitionIds = partitionIds;
}

System.out.println("Benchmark: " + this.hosts[0]
+ ", namespace: " + args.namespace
Expand All @@ -1006,7 +1037,8 @@ else if (! level.equals("all")) {
+ ", transactions: " + args.transactionLimit
+ ", bins: " + args.nBins
+ ", random values: " + (args.fixedBins == null)
+ ", throughput: " + (args.throughput == 0 ? "unlimited" : (args.throughput + " tps")));
+ ", throughput: " + (args.throughput == 0 ? "unlimited" : (args.throughput + " tps"))
+ ", partitions: " + (args.partitionIds == null ? "all" : args.partitionIds.toString()));

System.out.println("client policy:");
System.out.println(
Expand Down
57 changes: 57 additions & 0 deletions benchmarks/src/com/aerospike/benchmarks/RWTaskSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*/
package com.aerospike.benchmarks;

import java.util.ArrayList;
import java.util.List;

import com.aerospike.client.Bin;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.Value;
import com.aerospike.client.cluster.Partition;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.util.RandomShift;
import com.aerospike.client.util.Util;
Expand Down Expand Up @@ -67,6 +71,11 @@ public void run() {

@Override
protected void put(WritePolicy writePolicy, Key key, Bin[] bins) {
if (skipKey(key)) {
counters.write.count.getAndIncrement();
return;
}

if (counters.write.latency != null) {
long begin = System.nanoTime();
client.put(writePolicy, key, bins);
Expand All @@ -82,6 +91,11 @@ protected void put(WritePolicy writePolicy, Key key, Bin[] bins) {

@Override
protected void add(Key key, Bin[] bins) {
if (skipKey(key)) {
counters.write.count.getAndIncrement();
return;
}

if (counters.write.latency != null) {
long begin = System.nanoTime();
client.add(writePolicyGeneration, key, bins);
Expand All @@ -97,6 +111,11 @@ protected void add(Key key, Bin[] bins) {

@Override
protected void get(Key key, String binName) {
if (skipKey(key)) {
processRead(key, new Object());
return;
}

Record record;

if (counters.read.latency != null) {
Expand All @@ -113,6 +132,11 @@ record = client.get(args.readPolicy, key, binName);

@Override
protected void get(Key key) {
if (skipKey(key)) {
processRead(key, new Object());
return;
}

Record record;

if (counters.read.latency != null) {
Expand All @@ -129,10 +153,16 @@ record = client.get(args.readPolicy, key);

@Override
protected void get(Key key, String udfPackageName, String udfFunctionName, Value[] udfValues) {
if (skipKey(key)) {
processRead(key, new Object());
return;
}

Object udfReturnObj;

if (counters.read.latency != null) {
long begin = System.nanoTime();

udfReturnObj = client.execute(args.writePolicy, key, udfPackageName, udfFunctionName, udfValues);
long elapsed = System.nanoTime() - begin;
counters.read.latency.add(elapsed);
Expand All @@ -145,6 +175,10 @@ protected void get(Key key, String udfPackageName, String udfFunctionName, Value

@Override
protected void get(Key[] keys, String binName) {
if (args.partitionIds != null) {
keys = getFilteredKeys(keys);
}

Record[] records;

if (counters.read.latency != null) {
Expand All @@ -165,6 +199,10 @@ protected void get(Key[] keys, String binName) {

@Override
protected void get(Key[] keys) {
if (args.partitionIds != null) {
keys = getFilteredKeys(keys);
}

Record[] records;

if (counters.read.latency != null) {
Expand All @@ -182,4 +220,23 @@ protected void get(Key[] keys) {
}
processBatchRead();
}

private boolean skipKey(Key key) {
if (args.partitionIds != null && !args.partitionIds.contains(Partition.getPartitionId(key.digest))) {
return true;
}
return false;
}

private Key[] getFilteredKeys(Key[] keys) {
List<Key> filteredKeys = new ArrayList<>();

for (Key key : keys) {
if (! skipKey(key)) {
filteredKeys.add(key);
}
}

return filteredKeys.toArray(new Key[0]);
}
}
Loading