Skip to content

Commit

Permalink
feat(*): Migrate from RxJava2 to Reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jan 30, 2024
1 parent 3d6a065 commit 616a72b
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 180 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ dependencies {
compileOnly "io.micronaut:micronaut-inject"
compileOnly "io.micronaut.validation:micronaut-validation"
compileOnly "io.micronaut:micronaut-http-client"
compileOnly "io.micronaut.rxjava2:micronaut-rxjava2-http-client"
compileOnly "io.micronaut.reactor:micronaut-reactor-http-client"

// kestra
compileOnly group: "io.kestra", name: "core", version: kestraVersion
Expand Down
12 changes: 5 additions & 7 deletions src/main/java/io/kestra/plugin/fs/http/AbstractHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import io.micronaut.http.client.DefaultHttpClientConfiguration;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.multipart.MultipartBody;
import io.micronaut.http.client.netty.NettyHttpClientFactory;
import io.micronaut.http.ssl.ClientSslConfiguration;
import io.micronaut.rxjava2.http.client.RxStreamingHttpClient;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import io.micronaut.reactor.http.client.ReactorStreamingHttpClient;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.io.IOUtils;
Expand All @@ -37,8 +37,6 @@
@Getter
@NoArgsConstructor
abstract public class AbstractHttp extends Task implements HttpInterface {
private static final NettyHttpClientFactory FACTORY = new NettyHttpClientFactory();

@NotNull
protected String uri;

Expand Down Expand Up @@ -134,13 +132,13 @@ protected DefaultHttpClientConfiguration configuration(RunContext runContext, Ht
protected HttpClient client(RunContext runContext, HttpMethod httpMethod) throws IllegalVariableEvaluationException, MalformedURLException, URISyntaxException {
URI from = new URI(runContext.render(this.uri));

return FACTORY.createClient(from.toURL(), this.configuration(runContext, httpMethod));
return ReactorHttpClient.create(from.toURL(), this.configuration(runContext, httpMethod));
}

protected RxStreamingHttpClient streamingClient(RunContext runContext, HttpMethod httpMethod) throws IllegalVariableEvaluationException, MalformedURLException, URISyntaxException {
protected ReactorStreamingHttpClient streamingClient(RunContext runContext, HttpMethod httpMethod) throws IllegalVariableEvaluationException, MalformedURLException, URISyntaxException {
URI from = new URI(runContext.render(this.uri));

return new ExtendedBridgedRxHttpClient(FACTORY.createStreamingClient(from.toURL(), this.configuration(runContext, httpMethod)));
return ReactorStreamingHttpClient.create(from.toURL(), this.configuration(runContext, httpMethod));
}

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/io/kestra/plugin/fs/http/Download.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.rxjava2.http.client.RxStreamingHttpClient;
import io.micronaut.reactor.http.client.ReactorStreamingHttpClient;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand All @@ -25,6 +25,8 @@
import java.util.List;
import java.util.Map;

import static io.kestra.core.utils.Rethrow.throwFunction;

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -65,15 +67,15 @@ public Download.Output run(RunContext runContext) throws Exception {

// do it
try (
RxStreamingHttpClient client = this.streamingClient(runContext, this.method);
ReactorStreamingHttpClient client = this.streamingClient(runContext, this.method);
BufferedOutputStream output = new BufferedOutputStream(new FileOutputStream(tempFile));
) {
@SuppressWarnings("unchecked")
HttpRequest<String> request = this.request(runContext);

Long size = client
.exchangeStream(request)
.map(response -> {
.map(throwFunction(response -> {
if (builder.code == null) {
builder
.code(response.code())
Expand All @@ -88,9 +90,9 @@ public Download.Output run(RunContext runContext) throws Exception {
} else {
return 0L;
}
})
}))
.reduce(Long::sum)
.blockingGet();
.block();

if (size == null) {
size = 0L;
Expand Down

This file was deleted.

11 changes: 6 additions & 5 deletions src/test/java/io/kestra/plugin/fs/http/RequestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import io.micronaut.http.multipart.StreamingFileUpload;
import io.micronaut.runtime.server.EmbeddedServer;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.reactivex.Single;
import jakarta.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import java.io.File;
import java.io.FileInputStream;
Expand All @@ -28,6 +28,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Objects;

import static io.kestra.core.utils.Rethrow.throwFunction;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -242,17 +243,17 @@ HttpResponse<String> simple(HttpRequest<?> request, String hello) {
}

@Post(uri = "/post/multipart", consumes = MediaType.MULTIPART_FORM_DATA)
Single<String> multipart(HttpRequest<?> request, String hello, StreamingFileUpload file) throws IOException {
Mono<String> multipart(HttpRequest<?> request, String hello, StreamingFileUpload file) throws IOException {
File tempFile = File.createTempFile(file.getFilename(), "temp");

Publisher<Boolean> uploadPublisher = file.transferTo(tempFile);

return Single.fromPublisher(uploadPublisher)
.map(success -> {
return Mono.from(uploadPublisher)
.map(throwFunction(success -> {
try (FileInputStream fileInputStream = new FileInputStream(tempFile)) {
return hello + " > " + IOUtils.toString(fileInputStream, StandardCharsets.UTF_8);
}
});
}));
}
}
}

0 comments on commit 616a72b

Please sign in to comment.