diff --git a/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java b/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java index 051d046ccc4..8758ef95cfc 100644 --- a/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 The OpenZipkin Authors + * Copyright 2015-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,7 +14,6 @@ package zipkin2.elasticsearch.internal; import com.linecorp.armeria.common.HttpRequest; -import com.linecorp.armeria.common.HttpRequestWriter; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import java.util.concurrent.TimeUnit; @@ -36,7 +35,6 @@ import zipkin2.codec.SpanBytesDecoder; import zipkin2.elasticsearch.ElasticsearchStorage; import zipkin2.elasticsearch.internal.BulkCallBuilder.IndexEntry; -import zipkin2.elasticsearch.internal.client.HttpCall; import static java.nio.charset.StandardCharsets.UTF_8; import static zipkin2.elasticsearch.ElasticsearchVersion.V6_0; @@ -67,10 +65,7 @@ public class BulkRequestBenchmarks { @Benchmark public HttpRequest buildAndWriteRequest_singleSpan() { BulkCallBuilder builder = new BulkCallBuilder(es, V6_0, "index-span"); builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN); - HttpCall.RequestSupplier supplier = builder.build().request; - HttpRequestWriter request = HttpRequest.streaming(supplier.headers()); - supplier.writeBody(request::tryWrite); - return request; + return builder.build().request.get(); } @Benchmark public HttpRequest buildAndWriteRequest_tenSpans() { @@ -78,10 +73,7 @@ public class BulkRequestBenchmarks { for (int i = 0; i < 10; i++) { builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN); } - HttpCall.RequestSupplier supplier = builder.build().request; - HttpRequestWriter request = HttpRequest.streaming(supplier.headers()); - supplier.writeBody(request::tryWrite); - return request; + return builder.build().request.get(); } // Convenience main entry-point diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index 60121a65b86..2eada30b3f8 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -267,7 +267,7 @@ logging: com.linecorp.armeria: 'WARN' # # But allow to say it's ready to serve requests com.linecorp.armeria.server.Server: 'INFO' - # kafka is quite chatty so we switch everything off by default + # kafka is quite chatty, so we switch everything off by default org.apache.kafka: 'OFF' # # investigate /api/v2/dependencies # zipkin2.internal.DependencyLinker: 'DEBUG' diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/elasticsearch/ITElasticsearchHealthCheck.java b/zipkin-server/src/test/java/zipkin2/server/internal/elasticsearch/ITElasticsearchHealthCheck.java index 27462589e65..691591e9465 100644 --- a/zipkin-server/src/test/java/zipkin2/server/internal/elasticsearch/ITElasticsearchHealthCheck.java +++ b/zipkin-server/src/test/java/zipkin2/server/internal/elasticsearch/ITElasticsearchHealthCheck.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 The OpenZipkin Authors + * Copyright 2015-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,12 +13,15 @@ */ package zipkin2.server.internal.elasticsearch; -import com.linecorp.armeria.client.endpoint.EmptyEndpointGroupException; import com.linecorp.armeria.client.endpoint.EndpointSelectionTimeoutException; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.healthcheck.HealthCheckService; import com.linecorp.armeria.server.healthcheck.SettableHealthChecker; import com.linecorp.armeria.testing.junit4.server.ServerRule; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLException; import org.awaitility.core.ConditionFactory; @@ -47,26 +50,36 @@ public class ITElasticsearchHealthCheck { static final SettableHealthChecker server1Health = new SettableHealthChecker(true); - static { - // Gives better context when there's an exception such as AbortedStreamException - System.setProperty("com.linecorp.armeria.verboseExceptions", "always"); - } - @ClassRule public static ServerRule server1 = new ServerRule() { @Override protected void configure(ServerBuilder sb) { - sb.service("/", (ctx, req) -> VERSION_RESPONSE.toHttpResponse()); + sb.service("/", (ctx, req) -> sendResponseAfterAggregate(req, VERSION_RESPONSE)); sb.service("/_cluster/health", HealthCheckService.of(server1Health)); sb.serviceUnder("/_cluster/health/", (ctx, req) -> GREEN_RESPONSE.toHttpResponse()); } }; + /** This ensures the response is sent after the request is fully read. */ + private static HttpResponse sendResponseAfterAggregate(HttpRequest req, + AggregatedHttpResponse response) { + final CompletableFuture future = new CompletableFuture<>(); + req.aggregate().whenComplete((aggregatedReq, cause) -> { + if (cause != null) { + future.completeExceptionally(cause); + } else { + future.complete(response.toHttpResponse()); + } + }); + return HttpResponse.from(future); + } + static final SettableHealthChecker server2Health = new SettableHealthChecker(true); @ClassRule public static ServerRule server2 = new ServerRule() { @Override protected void configure(ServerBuilder sb) { - sb.service("/", (ctx, req) -> VERSION_RESPONSE.toHttpResponse()); + sb.service("/", (ctx, req) -> sendResponseAfterAggregate(req, VERSION_RESPONSE)); sb.service("/_cluster/health", HealthCheckService.of(server2Health)); - sb.serviceUnder("/_cluster/health/", (ctx, req) -> GREEN_RESPONSE.toHttpResponse()); + sb.serviceUnder("/_cluster/health/", + (ctx, req) -> sendResponseAfterAggregate(req, GREEN_RESPONSE)); } }; @@ -99,7 +112,9 @@ private void initWithHosts(String hosts) { @Test public void allHealthy() { try (ElasticsearchStorage storage = context.getBean(ElasticsearchStorage.class)) { - assertOk(storage.check()); + + // There's an initialization delay, so await instead of expect everything up now. + awaitTimeout.untilAsserted(() -> assertThat(storage.check().ok()).isTrue()); } } diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java index efd74255239..148c2019eba 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 The OpenZipkin Authors + * Copyright 2015-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -20,6 +20,8 @@ import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpRequestWriter; import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.RequestHeaders; @@ -31,6 +33,7 @@ import io.netty.handler.codec.http.QueryStringEncoder; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.RejectedExecutionException; import java.util.function.Supplier; @@ -112,7 +115,9 @@ public void index(String index, String typeName, T input, BulkIndexWriter entries.add(newIndexEntry(index, typeName, input, writer)); } - /** Creates a bulk request when there is more than one object to store */ + /** + * Creates a bulk request when there is more than one object to store + */ public HttpCall build() { QueryStringEncoder urlBuilder = new QueryStringEncoder("/_bulk"); if (pipeline != null) urlBuilder.addParam("pipeline", pipeline); @@ -139,7 +144,7 @@ static class BulkRequestSupplier implements HttpCall.RequestSupplier { BulkRequestSupplier(List> entries, boolean shouldAddType, RequestHeaders headers, ByteBufAllocator alloc) { - this.entries = entries; + this.entries = Collections.unmodifiableList(entries); this.shouldAddType = shouldAddType; this.headers = headers; this.alloc = alloc; @@ -149,13 +154,29 @@ static class BulkRequestSupplier implements HttpCall.RequestSupplier { return headers; } - @Override public void writeBody(HttpCall.RequestStream requestStream) { - for (IndexEntry entry : entries) { - if (!requestStream.tryWrite(HttpData.wrap(serialize(alloc, entry, shouldAddType)))) { - // Stream aborted, no need to serialize anymore. - return; - } + @Override public HttpRequest get() { + HttpRequestWriter writer = HttpRequest.streaming(headers); + writeEntry(writer, 0); + return writer; + } + + // There's a high chance that the response is received before the request + // is complete. This can be a problem for BulkCallBuilder when it's sending + // streaming requests. Hence, we use backpressure, instead of buffering. + // + // Follow https://github.com/line/armeria/issues/3119 for doc updates. + private void writeEntry(HttpRequestWriter writer, int index) { + if (index == entries.size()) { // out of entries. + writer.close(); + return; + } + // Write the current entry directly to the current request. + if (!writer.tryWrite(HttpData.wrap(serialize(alloc, entries.get(index), shouldAddType)))) { + // Stream aborted, no need to serialize anymore. + return; } + // Recurse to proceed to the next entry, if any. + writer.whenConsumed().thenRun(() -> writeEntry(writer, index + 1)); } } diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java index b1a0593de8d..6f9dd13d128 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 The OpenZipkin Authors + * Copyright 2015-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -23,7 +23,6 @@ import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpHeaders; import com.linecorp.armeria.common.HttpRequest; -import com.linecorp.armeria.common.HttpRequestWriter; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.HttpStatusClass; @@ -55,34 +54,15 @@ public interface BodyConverter { V convert(JsonParser parser, Supplier contentString) throws IOException; } - /** - * A request stream which can have {@link HttpData} of the request body written to it. - */ - public interface RequestStream { - /** - * Writes the {@link HttpData} to the stream. Returns {@code false} if the stream has been - * aborted (e.g., the request timed out while writing), or {@code true} otherwise. - */ - boolean tryWrite(HttpData obj); - } - /** * A supplier of {@linkplain HttpHeaders headers} and {@linkplain HttpData body} of a request to * Elasticsearch. */ - public interface RequestSupplier { + public interface RequestSupplier extends Supplier { /** * Returns the {@linkplain HttpHeaders headers} for this request. */ RequestHeaders headers(); - - /** - * Writes the body of this request into the {@link RequestStream}. {@link - * RequestStream#tryWrite(HttpData)} can be called any number of times to publish any number of - * payload objects. It can be useful to split up a large payload into smaller chunks instead of - * buffering everything as one payload. - */ - void writeBody(RequestStream requestStream); } static class AggregatedRequestSupplier implements RequestSupplier { @@ -106,8 +86,8 @@ static class AggregatedRequestSupplier implements RequestSupplier { return request.headers(); } - @Override public void writeBody(RequestStream requestStream) { - requestStream.tryWrite(request.content()); + @Override public HttpRequest get() { + return request.toHttpRequest(); } } @@ -205,10 +185,7 @@ CompletableFuture sendRequest() { final HttpResponse response; try (SafeCloseable ignored = Clients.withContextCustomizer(ctx -> ctx.logBuilder().name(name))) { - HttpRequestWriter httpRequest = HttpRequest.streaming(request.headers()); - response = httpClient.execute(httpRequest); - request.writeBody(httpRequest::tryWrite); - httpRequest.close(); + response = httpClient.execute(request.get()); } CompletableFuture responseFuture = RequestContext.mapCurrent( diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/SearchCallFactory.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/SearchCallFactory.java index 564b1d64813..ccbaa7615fc 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/SearchCallFactory.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/SearchCallFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 The OpenZipkin Authors + * Copyright 2015-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java index 824d3037912..bbe50d653f8 100644 --- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java +++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 The OpenZipkin Authors + * Copyright 2015-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -20,6 +20,7 @@ import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.ResponseHeaders; @@ -253,13 +254,15 @@ class HttpCallTest { server.enqueue(SUCCESS_RESPONSE); HttpCall.RequestSupplier supplier = new HttpCall.RequestSupplier() { + + final RequestHeaders headers = RequestHeaders.of(HttpMethod.POST, "/"); + @Override public RequestHeaders headers() { - return RequestHeaders.of(HttpMethod.POST, "/"); + return headers; } - @Override public void writeBody(HttpCall.RequestStream requestStream) { - requestStream.tryWrite(HttpData.ofUtf8("hello")); - requestStream.tryWrite(HttpData.ofUtf8(" world")); + @Override public HttpRequest get() { + return HttpRequest.of(headers, HttpData.ofUtf8("hello"), HttpData.ofUtf8(" world")); } };