Skip to content

Commit

Permalink
Release memory where possible in RestElasticSearchClient
Browse files Browse the repository at this point in the history
Closes JanusGraph#4684

Signed-off-by: Allan Clements <[email protected]>
  • Loading branch information
criminosis committed Sep 26, 2024
1 parent 4a576f6 commit 6703d18
Showing 1 changed file with 47 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.entity.ByteArrayEntity;
Expand Down Expand Up @@ -50,17 +48,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -426,28 +423,41 @@ class RequestBytes {
@VisibleForTesting
int getSerializedSize() {
int serializedSize = this.requestBytes.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
serializedSize += 1; //For follow-up NEW_LINE_BYTES
if (this.requestSource != null) {
serializedSize += this.requestSource.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
serializedSize += 1; //For follow-up NEW_LINE_BYTES
}
return serializedSize;
}

private void writeTo(OutputStream outputStream) throws IOException {
outputStream.write(this.requestBytes);
outputStream.write(NEW_LINE_BYTES);
private int writeTo(byte[] target, int initialOffset) {
int offset = initialOffset;
System.arraycopy(this.requestBytes, 0, target, offset, this.requestBytes.length);
offset += this.requestBytes.length;
System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length);
offset += NEW_LINE_BYTES.length;
if (this.requestSource != null) {
outputStream.write(requestSource);
outputStream.write(NEW_LINE_BYTES);
System.arraycopy(this.requestSource, 0, target, offset, this.requestSource.length);
offset += this.requestSource.length;
System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length);
offset += NEW_LINE_BYTES.length;
}
return offset;
}
}

private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) {
int totalBytes = requests.stream().mapToInt(RequestBytes::getSerializedSize).sum();
//By making a singular array we copy into avoids any dynamically expanded growth of the array that may overshoot
//how much memory we actually need, additionally it also avoids a final copy at the end normally done by
//ByteArrayOutputStream's toByteArray()
byte[] bytes = new byte[totalBytes];
int offset = 0;
for (final RequestBytes request : requests) {
request.writeTo(outputStream);
//We can't remove the element from the collection like we do elsewhere, because we need to retain the
//serialized form in case of an error so the error can be paired to the originating request based on index
offset = request.writeTo(bytes, offset);
}

final StringBuilder bulkRequestQueryParameters = new StringBuilder();
Expand All @@ -458,7 +468,7 @@ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests,
APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
}
final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
return Pair.with(bulkRequestPath, outputStream.toByteArray());
return Pair.with(bulkRequestPath, bytes);
}

private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
Expand Down Expand Up @@ -490,14 +500,20 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//There is no "correct" number of actions to perform in a single bulk request. Experiment with different
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
private final PeekingIterator<RequestBytes> requestIterator;
private final ListIterator<RequestBytes> requestIterator;
private final int[] exceptionallyLargeRequests;
private RequestBytes peeked;

@VisibleForTesting
BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
List<Integer> requestSizesThatWereTooLarge = new ArrayList<>();
for (ElasticSearchMutation request : requests) {
ListIterator<ElasticSearchMutation> requestsIter = requests.listIterator();
while (requestsIter.hasNext()) {
ElasticSearchMutation request = requestsIter.next();
//Remove the element from the collection so the collection's reference to it doesn't hold it from being
//GC'ed after it has been converted to its serialized form
requestsIter.set(null);
RequestBytes requestBytes = new RequestBytes(request);
int requestSerializedSize = requestBytes.getSerializedSize();
if (requestSerializedSize <= bulkChunkSerializedLimitBytes) {
Expand All @@ -507,7 +523,7 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
requestSizesThatWereTooLarge.add(requestSerializedSize);
}
}
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
this.requestIterator = serializedRequests.listIterator();
//Condense request sizes that are too large into an int array to remove Boxed & List memory overhead
this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.isEmpty() ? null :
requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray();
Expand All @@ -517,20 +533,29 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
public boolean hasNext() {
//Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted
//This allows next() to throw after all well sized requests have been chunked for submission
return requestIterator.hasNext() || exceptionallyLargeRequests != null;
return peeked != null || requestIterator.hasNext() || exceptionallyLargeRequests != null;
}

@Override
public List<RequestBytes> next() {
List<RequestBytes> serializedRequests = new ArrayList<>();
int chunkSerializedTotal = 0;
//If we peeked at something but stopped on it, then add it to this list
chunkSerializedTotal += peeked.getSerializedSize();
serializedRequests.add(peeked);
peeked = null;

while (requestIterator.hasNext()) {
RequestBytes peeked = requestIterator.peek();
chunkSerializedTotal += peeked.getSerializedSize();
RequestBytes next = requestIterator.next();
//Remove the element from the collection, so the iterator doesn't prevent it from being GC'ed
//due to the reference to it in the collection
requestIterator.set(null);
chunkSerializedTotal += next.getSerializedSize();
if (chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
serializedRequests.add(requestIterator.next());
serializedRequests.add(next);
} else {
//Adding this element would exceed the limit, so return the chunk
this.peeked = next;
return serializedRequests;
}
}
Expand Down

1 comment on commit 6703d18

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 6703d18 Previous: 4a576f6 Ratio
org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete 12603.64602179322 ms/op 12387.503702656188 ms/op 1.02
org.janusgraph.GraphCentricQueryBenchmark.getVertices 909.0547980730987 ms/op 881.5787764770677 ms/op 1.03
org.janusgraph.MgmtOlapJobBenchmark.runClearIndex 216.69933882282612 ms/op 216.02639356231884 ms/op 1.00
org.janusgraph.MgmtOlapJobBenchmark.runReindex 323.19491292583336 ms/op 319.4162031316667 ms/op 1.01
org.janusgraph.JanusGraphSpeedBenchmark.basicCount 247.44376959423494 ms/op 214.43612039488983 ms/op 1.15
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 4895.888044343626 ms/op 4620.418355055224 ms/op 1.06
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps 17069.176038236903 ms/op 16143.572659017893 ms/op 1.06
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch 20211.095525800607 ms/op 20253.546099167273 ms/op 1.00
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching 57128.87149903334 ms/op 56628.47083503333 ms/op 1.01
org.janusgraph.CQLMultiQueryDropBenchmark.dropVertices 1565.9244634741292 ms/op 1521.104838248883 ms/op 1.03
org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex 8354.302320786293 ms/op 7631.26260980391 ms/op 1.09
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion 389.17480489924844 ms/op 376.440884295281 ms/op 1.03
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch 3920.6775941553115 ms/op 4082.0649584542043 ms/op 0.96
org.janusgraph.CQLMultiQueryBenchmark.getNames 8291.188345767689 ms/op 7858.0254477321005 ms/op 1.06
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 5740.437964757775 ms/op 5530.8370018318055 ms/op 1.04
org.janusgraph.CQLMultiQueryBenchmark.getLabels 7062.94879464218 ms/op 6651.821376948318 ms/op 1.06
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep 441.90285392598463 ms/op 413.53179240439476 ms/op 1.07
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex 12830.304765907304 ms/op 12753.082837076472 ms/op 1.01
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage 369.0876763191086 ms/op 355.9563125681389 ms/op 1.04
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 14655.249360794285 ms/op 14119.051614935463 ms/op 1.04
org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection 253.6822421243298 ms/op 250.88565330056014 ms/op 1.01
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch 14511.919921357507 ms/op 14488.354575075491 ms/op 1.00
org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames 8188.6946062051775 ms/op 7873.668866501788 ms/op 1.04
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps 9060.969990996053 ms/op 8690.493274357672 ms/op 1.04
org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts 8622.17558440736 ms/op 8127.869559427535 ms/op 1.06

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.