From 0fd15dfb0507ed862fd2b7b1c5df163faa321311 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Tue, 26 Mar 2024 15:14:19 -0400 Subject: [PATCH 1/9] CLIENT-2839 Add methods for copying default policies: copyReadPolicyDefault() copyWritePolicyDefault() copyScanPolicyDefault() copyQueryPolicyDefault() copyBatchPolicyDefault() copyBatchParentPolicyWriteDefault() copyBatchWritePolicyDefault() copyBatchDeletePolicyDefault() copyBatchUDFPolicyDefault() copyInfoPolicyDefault() Also, suppress internal deprecation warnings for shortQuery. --- .../com/aerospike/client/AerospikeClient.java | 100 ++++++++++++++++++ .../aerospike/client/IAerospikeClient.java | 91 +++++++++++++++- .../com/aerospike/client/command/Command.java | 1 + .../src/com/aerospike/examples/QueryExp.java | 8 +- .../client/proxy/AerospikeClientProxy.java | 100 ++++++++++++++++++ .../client/proxy/grpc/GrpcConversions.java | 1 + 6 files changed, 296 insertions(+), 5 deletions(-) diff --git a/client/src/com/aerospike/client/AerospikeClient.java b/client/src/com/aerospike/client/AerospikeClient.java index 6f5bfe7ae..b055fbdb7 100644 --- a/client/src/com/aerospike/client/AerospikeClient.java +++ b/client/src/com/aerospike/client/AerospikeClient.java @@ -339,46 +339,146 @@ protected AerospikeClient(ClientPolicy policy) { // Default Policies //------------------------------------------------------- + /** + * Return read policy default. Use when the policy will not be modified. + */ public final Policy getReadPolicyDefault() { return readPolicyDefault; } + /** + * Copy read policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final Policy copyReadPolicyDefault() { + return new Policy(readPolicyDefault); + } + + /** + * Return write policy default. Use when the policy will not be modified. + */ public final WritePolicy getWritePolicyDefault() { return writePolicyDefault; } + /** + * Copy write policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final WritePolicy copyWritePolicyDefault() { + return new WritePolicy(writePolicyDefault); + } + + /** + * Return scan policy default. Use when the policy will not be modified. + */ public final ScanPolicy getScanPolicyDefault() { return scanPolicyDefault; } + /** + * Copy scan policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final ScanPolicy copyScanPolicyDefault() { + return new ScanPolicy(scanPolicyDefault); + } + + /** + * Return query policy default. Use when the policy will not be modified. + */ public final QueryPolicy getQueryPolicyDefault() { return queryPolicyDefault; } + /** + * Copy query policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final QueryPolicy copyQueryPolicyDefault() { + return new QueryPolicy(queryPolicyDefault); + } + + /** + * Return batch header read policy default. Use when the policy will not be modified. + */ public final BatchPolicy getBatchPolicyDefault() { return batchPolicyDefault; } + /** + * Copy batch header read policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final BatchPolicy copyBatchPolicyDefault() { + return new BatchPolicy(batchPolicyDefault); + } + + /** + * Return batch header write policy default. Use when the policy will not be modified. + */ public final BatchPolicy getBatchParentPolicyWriteDefault() { return batchParentPolicyWriteDefault; } + /** + * Copy batch header write policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final BatchPolicy copyBatchParentPolicyWriteDefault() { + return new BatchPolicy(batchParentPolicyWriteDefault); + } + + /** + * Return batch detail write policy default. Use when the policy will not be modified. + */ public final BatchWritePolicy getBatchWritePolicyDefault() { return batchWritePolicyDefault; } + /** + * Copy batch detail write policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final BatchWritePolicy copyBatchWritePolicyDefault() { + return new BatchWritePolicy(batchWritePolicyDefault); + } + + /** + * Return batch detail delete policy default. Use when the policy will not be modified. + */ public final BatchDeletePolicy getBatchDeletePolicyDefault() { return batchDeletePolicyDefault; } + /** + * Copy batch detail delete policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final BatchDeletePolicy copyBatchDeletePolicyDefault() { + return new BatchDeletePolicy(batchDeletePolicyDefault); + } + + /** + * Return batch detail UDF policy default. Use when the policy will not be modified. + */ public final BatchUDFPolicy getBatchUDFPolicyDefault() { return batchUDFPolicyDefault; } + /** + * Copy batch detail UDF policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final BatchUDFPolicy copyBatchUDFPolicyDefault() { + return new BatchUDFPolicy(batchUDFPolicyDefault); + } + + /** + * Return info command policy default. Use when the policy will not be modified. + */ public final InfoPolicy getInfoPolicyDefault() { return infoPolicyDefault; } + /** + * Copy info command policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final InfoPolicy copyInfoPolicyDefault() { + return new InfoPolicy(infoPolicyDefault); + } + //------------------------------------------------------- // Cluster Connection Management //------------------------------------------------------- diff --git a/client/src/com/aerospike/client/IAerospikeClient.java b/client/src/com/aerospike/client/IAerospikeClient.java index f6ce767fb..161ea58bb 100644 --- a/client/src/com/aerospike/client/IAerospikeClient.java +++ b/client/src/com/aerospike/client/IAerospikeClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2023 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -77,17 +77,106 @@ public interface IAerospikeClient extends Closeable { // Default Policies //------------------------------------------------------- + /** + * Return read policy default. Use when the policy will not be modified. + */ public Policy getReadPolicyDefault(); + + /** + * Copy read policy default. Use when the policy will be modified for use in a specific transaction. + */ + public Policy copyReadPolicyDefault(); + + /** + * Return write policy default. Use when the policy will not be modified. + */ public WritePolicy getWritePolicyDefault(); + + /** + * Copy write policy default. Use when the policy will be modified for use in a specific transaction. + */ + public WritePolicy copyWritePolicyDefault(); + + /** + * Return scan policy default. Use when the policy will not be modified. + */ public ScanPolicy getScanPolicyDefault(); + + /** + * Copy scan policy default. Use when the policy will be modified for use in a specific transaction. + */ + public ScanPolicy copyScanPolicyDefault(); + + /** + * Return query policy default. Use when the policy will not be modified. + */ public QueryPolicy getQueryPolicyDefault(); + + /** + * Copy query policy default. Use when the policy will be modified for use in a specific transaction. + */ + public QueryPolicy copyQueryPolicyDefault(); + + /** + * Return batch header read policy default. Use when the policy will not be modified. + */ public BatchPolicy getBatchPolicyDefault(); + + /** + * Copy batch header read policy default. Use when the policy will be modified for use in a specific transaction. + */ + public BatchPolicy copyBatchPolicyDefault(); + + /** + * Return batch header write policy default. Use when the policy will not be modified. + */ public BatchPolicy getBatchParentPolicyWriteDefault(); + + /** + * Copy batch header write policy default. Use when the policy will be modified for use in a specific transaction. + */ + public BatchPolicy copyBatchParentPolicyWriteDefault(); + + /** + * Return batch detail write policy default. Use when the policy will not be modified. + */ public BatchWritePolicy getBatchWritePolicyDefault(); + + /** + * Copy batch detail write policy default. Use when the policy will be modified for use in a specific transaction. + */ + public BatchWritePolicy copyBatchWritePolicyDefault(); + + /** + * Return batch detail delete policy default. Use when the policy will not be modified. + */ public BatchDeletePolicy getBatchDeletePolicyDefault(); + + /** + * Copy batch detail delete policy default. Use when the policy will be modified for use in a specific transaction. + */ + public BatchDeletePolicy copyBatchDeletePolicyDefault(); + + /** + * Return batch detail UDF policy default. Use when the policy will not be modified. + */ public BatchUDFPolicy getBatchUDFPolicyDefault(); + + /** + * Copy batch detail UDF policy default. Use when the policy will be modified for use in a specific transaction. + */ + public BatchUDFPolicy copyBatchUDFPolicyDefault(); + + /** + * Return info command policy default. Use when the policy will not be modified. + */ public InfoPolicy getInfoPolicyDefault(); + /** + * Copy info command policy default. Use when the policy will be modified for use in a specific transaction. + */ + public InfoPolicy copyInfoPolicyDefault(); + //------------------------------------------------------- // Cluster Connection Management //------------------------------------------------------- diff --git a/client/src/com/aerospike/client/command/Command.java b/client/src/com/aerospike/client/command/Command.java index 9e9abf519..2fcb6ee53 100644 --- a/client/src/com/aerospike/client/command/Command.java +++ b/client/src/com/aerospike/client/command/Command.java @@ -1386,6 +1386,7 @@ public final void setScan( // Query //-------------------------------------------------- + @SuppressWarnings("deprecation") public final void setQuery( Cluster cluster, Policy policy, diff --git a/examples/src/com/aerospike/examples/QueryExp.java b/examples/src/com/aerospike/examples/QueryExp.java index c0a9d3a54..f0895dfec 100644 --- a/examples/src/com/aerospike/examples/QueryExp.java +++ b/examples/src/com/aerospike/examples/QueryExp.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2023 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -132,7 +132,7 @@ private void runQuery1( // Predicates are applied on query results on server side. // Predicates can reference any bin. - QueryPolicy policy = new QueryPolicy(client.getQueryPolicyDefault()); + QueryPolicy policy = client.copyQueryPolicyDefault(); policy.filterExp = Exp.build( Exp.or( Exp.and( @@ -171,7 +171,7 @@ private void runQuery2( stmt.setSetName(params.set); stmt.setFilter(Filter.range(binName, begin, end)); - QueryPolicy policy = new QueryPolicy(client.getQueryPolicyDefault()); + QueryPolicy policy = client.copyQueryPolicyDefault(); policy.filterExp = Exp.build( Exp.and( Exp.ge(Exp.lastUpdate(), Exp.val(beginTime)), @@ -206,7 +206,7 @@ private void runQuery3( stmt.setSetName(params.set); stmt.setFilter(Filter.range(binName, begin, end)); - QueryPolicy policy = new QueryPolicy(client.getQueryPolicyDefault()); + QueryPolicy policy = client.copyQueryPolicyDefault(); policy.filterExp = Exp.build( Exp.regexCompare("prefix.*suffix", RegexFlag.ICASE | RegexFlag.NEWLINE, Exp.stringBin("bin3"))); diff --git a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java index 1c3155c49..6a139bc7f 100644 --- a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java +++ b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java @@ -293,46 +293,146 @@ private static String getVersion() { // Default Policies //------------------------------------------------------- + /** + * Return read policy default. Use when the policy will not be modified. + */ public final Policy getReadPolicyDefault() { return readPolicyDefault; } + /** + * Copy read policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final Policy copyReadPolicyDefault() { + return new Policy(readPolicyDefault); + } + + /** + * Return write policy default. Use when the policy will not be modified. + */ public final WritePolicy getWritePolicyDefault() { return writePolicyDefault; } + /** + * Copy write policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final WritePolicy copyWritePolicyDefault() { + return new WritePolicy(writePolicyDefault); + } + + /** + * Return scan policy default. Use when the policy will not be modified. + */ public final ScanPolicy getScanPolicyDefault() { return scanPolicyDefault; } + /** + * Copy scan policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final ScanPolicy copyScanPolicyDefault() { + return new ScanPolicy(scanPolicyDefault); + } + + /** + * Return query policy default. Use when the policy will not be modified. + */ public final QueryPolicy getQueryPolicyDefault() { return queryPolicyDefault; } + /** + * Copy query policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final QueryPolicy copyQueryPolicyDefault() { + return new QueryPolicy(queryPolicyDefault); + } + + /** + * Return batch header read policy default. Use when the policy will not be modified. + */ public final BatchPolicy getBatchPolicyDefault() { return batchPolicyDefault; } + /** + * Copy batch header read policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final BatchPolicy copyBatchPolicyDefault() { + return new BatchPolicy(batchPolicyDefault); + } + + /** + * Return batch header write policy default. Use when the policy will not be modified. + */ public final BatchPolicy getBatchParentPolicyWriteDefault() { return batchParentPolicyWriteDefault; } + /** + * Copy batch header write policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final BatchPolicy copyBatchParentPolicyWriteDefault() { + return new BatchPolicy(batchParentPolicyWriteDefault); + } + + /** + * Return batch detail write policy default. Use when the policy will not be modified. + */ public final BatchWritePolicy getBatchWritePolicyDefault() { return batchWritePolicyDefault; } + /** + * Copy batch detail write policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final BatchWritePolicy copyBatchWritePolicyDefault() { + return new BatchWritePolicy(batchWritePolicyDefault); + } + + /** + * Return batch detail delete policy default. Use when the policy will not be modified. + */ public final BatchDeletePolicy getBatchDeletePolicyDefault() { return batchDeletePolicyDefault; } + /** + * Copy batch detail delete policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final BatchDeletePolicy copyBatchDeletePolicyDefault() { + return new BatchDeletePolicy(batchDeletePolicyDefault); + } + + /** + * Return batch detail UDF policy default. Use when the policy will not be modified. + */ public final BatchUDFPolicy getBatchUDFPolicyDefault() { return batchUDFPolicyDefault; } + /** + * Copy batch detail UDF policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final BatchUDFPolicy copyBatchUDFPolicyDefault() { + return new BatchUDFPolicy(batchUDFPolicyDefault); + } + + /** + * Return info command policy default. Use when the policy will not be modified. + */ public final InfoPolicy getInfoPolicyDefault() { return infoPolicyDefault; } + /** + * Copy info command policy default. Use when the policy will be modified for use in a specific transaction. + */ + public final InfoPolicy copyInfoPolicyDefault() { + return new InfoPolicy(infoPolicyDefault); + } + //------------------------------------------------------- // Client Management //------------------------------------------------------- diff --git a/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java b/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java index 49a068f73..69cad9969 100644 --- a/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java +++ b/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java @@ -106,6 +106,7 @@ public static Kvs.ScanPolicy toGrpc(ScanPolicy scanPolicy) { return scanPolicyBuilder.build(); } + @SuppressWarnings("deprecation") public static Kvs.QueryPolicy toGrpc(QueryPolicy queryPolicy) { // Base policy fields. Kvs.QueryPolicy.Builder queryPolicyBuilder = Kvs.QueryPolicy.newBuilder(); From 81aba0b6f6e1c523639a514dd21b2575a299eeaf Mon Sep 17 00:00:00 2001 From: agrgr Date: Thu, 28 Mar 2024 12:48:45 -0400 Subject: [PATCH 2/9] CLIENT-2843 Add getKeyRecord() in RecordSet and RecordSequenceRecordSet. --- client/src/com/aerospike/client/query/RecordSet.java | 11 +++++++++-- .../client/proxy/RecordSequenceRecordSet.java | 9 +++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/client/src/com/aerospike/client/query/RecordSet.java b/client/src/com/aerospike/client/query/RecordSet.java index ad36a5e65..7796db2b9 100644 --- a/client/src/com/aerospike/client/query/RecordSet.java +++ b/client/src/com/aerospike/client/query/RecordSet.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2023 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -131,6 +131,13 @@ public Record getRecord() { return record.record; } + /** + * Get key and record. + */ + public KeyRecord getKeyRecord() { + return record; + } + //------------------------------------------------------- // Methods for internal use only. //------------------------------------------------------- @@ -206,7 +213,7 @@ public boolean hasNext() { @Override public KeyRecord next() { - KeyRecord kr = recordSet.record; + KeyRecord kr = recordSet.getKeyRecord(); more = recordSet.next(); return kr; } diff --git a/proxy/src/com/aerospike/client/proxy/RecordSequenceRecordSet.java b/proxy/src/com/aerospike/client/proxy/RecordSequenceRecordSet.java index 0da743a84..b782cb447 100644 --- a/proxy/src/com/aerospike/client/proxy/RecordSequenceRecordSet.java +++ b/proxy/src/com/aerospike/client/proxy/RecordSequenceRecordSet.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2023 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -34,7 +34,7 @@ public class RecordSequenceRecordSet extends RecordSet implements RecordSequence private final long taskId; private volatile boolean valid = true; private final BlockingQueue queue; - private volatile KeyRecord record; + protected volatile KeyRecord record; private volatile AerospikeException exception; public RecordSequenceRecordSet(long taskId, int capacity) { @@ -114,6 +114,11 @@ public Key getKey() { return record.key; } + @Override + public KeyRecord getKeyRecord() { + return record; + } + @Override public void onRecord(Key key, Record record) throws AerospikeException { if (!valid) { From ed417c99efdf6a599b9169473d6d98855d67039c Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Thu, 28 Mar 2024 17:47:11 -0400 Subject: [PATCH 3/9] CLIENT-2842 Use default batch policy (ClientPolicy batchWritePolicyDefault, batchUDFPolicyDefault or batchDeletePolicyDefault) when the record level batch policy is null. --- .../com/aerospike/client/AerospikeClient.java | 74 +++++-------------- .../aerospike/client/async/AsyncBatch.java | 13 +++- .../com/aerospike/client/cluster/Cluster.java | 7 +- .../com/aerospike/client/command/Batch.java | 7 +- .../aerospike/client/command/BatchAttr.java | 33 --------- .../com/aerospike/client/command/Command.java | 46 ++++++------ .../client/proxy/AerospikeClientProxy.java | 4 +- .../aerospike/client/proxy/BatchProxy.java | 20 +++-- 8 files changed, 80 insertions(+), 124 deletions(-) diff --git a/client/src/com/aerospike/client/AerospikeClient.java b/client/src/com/aerospike/client/AerospikeClient.java index b055fbdb7..f5ffee2c7 100644 --- a/client/src/com/aerospike/client/AerospikeClient.java +++ b/client/src/com/aerospike/client/AerospikeClient.java @@ -296,7 +296,7 @@ public AerospikeClient(ClientPolicy policy, Host... hosts) this.infoPolicyDefault = policy.infoPolicyDefault; this.operatePolicyReadDefault = new WritePolicy(this.readPolicyDefault); - cluster = new Cluster(policy, hosts); + cluster = new Cluster(this, policy, hosts); } //------------------------------------------------------- @@ -2328,13 +2328,9 @@ public final boolean operate(BatchPolicy policy, List records) case BATCH_WRITE: { BatchWrite bw = (BatchWrite)record; BatchAttr attr = new BatchAttr(); + BatchWritePolicy bwp = (bw.policy != null)? bw.policy : this.batchWritePolicyDefault; - if (bw.policy != null) { - attr.setWrite(bw.policy); - } - else { - attr.setWrite(policy); - } + attr.setWrite(bwp); attr.adjustWrite(bw.ops); attr.setOpSize(bw.ops); commands[count++] = new BatchSingle.OperateBatchRecord( @@ -2345,13 +2341,9 @@ public final boolean operate(BatchPolicy policy, List records) case BATCH_UDF: { BatchUDF bu = (BatchUDF)record; BatchAttr attr = new BatchAttr(); + BatchUDFPolicy bup = (bu.policy != null)? bu.policy : this.batchUDFPolicyDefault; - if (bu.policy != null) { - attr.setUDF(bu.policy); - } - else { - attr.setUDF(policy); - } + attr.setUDF(bup); commands[count++] = new BatchSingle.UDF( cluster, policy, bu.packageName, bu.functionName, bu.functionArgs, attr, record, status, bn.node); @@ -2361,13 +2353,9 @@ public final boolean operate(BatchPolicy policy, List records) case BATCH_DELETE: { BatchDelete bd = (BatchDelete)record; BatchAttr attr = new BatchAttr(); + BatchDeletePolicy bdp = (bd.policy != null)? bd.policy : this.batchDeletePolicyDefault; - if (bd.policy != null) { - attr.setDelete(bd.policy); - } - else { - attr.setDelete(policy); - } + attr.setDelete(bdp); commands[count++] = new BatchSingle.Delete(cluster, policy, attr, record, status, bn.node); break; } @@ -2445,13 +2433,9 @@ public final void operate( case BATCH_WRITE: { BatchWrite bw = (BatchWrite)record; BatchAttr attr = new BatchAttr(); + BatchWritePolicy bwp = (bw.policy != null)? bw.policy : this.batchWritePolicyDefault; - if (bw.policy != null) { - attr.setWrite(bw.policy); - } - else { - attr.setWrite(policy); - } + attr.setWrite(bwp); attr.adjustWrite(bw.ops); attr.setOpSize(bw.ops); commands[count++] = new AsyncBatchSingle.Write(executor, cluster, policy, attr, bw, bn.node); @@ -2461,13 +2445,9 @@ public final void operate( case BATCH_UDF: { BatchUDF bu = (BatchUDF)record; BatchAttr attr = new BatchAttr(); + BatchUDFPolicy bup = (bu.policy != null)? bu.policy : this.batchUDFPolicyDefault; - if (bu.policy != null) { - attr.setUDF(bu.policy); - } - else { - attr.setUDF(policy); - } + attr.setUDF(bup); commands[count++] = new AsyncBatchSingle.UDF(executor, cluster, policy, attr, bu, bn.node); break; } @@ -2475,13 +2455,9 @@ public final void operate( case BATCH_DELETE: { BatchDelete bd = (BatchDelete)record; BatchAttr attr = new BatchAttr(); + BatchDeletePolicy bdp = (bd.policy != null)? bd.policy : this.batchDeletePolicyDefault; - if (bd.policy != null) { - attr.setDelete(bd.policy); - } - else { - attr.setDelete(policy); - } + attr.setDelete(bdp); commands[count++] = new AsyncBatchSingle.Delete(executor, cluster, policy, attr, record, bn.node); break; @@ -2560,13 +2536,9 @@ public final void operate( case BATCH_WRITE: { BatchWrite bw = (BatchWrite)record; BatchAttr attr = new BatchAttr(); + BatchWritePolicy bwp = (bw.policy != null)? bw.policy : this.batchWritePolicyDefault; - if (bw.policy != null) { - attr.setWrite(bw.policy); - } - else { - attr.setWrite(policy); - } + attr.setWrite(bwp); attr.adjustWrite(bw.ops); attr.setOpSize(bw.ops); commands[count++] = new AsyncBatchSingle.WriteSequence( @@ -2577,13 +2549,9 @@ public final void operate( case BATCH_UDF: { BatchUDF bu = (BatchUDF)record; BatchAttr attr = new BatchAttr(); + BatchUDFPolicy bup = (bu.policy != null)? bu.policy : this.batchUDFPolicyDefault; - if (bu.policy != null) { - attr.setUDF(bu.policy); - } - else { - attr.setUDF(policy); - } + attr.setUDF(bup); commands[count++] = new AsyncBatchSingle.UDFSequence( executor, cluster, policy, attr, bu, bn.node, listener, i); break; @@ -2592,13 +2560,9 @@ public final void operate( case BATCH_DELETE: { BatchDelete bd = (BatchDelete)record; BatchAttr attr = new BatchAttr(); + BatchDeletePolicy bdp = (bd.policy != null)? bd.policy : this.batchDeletePolicyDefault; - if (bd.policy != null) { - attr.setDelete(bd.policy); - } - else { - attr.setDelete(policy); - } + attr.setDelete(bdp); commands[count++] = new AsyncBatchSingle.DeleteSequence( executor, cluster, policy, attr, bd, bn.node, listener, i); break; diff --git a/client/src/com/aerospike/client/async/AsyncBatch.java b/client/src/com/aerospike/client/async/AsyncBatch.java index abd386dde..5893e06ae 100644 --- a/client/src/com/aerospike/client/async/AsyncBatch.java +++ b/client/src/com/aerospike/client/async/AsyncBatch.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; +import com.aerospike.client.AerospikeClient; import com.aerospike.client.AerospikeException; import com.aerospike.client.BatchRead; import com.aerospike.client.BatchRecord; @@ -62,7 +63,7 @@ public ReadListCommand( @Override protected void writeBuffer() { if (batch.node.hasBatchAny()) { - setBatchOperate(batchPolicy, records, batch); + setBatchOperate(batchPolicy, null, null, null, records, batch); } else { setBatchRead(batchPolicy, records, batch); @@ -117,7 +118,7 @@ public ReadSequenceCommand( @Override protected void writeBuffer() { if (batch.node.hasBatchAny()) { - setBatchOperate(batchPolicy, records, batch); + setBatchOperate(batchPolicy, null, null, null, records, batch); } else { setBatchRead(batchPolicy, records, batch); @@ -411,7 +412,9 @@ protected boolean isWrite() { @Override protected void writeBuffer() { - setBatchOperate(batchPolicy, records, batch); + AerospikeClient client = parent.cluster.client; + setBatchOperate(batchPolicy, client.batchWritePolicyDefault, client.batchUDFPolicyDefault, + client.batchDeletePolicyDefault, records, batch); } @Override @@ -498,7 +501,9 @@ protected boolean isWrite() { @Override protected void writeBuffer() { - setBatchOperate(batchPolicy, records, batch); + AerospikeClient client = parent.cluster.client; + setBatchOperate(batchPolicy, client.batchWritePolicyDefault, client.batchUDFPolicyDefault, + client.batchDeletePolicyDefault, records, batch); } @Override diff --git a/client/src/com/aerospike/client/cluster/Cluster.java b/client/src/com/aerospike/client/cluster/Cluster.java index fdc75c2d3..1b6a82b01 100644 --- a/client/src/com/aerospike/client/cluster/Cluster.java +++ b/client/src/com/aerospike/client/cluster/Cluster.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReferenceArray; +import com.aerospike.client.AerospikeClient; import com.aerospike.client.AerospikeException; import com.aerospike.client.Host; import com.aerospike.client.Log; @@ -57,6 +58,9 @@ import com.aerospike.client.util.Util; public class Cluster implements Runnable, Closeable { + // Client back pointer. + public final AerospikeClient client; + // Expected cluster name. protected final String clusterName; @@ -201,7 +205,8 @@ public class Cluster implements Runnable, Closeable { private final AtomicLong tranCount = new AtomicLong(); private final AtomicLong delayQueueTimeoutCount = new AtomicLong(); - public Cluster(ClientPolicy policy, Host[] hosts) { + public Cluster(AerospikeClient client, ClientPolicy policy, Host[] hosts) { + this.client = client; this.clusterName = policy.clusterName; this.validateClusterName = policy.validateClusterName; this.tlsPolicy = policy.tlsPolicy; diff --git a/client/src/com/aerospike/client/command/Batch.java b/client/src/com/aerospike/client/command/Batch.java index 1c208a2b2..a5168c061 100644 --- a/client/src/com/aerospike/client/command/Batch.java +++ b/client/src/com/aerospike/client/command/Batch.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import com.aerospike.client.AerospikeClient; import com.aerospike.client.AerospikeException; import com.aerospike.client.BatchRead; import com.aerospike.client.BatchRecord; @@ -55,7 +56,7 @@ public ReadListCommand( @Override protected void writeBuffer() { if (batch.node.hasBatchAny()) { - setBatchOperate(batchPolicy, records, batch); + setBatchOperate(batchPolicy, null, null, null, records, batch); } else { setBatchRead(batchPolicy, records, batch); @@ -234,7 +235,9 @@ protected boolean isWrite() { @Override protected void writeBuffer() { - setBatchOperate(batchPolicy, records, batch); + AerospikeClient client = cluster.client; + setBatchOperate(batchPolicy, client.batchWritePolicyDefault, client.batchUDFPolicyDefault, + client.batchDeletePolicyDefault, records, batch); } @Override diff --git a/client/src/com/aerospike/client/command/BatchAttr.java b/client/src/com/aerospike/client/command/BatchAttr.java index 9f48e3dbb..487a742d0 100644 --- a/client/src/com/aerospike/client/command/BatchAttr.java +++ b/client/src/com/aerospike/client/command/BatchAttr.java @@ -191,17 +191,6 @@ public void adjustRead(boolean readAllBins) { } } - public void setWrite(Policy wp) { - filterExp = null; - readAttr = 0; - writeAttr = Command.INFO2_WRITE | Command.INFO2_RESPOND_ALL_OPS; - infoAttr = 0; - expiration = 0; - generation = 0; - hasWrite = true; - sendKey = wp.sendKey; - } - public void setWrite(BatchWritePolicy wp) { filterExp = wp.filterExp; readAttr = 0; @@ -271,17 +260,6 @@ else if (op.type == Operation.Type.READ_HEADER) { } } - public void setUDF(Policy up) { - filterExp = null; - readAttr = 0; - writeAttr = Command.INFO2_WRITE; - infoAttr = 0; - expiration = 0; - generation = 0; - hasWrite = true; - sendKey = up.sendKey; - } - public void setUDF(BatchUDFPolicy up) { filterExp = up.filterExp; readAttr = 0; @@ -301,17 +279,6 @@ public void setUDF(BatchUDFPolicy up) { } } - public void setDelete(Policy dp) { - filterExp = null; - readAttr = 0; - writeAttr = Command.INFO2_WRITE | Command.INFO2_RESPOND_ALL_OPS | Command.INFO2_DELETE; - infoAttr = 0; - expiration = 0; - generation = 0; - hasWrite = true; - sendKey = dp.sendKey; - } - public void setDelete(BatchDeletePolicy dp) { filterExp = dp.filterExp; readAttr = 0; diff --git a/client/src/com/aerospike/client/command/Command.java b/client/src/com/aerospike/client/command/Command.java index 2fcb6ee53..efda4be76 100644 --- a/client/src/com/aerospike/client/command/Command.java +++ b/client/src/com/aerospike/client/command/Command.java @@ -36,8 +36,11 @@ import com.aerospike.client.Value; import com.aerospike.client.cluster.Cluster; import com.aerospike.client.exp.Expression; +import com.aerospike.client.policy.BatchDeletePolicy; import com.aerospike.client.policy.BatchPolicy; import com.aerospike.client.policy.BatchReadPolicy; +import com.aerospike.client.policy.BatchUDFPolicy; +import com.aerospike.client.policy.BatchWritePolicy; import com.aerospike.client.policy.CommitLevel; import com.aerospike.client.policy.Policy; import com.aerospike.client.policy.QueryDuration; @@ -742,12 +745,25 @@ else if (ops != null) { // Batch Read/Write Operations //-------------------------------------------------- - public final void setBatchOperate(BatchPolicy policy, List records, BatchNode batch) { + public final void setBatchOperate( + BatchPolicy policy, + BatchWritePolicy writePolicy, + BatchUDFPolicy udfPolicy, + BatchDeletePolicy deletePolicy, + List records, + BatchNode batch + ) { final BatchRecordIterNative iter = new BatchRecordIterNative(records, batch); - setBatchOperate(policy, iter); + setBatchOperate(policy, writePolicy, udfPolicy, deletePolicy, iter); } - public final void setBatchOperate(BatchPolicy policy, KeyIter iter) { + public final void setBatchOperate( + BatchPolicy policy, + BatchWritePolicy writePolicy, + BatchUDFPolicy udfPolicy, + BatchDeletePolicy deletePolicy, + KeyIter iter + ) { BatchRecord record; BatchRecord prev = null; @@ -850,13 +866,9 @@ else if (br.ops != null) { case BATCH_WRITE: { BatchWrite bw = (BatchWrite)record; + BatchWritePolicy bwp = (bw.policy != null)? bw.policy : writePolicy; - if (bw.policy != null) { - attr.setWrite(bw.policy); - } - else { - attr.setWrite(policy); - } + attr.setWrite(bwp); attr.adjustWrite(bw.ops); writeBatchOperations(key, bw.ops, attr, attr.filterExp); break; @@ -864,13 +876,9 @@ else if (br.ops != null) { case BATCH_UDF: { BatchUDF bu = (BatchUDF)record; + BatchUDFPolicy bup = (bu.policy != null)? bu.policy : udfPolicy; - if (bu.policy != null) { - attr.setUDF(bu.policy); - } - else { - attr.setUDF(policy); - } + attr.setUDF(bup); writeBatchWrite(key, attr, attr.filterExp, 3, 0); writeField(bu.packageName, FieldType.UDF_PACKAGE_NAME); writeField(bu.functionName, FieldType.UDF_FUNCTION); @@ -880,13 +888,9 @@ else if (br.ops != null) { case BATCH_DELETE: { BatchDelete bd = (BatchDelete)record; + BatchDeletePolicy bdp = (bd.policy != null)? bd.policy : deletePolicy; - if (bd.policy != null) { - attr.setDelete(bd.policy); - } - else { - attr.setDelete(policy); - } + attr.setDelete(bdp); writeBatchWrite(key, attr, attr.filterExp, 0, 0); break; } diff --git a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java index 6a139bc7f..46a051d91 100644 --- a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java +++ b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java @@ -1575,7 +1575,7 @@ public void operate( policy = batchParentPolicyWriteDefault; } - CommandProxy command = new BatchProxy.OperateListCommand(executor, policy, listener, records); + CommandProxy command = new BatchProxy.OperateListCommand(this, executor, policy, listener, records); command.execute(); } @@ -1610,7 +1610,7 @@ public void operate( policy = batchParentPolicyWriteDefault; } - CommandProxy command = new BatchProxy.OperateSequenceCommand(executor, policy, listener, records); + CommandProxy command = new BatchProxy.OperateSequenceCommand(this, executor, policy, listener, records); command.execute(); } diff --git a/proxy/src/com/aerospike/client/proxy/BatchProxy.java b/proxy/src/com/aerospike/client/proxy/BatchProxy.java index cef64b9da..c4ce06f8d 100644 --- a/proxy/src/com/aerospike/client/proxy/BatchProxy.java +++ b/proxy/src/com/aerospike/client/proxy/BatchProxy.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2023 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -75,7 +75,7 @@ public ReadListCommandSync( @Override void writeCommand(Command command) { BatchRecordIterProxy iter = new BatchRecordIterProxy(records); - command.setBatchOperate(batchPolicy, iter); + command.setBatchOperate(batchPolicy, null, null, null, iter); } @Override @@ -120,7 +120,7 @@ public ReadListCommand( @Override void writeCommand(Command command) { BatchRecordIterProxy iter = new BatchRecordIterProxy(records); - command.setBatchOperate(batchPolicy, iter); + command.setBatchOperate(batchPolicy, null, null, null, iter); } @Override @@ -164,7 +164,7 @@ public ReadSequenceCommand( @Override void writeCommand(Command command) { BatchRecordIterProxy iter = new BatchRecordIterProxy(records); - command.setBatchOperate(batchPolicy, iter); + command.setBatchOperate(batchPolicy, null, null, null, iter); } @Override @@ -417,17 +417,20 @@ void onFailure(AerospikeException ae) { //------------------------------------------------------- public static final class OperateListCommand extends BaseCommand { + private final AerospikeClientProxy client; private final BatchOperateListListener listener; private final List records; private boolean status; public OperateListCommand( + AerospikeClientProxy client, GrpcCallExecutor executor, BatchPolicy batchPolicy, BatchOperateListListener listener, List records ) { super(executor, batchPolicy, true, records.size()); + this.client = client; this.listener = listener; this.records = records; this.status = true; @@ -436,7 +439,8 @@ public OperateListCommand( @Override void writeCommand(Command command) { BatchRecordIterProxy iter = new BatchRecordIterProxy(records); - command.setBatchOperate(batchPolicy, iter); + command.setBatchOperate(batchPolicy, client.batchWritePolicyDefault, client.batchUDFPolicyDefault, + client.batchDeletePolicyDefault, iter); } @Override @@ -485,16 +489,19 @@ void onFailure(AerospikeException ae) { } public static final class OperateSequenceCommand extends BaseCommand { + private final AerospikeClientProxy client; private final BatchRecordSequenceListener listener; private final List records; public OperateSequenceCommand( + AerospikeClientProxy client, GrpcCallExecutor executor, BatchPolicy batchPolicy, BatchRecordSequenceListener listener, List records ) { super(executor, batchPolicy, true, records.size()); + this.client = client; this.listener = listener; this.records = records; } @@ -502,7 +509,8 @@ public OperateSequenceCommand( @Override void writeCommand(Command command) { BatchRecordIterProxy iter = new BatchRecordIterProxy(records); - command.setBatchOperate(batchPolicy, iter); + command.setBatchOperate(batchPolicy, client.batchWritePolicyDefault, client.batchUDFPolicyDefault, + client.batchDeletePolicyDefault, iter); } @Override From 1d3444208197ea3b02663bf04c5b7dcabb1f8c6b Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Fri, 5 Apr 2024 14:43:05 -0400 Subject: [PATCH 4/9] CLIENT-2868 Add readTouchTtlPercent command line option to benchmarks. --- .../src/com/aerospike/benchmarks/Main.java | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/benchmarks/src/com/aerospike/benchmarks/Main.java b/benchmarks/src/com/aerospike/benchmarks/Main.java index b99af08e7..18bc82f22 100644 --- a/benchmarks/src/com/aerospike/benchmarks/Main.java +++ b/benchmarks/src/com/aerospike/benchmarks/Main.java @@ -222,6 +222,11 @@ public Main(String[] commandLineArgs) throws Exception { " 0: Default to namespace expiration time\n" + " >0: Actual given expiration time" ); + options.addOption("rt", "readTouchTtlPercent", true, + "Read touch TTL percent is expressed as a percentage of the TTL (or expiration) sent on the most\n" + + "recent write such that a read within this interval of the record’s end of life will generate a touch.\n" + + "Range: 0 - 100" + ); options.addOption("g", "throughput", true, "Set a target transactions per second for the client. The client should not exceed this " + "average throughput." @@ -392,6 +397,14 @@ public Main(String[] commandLineArgs) throws Exception { } } + if (line.hasOption("readTouchTtlPercent")) { + args.readPolicy.readTouchTtlPercent = Integer.parseInt(line.getOptionValue("readTouchTtlPercent")); + if (args.readPolicy.readTouchTtlPercent < 0 || args.readPolicy.readTouchTtlPercent > 100) { + throw new Exception("Invalid readTouchTtlPercent: " + args.readPolicy.readTouchTtlPercent + + " Range: 0 - 100"); + } + } + if (line.hasOption("port")) { this.port = Integer.parseInt(line.getOptionValue("port")); } @@ -1057,19 +1070,21 @@ else if (! level.equals("all")) { if (args.workload != Workload.INITIALIZE) { System.out.println("read policy:"); System.out.println( - " connectTimeout: " + args.readPolicy.connectTimeout - + ", socketTimeout: " + args.readPolicy.socketTimeout - + ", totalTimeout: " + args.readPolicy.totalTimeout - + ", timeoutDelay: " + args.readPolicy.timeoutDelay - + ", maxRetries: " + args.readPolicy.maxRetries - + ", sleepBetweenRetries: " + args.readPolicy.sleepBetweenRetries - ); + " connectTimeout: " + args.readPolicy.connectTimeout + + ", socketTimeout: " + args.readPolicy.socketTimeout + + ", totalTimeout: " + args.readPolicy.totalTimeout + + ", timeoutDelay: " + args.readPolicy.timeoutDelay + + ", maxRetries: " + args.readPolicy.maxRetries + + ", sleepBetweenRetries: " + args.readPolicy.sleepBetweenRetries + ); System.out.println( - " readModeAP: " + args.readPolicy.readModeAP - + ", readModeSC: " + args.readPolicy.readModeSC - + ", replica: " + args.readPolicy.replica - + ", reportNotFound: " + args.reportNotFound); + " readModeAP: " + args.readPolicy.readModeAP + + ", readModeSC: " + args.readPolicy.readModeSC + + ", replica: " + args.readPolicy.replica + + ", readTouchTtlPercent: " + args.readPolicy.readTouchTtlPercent + + ", reportNotFound: " + args.reportNotFound + ); } System.out.println("write policy:"); @@ -1082,7 +1097,10 @@ else if (! level.equals("all")) { + ", sleepBetweenRetries: " + args.writePolicy.sleepBetweenRetries ); - System.out.println(" commitLevel: " + args.writePolicy.commitLevel); + System.out.println( + " commitLevel: " + args.writePolicy.commitLevel + + ", expiration: " + args.writePolicy.expiration + ); if (args.batchSize > 1) { System.out.println("batch size: " + args.batchSize); From d59e8bbc6c25ab75b3f0f0760a671712001a918a Mon Sep 17 00:00:00 2001 From: Kateryna Kanivets Date: Fri, 5 Apr 2024 16:05:42 -0400 Subject: [PATCH 5/9] Code cleanup in benchmarks. --- .../aerospike/benchmarks/InsertTaskSync.java | 18 +++----- .../src/com/aerospike/benchmarks/KeyType.java | 4 +- .../aerospike/benchmarks/LatencyManager.java | 12 ++--- .../benchmarks/LatencyManagerYcsb.java | 26 +++++------ .../src/com/aerospike/benchmarks/Main.java | 40 ++++++++-------- .../src/com/aerospike/benchmarks/RWTask.java | 46 +++++++++---------- .../com/aerospike/benchmarks/RWTaskSync.java | 7 +-- .../benchmarks/TransactionalItem.java | 7 +-- .../benchmarks/TransactionalType.java | 10 ++-- .../benchmarks/TransactionalWorkload.java | 6 +-- .../com/aerospike/benchmarks/Workload.java | 4 +- 11 files changed, 87 insertions(+), 93 deletions(-) diff --git a/benchmarks/src/com/aerospike/benchmarks/InsertTaskSync.java b/benchmarks/src/com/aerospike/benchmarks/InsertTaskSync.java index 0046dc511..992ed2d75 100644 --- a/benchmarks/src/com/aerospike/benchmarks/InsertTaskSync.java +++ b/benchmarks/src/com/aerospike/benchmarks/InsertTaskSync.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2023 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -16,9 +16,6 @@ */ package com.aerospike.benchmarks; -import java.util.HashMap; -import java.util.Map; - import com.aerospike.client.AerospikeException; import com.aerospike.client.Bin; import com.aerospike.client.IAerospikeClient; @@ -87,11 +84,11 @@ private void runCommand(long keyCurrent, RandomShift random) { private void put(Key key, Bin[] bins) { if (counters.write.latency != null) { long begin = System.nanoTime(); - + if (! skipKey(key)) { client.put(args.writePolicy, key, bins); } - + long elapsed = System.nanoTime() - begin; counters.write.count.getAndIncrement(); counters.write.latency.add(elapsed); @@ -103,11 +100,8 @@ private void put(Key key, Bin[] bins) { counters.write.count.getAndIncrement(); } } - + private boolean skipKey(Key key) { - if (args.partitionIds != null && !args.partitionIds.contains(Partition.getPartitionId(key.digest))) { - return true; - } - return false; - } + return args.partitionIds != null && !args.partitionIds.contains(Partition.getPartitionId(key.digest)); + } } diff --git a/benchmarks/src/com/aerospike/benchmarks/KeyType.java b/benchmarks/src/com/aerospike/benchmarks/KeyType.java index 885e79846..24daf4326 100644 --- a/benchmarks/src/com/aerospike/benchmarks/KeyType.java +++ b/benchmarks/src/com/aerospike/benchmarks/KeyType.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -17,5 +17,5 @@ package com.aerospike.benchmarks; public enum KeyType { - STRING, INTEGER; + STRING, INTEGER } diff --git a/benchmarks/src/com/aerospike/benchmarks/LatencyManager.java b/benchmarks/src/com/aerospike/benchmarks/LatencyManager.java index 0b157016b..ec13c9bf0 100644 --- a/benchmarks/src/com/aerospike/benchmarks/LatencyManager.java +++ b/benchmarks/src/com/aerospike/benchmarks/LatencyManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -20,9 +20,9 @@ public interface LatencyManager { - public void add(long elapsed); + void add(long elapsed); - public void printHeader(PrintStream stream); + void printHeader(PrintStream stream); /** * Print latency percents for specified cumulative ranges. @@ -32,9 +32,9 @@ public interface LatencyManager { * affects performance. Fortunately, the values will even out over time * (ie. no double counting). */ - public void printResults(PrintStream stream, String prefix); + void printResults(PrintStream stream, String prefix); - public void printSummaryHeader(PrintStream stream); + void printSummaryHeader(PrintStream stream); - public void printSummary(PrintStream stream, String prefix); + void printSummary(PrintStream stream, String prefix); } diff --git a/benchmarks/src/com/aerospike/benchmarks/LatencyManagerYcsb.java b/benchmarks/src/com/aerospike/benchmarks/LatencyManagerYcsb.java index 50498bf82..351abad9a 100644 --- a/benchmarks/src/com/aerospike/benchmarks/LatencyManagerYcsb.java +++ b/benchmarks/src/com/aerospike/benchmarks/LatencyManagerYcsb.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -25,21 +25,21 @@ public class LatencyManagerYcsb implements LatencyManager { public static final String BUCKETS = "histogram.buckets"; public static final String BUCKETS_DEFAULT = "1000"; - private AtomicInteger _buckets; - private AtomicLongArray histogram; - private AtomicLong histogramoverflow; - private AtomicInteger operations; - private AtomicLong totallatency; - private AtomicInteger warmupCount; + private final AtomicInteger _buckets; + private final AtomicLongArray histogram; + private final AtomicLong histogramoverflow; + private final AtomicInteger operations; + private final AtomicLong totallatency; + private final AtomicInteger warmupCount; private volatile boolean warmupComplete = false; //keep a windowed version of these stats for printing status - private AtomicInteger windowoperations; - private AtomicLong windowtotallatency; + private final AtomicInteger windowoperations; + private final AtomicLong windowtotallatency; - private AtomicLong min; - private AtomicLong max; - private String name; + private final AtomicLong min; + private final AtomicLong max; + private final String name; public LatencyManagerYcsb(String name, int warmupCount) { this.name = name; @@ -125,7 +125,7 @@ public void printResults(PrintStream exporter, String prefix) { } } buffer.append(']'); - exporter.println(buffer.toString()); + exporter.println(buffer); windowoperations.set(0); windowtotallatency.set(0); } diff --git a/benchmarks/src/com/aerospike/benchmarks/Main.java b/benchmarks/src/com/aerospike/benchmarks/Main.java index 18bc82f22..dd5e35c63 100644 --- a/benchmarks/src/com/aerospike/benchmarks/Main.java +++ b/benchmarks/src/com/aerospike/benchmarks/Main.java @@ -96,8 +96,8 @@ public static void main(String[] args) { } } - private Arguments args = new Arguments(); - private Host[] hosts; + private final Arguments args = new Arguments(); + private final Host[] hosts; private EventLoopType eventLoopType = EventLoopType.DIRECT_NIO; private int port = 3000; private long nKeys; @@ -113,8 +113,8 @@ public static void main(String[] args) { private String filepath; private EventLoops eventLoops; - private ClientPolicy clientPolicy = new ClientPolicy(); - private CounterStore counters = new CounterStore(); + private final ClientPolicy clientPolicy = new ClientPolicy(); + private final CounterStore counters = new CounterStore(); public Main(String[] commandLineArgs) throws Exception { boolean hasTxns = false; @@ -1121,25 +1121,25 @@ else if (! level.equals("all")) { System.out.print("bin[" + binCount + "]: "); switch (spec.type) { - case INTEGER: - System.out.println("integer"); - break; + case INTEGER: + System.out.println("integer"); + break; - case STRING: - System.out.println("string[" + spec.size + "]"); - break; + case STRING: + System.out.println("string[" + spec.size + "]"); + break; - case BYTES: - System.out.println("byte[" + spec.size + "]"); - break; + case BYTES: + System.out.println("byte[" + spec.size + "]"); + break; - case RANDOM: - System.out.println("random[" + (spec.size * 8) + "]"); - break; + case RANDOM: + System.out.println("random[" + (spec.size * 8) + "]"); + break; - case TIMESTAMP: - System.out.println("timestamp"); - break; + case TIMESTAMP: + System.out.println("timestamp"); + break; } binCount++; } @@ -1164,7 +1164,7 @@ private static void logUsage(Options options) { String syntax = Main.class.getName() + " []"; formatter.printHelp(pw, 100, syntax, "options:", options, 0, 2, null); - System.out.println(sw.toString()); + System.out.println(sw); } private static String getLatencyUsage(String latencyString) { diff --git a/benchmarks/src/com/aerospike/benchmarks/RWTask.java b/benchmarks/src/com/aerospike/benchmarks/RWTask.java index c4e592976..3cbd59550 100644 --- a/benchmarks/src/com/aerospike/benchmarks/RWTask.java +++ b/benchmarks/src/com/aerospike/benchmarks/RWTask.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -59,33 +59,33 @@ public void stop() { protected void runCommand(RandomShift random) { try { switch (args.workload) { - case READ_UPDATE: - case READ_REPLACE: - readUpdate(random); - break; + case READ_UPDATE: + case READ_REPLACE: + readUpdate(random); + break; - case READ_MODIFY_UPDATE: - readModifyUpdate(random); - break; + case READ_MODIFY_UPDATE: + readModifyUpdate(random); + break; - case READ_MODIFY_INCREMENT: - readModifyIncrement(random); - break; + case READ_MODIFY_INCREMENT: + readModifyIncrement(random); + break; - case READ_MODIFY_DECREMENT: - readModifyDecrement(random); - break; + case READ_MODIFY_DECREMENT: + readModifyDecrement(random); + break; - case READ_FROM_FILE: - readFromFile(random); - break; + case READ_FROM_FILE: + readFromFile(random); + break; - case TRANSACTION: - runTransaction(random); - break; + case TRANSACTION: + runTransaction(random); + break; - default: - break; + default: + break; } } catch (Exception e) { @@ -93,7 +93,7 @@ protected void runCommand(RandomShift random) { e.printStackTrace(); } else { - System.out.println("Exception - " + e.toString()); + System.out.println("Exception - " + e); } } } diff --git a/benchmarks/src/com/aerospike/benchmarks/RWTaskSync.java b/benchmarks/src/com/aerospike/benchmarks/RWTaskSync.java index 5d2ca4272..7a5dfd5db 100644 --- a/benchmarks/src/com/aerospike/benchmarks/RWTaskSync.java +++ b/benchmarks/src/com/aerospike/benchmarks/RWTaskSync.java @@ -222,11 +222,8 @@ protected void get(Key[] keys) { } private boolean skipKey(Key key) { - if (args.partitionIds != null && !args.partitionIds.contains(Partition.getPartitionId(key.digest))) { - return true; - } - return false; - } + return args.partitionIds != null && !args.partitionIds.contains(Partition.getPartitionId(key.digest)); + } private Key[] getFilteredKeys(Key[] keys) { List filteredKeys = new ArrayList<>(); diff --git a/benchmarks/src/com/aerospike/benchmarks/TransactionalItem.java b/benchmarks/src/com/aerospike/benchmarks/TransactionalItem.java index 1bce61121..d29c48c65 100644 --- a/benchmarks/src/com/aerospike/benchmarks/TransactionalItem.java +++ b/benchmarks/src/com/aerospike/benchmarks/TransactionalItem.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -17,8 +17,9 @@ package com.aerospike.benchmarks; public class TransactionalItem { - private TransactionalType type; - private int repetitions; + private final TransactionalType type; + private final int repetitions; + public TransactionalItem(TransactionalType type, int repetitions) { super(); this.type = type; diff --git a/benchmarks/src/com/aerospike/benchmarks/TransactionalType.java b/benchmarks/src/com/aerospike/benchmarks/TransactionalType.java index 81e3545f6..0c74b4935 100644 --- a/benchmarks/src/com/aerospike/benchmarks/TransactionalType.java +++ b/benchmarks/src/com/aerospike/benchmarks/TransactionalType.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -29,14 +29,16 @@ public enum TransactionalType { MULTI_BIN_REPLACE('P', false), MULTI_BIN_WRITE('W', false); - private char code; - private boolean read; - private boolean batch; + private final char code; + private final boolean read; + private final boolean batch; + private TransactionalType(char code, boolean isRead, boolean isBatch) { this.code = code; this.read = isRead; this.batch = isBatch; } + private TransactionalType(char code, boolean isRead) { this(code, isRead, false); } diff --git a/benchmarks/src/com/aerospike/benchmarks/TransactionalWorkload.java b/benchmarks/src/com/aerospike/benchmarks/TransactionalWorkload.java index b54287307..c8912678e 100644 --- a/benchmarks/src/com/aerospike/benchmarks/TransactionalWorkload.java +++ b/benchmarks/src/com/aerospike/benchmarks/TransactionalWorkload.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -32,7 +32,7 @@ public class TransactionalWorkload implements Iterable{ private enum VariationType { PLUS, MINUS - }; + } // These options are derived and should not be set private int minReads; @@ -166,7 +166,7 @@ else if (varianceStr.matches("^\\d+$")) { private class WorkloadIterator implements Iterator { private int reads = 0; private int writes = 0; - private RandomShift random; + private final RandomShift random; private int fixedSequenceIndex = 0; public WorkloadIterator(RandomShift random) { diff --git a/benchmarks/src/com/aerospike/benchmarks/Workload.java b/benchmarks/src/com/aerospike/benchmarks/Workload.java index 8c049cb59..350c2c137 100644 --- a/benchmarks/src/com/aerospike/benchmarks/Workload.java +++ b/benchmarks/src/com/aerospike/benchmarks/Workload.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 Aerospike, Inc. + * Copyright 2012-2024 Aerospike, Inc. * * Portions may be licensed to Aerospike, Inc. under one or more contributor * license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. @@ -58,5 +58,5 @@ public enum Workload { /** * Form business transactions */ - TRANSACTION; + TRANSACTION } From 3482ff57b33ed5b8de34736ff4a52c940468e657 Mon Sep 17 00:00:00 2001 From: John Griffin Date: Mon, 15 Apr 2024 11:20:45 -0400 Subject: [PATCH 6/9] Propagate the original LuaC compile() exception details in AerospikeExceptions thrown by LuaCache. (#316) Co-authored-by: griffinjm --- client/src/com/aerospike/client/lua/LuaCache.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/src/com/aerospike/client/lua/LuaCache.java b/client/src/com/aerospike/client/lua/LuaCache.java index 51e080f5f..499fcf40c 100644 --- a/client/src/com/aerospike/client/lua/LuaCache.java +++ b/client/src/com/aerospike/client/lua/LuaCache.java @@ -63,7 +63,7 @@ public static final Prototype loadPackageFromFile(String packageName) { Packages.put(packageName, prototype); } catch (Throwable e) { - throw new AerospikeException("Failed to read file: " + source.getAbsolutePath()); + throw new AerospikeException("Failed to read file: " + source.getAbsolutePath(), e); } } return prototype; @@ -81,7 +81,7 @@ public static final Prototype loadPackageFromResource(ClassLoader resourceLoader Packages.put(packageName, prototype); } catch (Throwable e) { - throw new AerospikeException("Failed to read resource: " + resourcePath); + throw new AerospikeException("Failed to read resource: " + resourcePath, e); } } return prototype; @@ -94,7 +94,7 @@ private static Prototype compile(String packageName, InputStream is) { return LuaC.instance.compile(bis, packageName); } catch (Throwable e) { - throw new AerospikeException("Failed to compile: " + packageName); + throw new AerospikeException("Failed to compile: " + packageName, e); } } From 9c942748ac00e804a4b6b64007dac2bed7822f22 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Thu, 25 Apr 2024 18:37:46 -0400 Subject: [PATCH 7/9] CLIENT-2902 Only validate error code instead of entire error string in createDrop() index test. --- client/src/com/aerospike/client/Info.java | 41 +++++++++++++++++++ .../aerospike/test/sync/query/TestIndex.java | 5 ++- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/client/src/com/aerospike/client/Info.java b/client/src/com/aerospike/client/Info.java index 388f3ab4b..402ed015c 100644 --- a/client/src/com/aerospike/client/Info.java +++ b/client/src/com/aerospike/client/Info.java @@ -305,6 +305,47 @@ public static HashMap request(Connection conn) return info.parseMultiResponse(); } + //------------------------------------------------------- + // Parse Methods + //------------------------------------------------------- + + /** + * Parse info response string and return the result code for info commands + * that only return OK or an error string. Info commands that return other + * data are not handled by this method. + */ + public static int parseResultCode(String response) { + if (response.regionMatches(true, 0, "OK", 0, 2)) { + return ResultCode.OK; + } + + // Error format: ERROR|FAIL[:][:] + try { + String[] list = response.split(":"); + String s = list[0]; + + if (s.regionMatches(true, 0, "FAIL", 0, 4) || + s.regionMatches(true, 0, "ERROR", 0, 5)) { + + if (list.length > 1) { + s = list[1].trim(); + + if (! s.isEmpty()) { + return Integer.parseInt(s); + } + } + return ResultCode.SERVER_ERROR; + } + throw new AerospikeException("Unrecognized info response: " + response); + } + catch (AerospikeException ae) { + throw ae; + } + catch (Throwable t) { + throw new AerospikeException("Unrecognized info response: " + response, t); + } + } + //------------------------------------------------------- // Member variables. //------------------------------------------------------- diff --git a/test/src/com/aerospike/test/sync/query/TestIndex.java b/test/src/com/aerospike/test/sync/query/TestIndex.java index 404dc484c..ca97c54ba 100644 --- a/test/src/com/aerospike/test/sync/query/TestIndex.java +++ b/test/src/com/aerospike/test/sync/query/TestIndex.java @@ -26,7 +26,6 @@ import com.aerospike.client.Value; import com.aerospike.client.cdt.CTX; import com.aerospike.client.cluster.Node; -import com.aerospike.client.policy.Policy; import com.aerospike.client.query.IndexType; import com.aerospike.client.task.IndexTask; import com.aerospike.test.sync.TestSync; @@ -62,7 +61,9 @@ public void createDrop() { for (Node node : nodes) { String response = Info.request(node, cmd); - assertEquals(response, "FAIL:201:no-index"); + int code = Info.parseResultCode(response); + + assertEquals(code, 201); } } From a7afc60fa19fe63d0d7a76c831ffd779a09801d9 Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Tue, 7 May 2024 09:56:21 -0400 Subject: [PATCH 8/9] Upgrade to netty 4.1.108.Final and commons-cli 1.7.0 per snyk. --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 48ca85c8b..a75ae3ec1 100644 --- a/pom.xml +++ b/pom.xml @@ -39,12 +39,12 @@ 2.18.1 3.2.0 - 4.1.107.Final + 4.1.108.Final 2.0.62.Final 1.59.0 3.0.1 0.4 - 1.6.0 + 1.7.0 4.13.1 From 39e923c1b41f7d84928a09f90c2e6b022177fdda Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Tue, 7 May 2024 10:21:24 -0400 Subject: [PATCH 9/9] Update version 8.1.1 --- benchmarks/pom.xml | 2 +- client/pom.xml | 2 +- examples/pom.xml | 2 +- pom.xml | 2 +- proxy/pom.xml | 2 +- test/pom.xml | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 549857d2d..d142e8dfb 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -6,7 +6,7 @@ com.aerospike aerospike-parent - 8.1.0 + 8.1.1 aerospike-benchmarks jar diff --git a/client/pom.xml b/client/pom.xml index 9457604a3..f575c3e25 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -6,7 +6,7 @@ com.aerospike aerospike-parent - 8.1.0 + 8.1.1 aerospike-client-jdk21 jar diff --git a/examples/pom.xml b/examples/pom.xml index dfbaa565b..318c231be 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -6,7 +6,7 @@ com.aerospike aerospike-parent - 8.1.0 + 8.1.1 aerospike-examples jar diff --git a/pom.xml b/pom.xml index a75ae3ec1..f22dbf19c 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.aerospike aerospike-parent aerospike-parent - 8.1.0 + 8.1.1 pom https://github.com/aerospike/aerospike-client-java diff --git a/proxy/pom.xml b/proxy/pom.xml index ef74d5f45..0c269eb7f 100644 --- a/proxy/pom.xml +++ b/proxy/pom.xml @@ -6,7 +6,7 @@ com.aerospike aerospike-parent - 8.1.0 + 8.1.1 aerospike-proxy-client jar diff --git a/test/pom.xml b/test/pom.xml index 618322ccc..218b6c7ba 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -6,7 +6,7 @@ com.aerospike aerospike-parent - 8.1.0 + 8.1.1 aerospike-client-test jar