Skip to content

Commit

Permalink
Add retries to Data.read (#11269)
Browse files Browse the repository at this point in the history
* Add retries to HTTP Get requests

A quick solution to random network failures for GET HTTP requests.
Should reduce the number of IOExceptions that users see while fetching
data.

* Use homemade retry logic for http requests

* Add retries to whole Data.read

Previously, we added retries only to fetch HTTP_Request. That was
insufficient as intermittent errors might happen while reading body's
stream.

Enhanced our simple server's crash endpoint to allow for different kind
of failures as well as simulate random failures.

* Remove retries from Java

Increased the scope of retries in the previous commit.

* nit

* Address PR comments

* PR comments

* Remove builtin
  • Loading branch information
hubertp authored Oct 10, 2024
1 parent 669ac97 commit 468b643
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import project.Data.Text.Text
import project.Data.Vector.Vector
import project.Enso_Cloud.Data_Link.Data_Link
import project.Enso_Cloud.Data_Link_Helpers
import project.Error.Error
import project.Errors.Deprecated.Deprecated
import project.Errors.Problem_Behavior.Problem_Behavior
import project.Metadata.Display
import project.Metadata.Widget
import project.Network.HTTP.HTTP
import project.Network.HTTP.HTTP_Error.HTTP_Error
import project.Network.HTTP.HTTP_Method.HTTP_Method
import project.Network.URI.URI
import project.Warning.Warning
Expand All @@ -20,6 +22,8 @@ from project.Metadata.Choice import Option
from project.Metadata.Widget import Single_Choice
from project.System.File_Format import Auto_Detect, format_types

polyglot java import java.lang.Thread

## PRIVATE
looks_like_uri path:Text -> Boolean =
(path.starts_with "http://" Case_Sensitivity.Insensitive) || (path.starts_with "https://" Case_Sensitivity.Insensitive)
Expand All @@ -28,8 +32,19 @@ looks_like_uri path:Text -> Boolean =
A common implementation for fetching a resource and decoding it,
following encountered data links.
fetch_following_data_links (uri:URI) (method:HTTP_Method = HTTP_Method.Get) (headers:Vector = []) format =
response = HTTP.fetch uri method headers
decode_http_response_following_data_links response format
fetch_and_decode =
response = HTTP.fetch uri method headers
decode_http_response_following_data_links response format

error_handler attempt =
caught_error ->
exponential_backoff = [100, 200, 400]
if method != HTTP_Method.Get || attempt >= exponential_backoff.length then Error.throw caught_error else
sleep_time = exponential_backoff . get attempt
Thread.sleep sleep_time
fetch_and_decode . catch HTTP_Error (error_handler (attempt + 1))

fetch_and_decode . catch HTTP_Error (error_handler 0)

## PRIVATE
Decodes a HTTP response, handling data link access.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Internal threading utilities used for working with threads.

import project.Any.Any
import project.Data.Numbers.Integer

## PRIVATE
ADVANCED
Expand Down
18 changes: 14 additions & 4 deletions test/Base_Tests/src/Network/Http_Spec.enso
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ add_specs suite_builder =

## Checking this error partially as a warning - I spent a lot of time debugging why I'm getting such an error.
Apparently it happens when the httpbin server was crashing without sending any response.
group_builder.specify "should be able to handle server crash resulting in no response" pending=pending_has_url <| Test.with_retries <|
group_builder.specify "should be able to handle server crash resulting in no response" pending=pending_has_url <|
err = Data.fetch (base_url_with_slash+"crash")
err.should_fail_with Request_Error
err.catch.error_type . should_equal "java.io.IOException"
Expand All @@ -600,10 +600,20 @@ add_specs suite_builder =
I think it may be worth adding, because it may be really quite confusing for end users who get that kind of error.
err.catch.message . should_equal "HTTP/1.1 header parser received no bytes"

group_builder.specify "should be able to handle IO errors" pending="TODO: Currently I was unable to figure out a way to test such errors" <|
# how to trigger this error???
err = Data.fetch "TODO"
group_builder.specify "should be able to handle occasional server crashes and retry" pending=pending_has_url <|
r1 = Data.fetch (base_url_with_slash+"crash?success_every=2")
r1.should_succeed
r1.should_be_a Response

group_builder.specify "should be able to handle server crash that closes stream abruptly" pending=pending_has_url <|
err = Data.fetch (base_url_with_slash+"crash?type=stream")
err.should_fail_with HTTP_Error
err.catch.message . should_equal "An IO error has occurred: java.io.IOException: closed"

group_builder.specify "should be able to handle occasional abrupt stream closures and retry" pending=pending_has_url <|
r1 = Data.fetch (base_url_with_slash+"crash?type=stream&success_every=2")
r1.should_succeed
r1.should_be_a Response

suite_builder.group "Http Auth" group_builder->
group_builder.specify "should support Basic user+password authentication" pending=pending_has_url <| Test.with_retries <|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,83 @@

import com.sun.net.httpserver.HttpExchange;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import org.apache.http.client.utils.URIBuilder;
import org.enso.shttp.SimpleHttpHandler;

public class CrashingTestHandler extends SimpleHttpHandler {
private int requests = 0;

@Override
protected void doHandle(HttpExchange exchange) throws IOException {
// This exception will be logged by SimpleHttpHandler, but that's OK - let's know that this
// Exceptions will be logged by SimpleHttpHandler, but that's OK - let's know that this
// crash is happening.
throw new RuntimeException("This handler crashes on purpose.");

URI uri = exchange.getRequestURI();
URIBuilder builder = new URIBuilder(uri);
final String successEveryParam = "success_every";
final String crashTypeParam = "type";

int successEvery = 0;
CrashType crashType = CrashType.RUNTIME;

for (var queryPair : builder.getQueryParams()) {
if (queryPair.getName().equals(successEveryParam)) {
successEvery = Integer.decode(queryPair.getValue());
} else if (queryPair.getName().equals(crashTypeParam)) {
crashType =
switch (queryPair.getValue()) {
case "stream" -> CrashType.STREAM;
default -> CrashType.RUNTIME;
};
}
}
if (successEvery == 0) {
// Reset counter
requests = 0;
}

boolean shouldSucceed = successEvery == (requests + 1);

switch (crashType) {
case RUNTIME:
if (shouldSucceed) {
// return OK, reset
requests = 0;
exchange.sendResponseHeaders(200, -1);
exchange.close();
break;
} else {
requests += 1;
throw new RuntimeException("This handler crashes on purpose.");
}

case STREAM:
byte[] responseData = "Crash and Burn".getBytes();
exchange.sendResponseHeaders(200, responseData.length);
try {
if (shouldSucceed) {
requests = 0;
// return OK, reset
try (OutputStream os = exchange.getResponseBody()) {
os.write(responseData, 0, responseData.length);
}
} else {
requests += 1;
try (OutputStream os = exchange.getResponseBody()) {
os.write(responseData, 0, responseData.length / 2);
}
}
} finally {
exchange.close();
}
break;
}
}

enum CrashType {
RUNTIME, // Simulate an internal server crash
STREAM // Simulate a crash by abruptly closing response's body stream
}
}

0 comments on commit 468b643

Please sign in to comment.