Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries to Data.read #11269

Merged
merged 8 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import project.Data.Text.Text
import project.Data.Vector.Vector
import project.Enso_Cloud.Data_Link.Data_Link
import project.Enso_Cloud.Data_Link_Helpers
import project.Error.Error
import project.Errors.Deprecated.Deprecated
import project.Errors.Problem_Behavior.Problem_Behavior
import project.Metadata.Display
import project.Metadata.Widget
import project.Network.HTTP.HTTP
import project.Network.HTTP.HTTP_Error.HTTP_Error
import project.Network.HTTP.HTTP_Method.HTTP_Method
import project.Network.URI.URI
import project.Warning.Warning
Expand All @@ -20,6 +22,8 @@ from project.Metadata.Choice import Option
from project.Metadata.Widget import Single_Choice
from project.System.File_Format import Auto_Detect, format_types

polyglot java import java.lang.Thread

## PRIVATE
looks_like_uri path:Text -> Boolean =
(path.starts_with "http://" Case_Sensitivity.Insensitive) || (path.starts_with "https://" Case_Sensitivity.Insensitive)
Expand All @@ -28,8 +32,19 @@ looks_like_uri path:Text -> Boolean =
A common implementation for fetching a resource and decoding it,
following encountered data links.
fetch_following_data_links (uri:URI) (method:HTTP_Method = HTTP_Method.Get) (headers:Vector = []) format =
response = HTTP.fetch uri method headers
decode_http_response_following_data_links response format
fetch_and_decode =
response = HTTP.fetch uri method headers
decode_http_response_following_data_links response format

error_handler attempt =
hubertp marked this conversation as resolved.
Show resolved Hide resolved
caught_error ->
exponential_backoff = [100, 200, 400]
if method != HTTP_Method.Get || attempt >= exponential_backoff.length then Error.throw caught_error else
sleep_time = exponential_backoff . get attempt
Thread.sleep sleep_time
fetch_and_decode . catch HTTP_Error (error_handler (attempt + 1))

fetch_and_decode . catch HTTP_Error (error_handler 0)

## PRIVATE
Decodes a HTTP response, handling data link access.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Internal threading utilities used for working with threads.

import project.Any.Any
import project.Data.Numbers.Integer

## PRIVATE
ADVANCED
Expand Down
18 changes: 14 additions & 4 deletions test/Base_Tests/src/Network/Http_Spec.enso
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ add_specs suite_builder =

## Checking this error partially as a warning - I spent a lot of time debugging why I'm getting such an error.
Apparently it happens when the httpbin server was crashing without sending any response.
group_builder.specify "should be able to handle server crash resulting in no response" pending=pending_has_url <| Test.with_retries <|
group_builder.specify "should be able to handle server crash resulting in no response" pending=pending_has_url <|
err = Data.fetch (base_url_with_slash+"crash")
err.should_fail_with Request_Error
err.catch.error_type . should_equal "java.io.IOException"
Expand All @@ -600,10 +600,20 @@ add_specs suite_builder =
I think it may be worth adding, because it may be really quite confusing for end users who get that kind of error.
err.catch.message . should_equal "HTTP/1.1 header parser received no bytes"

group_builder.specify "should be able to handle IO errors" pending="TODO: Currently I was unable to figure out a way to test such errors" <|
# how to trigger this error???
err = Data.fetch "TODO"
group_builder.specify "should be able to handle occasional server crashes and retry" pending=pending_has_url <|
r1 = Data.fetch (base_url_with_slash+"crash?success_every=2")
r1.should_succeed
r1.should_be_a Response

group_builder.specify "should be able to handle server crash that closes stream abruptly" pending=pending_has_url <|
err = Data.fetch (base_url_with_slash+"crash?type=stream")
err.should_fail_with HTTP_Error
err.catch.message . should_equal "An IO error has occurred: java.io.IOException: closed"

group_builder.specify "should be able to handle occasional abrupt stream closures and retry" pending=pending_has_url <|
r1 = Data.fetch (base_url_with_slash+"crash?type=stream&success_every=2")
r1.should_succeed
r1.should_be_a Response

suite_builder.group "Http Auth" group_builder->
group_builder.specify "should support Basic user+password authentication" pending=pending_has_url <| Test.with_retries <|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,83 @@

import com.sun.net.httpserver.HttpExchange;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import org.apache.http.client.utils.URIBuilder;
import org.enso.shttp.SimpleHttpHandler;

public class CrashingTestHandler extends SimpleHttpHandler {
private int requests = 0;

@Override
protected void doHandle(HttpExchange exchange) throws IOException {
// This exception will be logged by SimpleHttpHandler, but that's OK - let's know that this
// Exceptions will be logged by SimpleHttpHandler, but that's OK - let's know that this
// crash is happening.
throw new RuntimeException("This handler crashes on purpose.");

URI uri = exchange.getRequestURI();
URIBuilder builder = new URIBuilder(uri);
final String successEveryParam = "success_every";
final String crashTypeParam = "type";

int successEvery = 0;
CrashType crashType = CrashType.RUNTIME;

for (var queryPair : builder.getQueryParams()) {
if (queryPair.getName().equals(successEveryParam)) {
successEvery = Integer.decode(queryPair.getValue());
} else if (queryPair.getName().equals(crashTypeParam)) {
crashType =
switch (queryPair.getValue()) {
case "stream" -> CrashType.STREAM;
default -> CrashType.RUNTIME;
};
}
}
if (successEvery == 0) {
// Reset counter
requests = 0;
}

boolean shouldSucceed = successEvery == (requests + 1);

switch (crashType) {
case RUNTIME:
if (shouldSucceed) {
// return OK, reset
requests = 0;
exchange.sendResponseHeaders(200, -1);
exchange.close();
break;
} else {
requests += 1;
throw new RuntimeException("This handler crashes on purpose.");
}

case STREAM:
byte[] responseData = "Crash and Burn".getBytes();
exchange.sendResponseHeaders(200, responseData.length);
try {
if (shouldSucceed) {
requests = 0;
// return OK, reset
try (OutputStream os = exchange.getResponseBody()) {
os.write(responseData, 0, responseData.length);
}
} else {
requests += 1;
try (OutputStream os = exchange.getResponseBody()) {
os.write(responseData, 0, responseData.length / 2);
}
}
} finally {
exchange.close();
}
break;
}
}

enum CrashType {
RUNTIME, // Simulate an internal server crash
STREAM // Simulate a crash by abruptly closing response's body stream
}
}
Loading