From 710063b7db282c14977f5b6eb23d9d5e54770da2 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 17 Dec 2024 17:44:43 +0100 Subject: [PATCH] Preliminary upgrade to Vert.x 5.0.0.CR3 --- chapter1/pom.xml | 5 ++--- chapter13/heat-api/pom.xml | 2 ++ .../src/main/java/chapter13/api/HeatApi.java | 14 ++++++++------ chapter13/heat-sensor-service/pom.xml | 2 ++ .../java/chapter13/sensor/HeatSensor.java | 5 +++-- chapter13/pom.xml | 5 ++--- chapter13/sensor-gateway/pom.xml | 2 ++ .../main/java/chapter13/gateway/Gateway.java | 5 +++-- chapter2/pom.xml | 5 ++--- .../main/java/chapter2/deploy/Deployer.java | 4 ++-- .../java/chapter2/execblocking/Offload.java | 14 +++++--------- .../java/chapter2/future/SomeVerticle.java | 3 ++- .../java/chapter2/worker/WorkerVerticle.java | 3 ++- chapter3/pom.xml | 5 ++--- .../src/main/java/chapter3/HttpServer.java | 8 +++----- .../java/chapter3/cluster/FirstInstance.java | 2 +- .../java/chapter3/cluster/SecondInstance.java | 2 +- .../test/java/chapter3/SensorDataTest.java | 4 ++-- chapter4/pom.xml | 5 ++--- .../main/java/chapter4/jukebox/Jukebox.java | 6 +++--- .../java/chapter4/jukebox/NetControl.java | 2 +- .../parsetools/SampleDatabaseWriter.java | 2 +- .../chapter4/streamapis/VertxStreams.java | 2 +- chapter5/pom.xml | 9 ++++----- .../chapter5/callbacks/CollectorService.java | 12 +++++++----- .../callbacks/CollectorServiceCBH.java | 12 +++++++----- .../chapter5/future/CollectorService.java | 8 ++++---- .../chapter5/reactivex/CollectorService.java | 8 ++++---- .../kotlin/coroutines/CollectorService.kt | 19 ++++++++++--------- chapter6/pom.xml | 5 ++--- .../main/java/chapter6/SensorDataService.java | 7 +++---- .../java/chapter6/SensorDataServiceImpl.java | 10 +++++----- .../java/chapter6/SensorDataServiceTest.java | 10 +++++----- 33 files changed, 105 insertions(+), 102 deletions(-) diff --git a/chapter1/pom.xml b/chapter1/pom.xml index d44b4df..06f983c 100644 --- a/chapter1/pom.xml +++ b/chapter1/pom.xml @@ -8,10 +8,9 @@ 1.0 - 1.8 - 1.8 + 11 UTF-8 - 4.0.3 + 5.0.0.CR3 1.6.0 chapter1.firstapp.VertxEcho diff --git a/chapter13/heat-api/pom.xml b/chapter13/heat-api/pom.xml index 0a27d1d..df6a89d 100644 --- a/chapter13/heat-api/pom.xml +++ b/chapter13/heat-api/pom.xml @@ -32,6 +32,7 @@ + diff --git a/chapter13/heat-api/src/main/java/chapter13/api/HeatApi.java b/chapter13/heat-api/src/main/java/chapter13/api/HeatApi.java index 3400879..f925680 100644 --- a/chapter13/heat-api/src/main/java/chapter13/api/HeatApi.java +++ b/chapter13/heat-api/src/main/java/chapter13/api/HeatApi.java @@ -3,6 +3,7 @@ import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; import io.vertx.core.Vertx; +import io.vertx.core.http.HttpResponseExpectation; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.Router; @@ -10,7 +11,6 @@ import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; import io.vertx.ext.web.client.WebClientOptions; -import io.vertx.ext.web.client.predicate.ResponsePredicate; import io.vertx.ext.web.codec.BodyCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +50,7 @@ public void start(Promise startPromise) { vertx.createHttpServer() .requestHandler(router) - .listen(httpPort, ar -> { + .listen(httpPort).onComplete(ar -> { if (ar.succeeded()) { logger.info("HTTP server listening on port {}", httpPort); startPromise.complete(); @@ -63,9 +63,10 @@ public void start(Promise startPromise) { private void fetchData(RoutingContext routingContext, Consumer> action) { webClient.get("/data") .as(BodyCodec.jsonObject()) - .expect(ResponsePredicate.SC_OK) .timeout(5000) - .send(ar -> { + .send() + .expecting(HttpResponseExpectation.SC_OK) + .onComplete(ar -> { if (ar.succeeded()) { action.accept(ar.result()); } else { @@ -114,9 +115,10 @@ private void livenessCheck(RoutingContext ctx) { private void readinessCheck(RoutingContext ctx) { webClient.get("/health") - .expect(ResponsePredicate.SC_OK) .timeout(5000) - .send(ar -> { + .send() + .expecting(HttpResponseExpectation.SC_OK) + .onComplete(ar -> { if (ar.succeeded()) { logger.info("Readiness check complete"); ctx.response() diff --git a/chapter13/heat-sensor-service/pom.xml b/chapter13/heat-sensor-service/pom.xml index 0acc37d..98abd07 100644 --- a/chapter13/heat-sensor-service/pom.xml +++ b/chapter13/heat-sensor-service/pom.xml @@ -37,6 +37,7 @@ + diff --git a/chapter13/heat-sensor-service/src/main/java/chapter13/sensor/HeatSensor.java b/chapter13/heat-sensor-service/src/main/java/chapter13/sensor/HeatSensor.java index ecaf9c6..17c53e9 100644 --- a/chapter13/heat-sensor-service/src/main/java/chapter13/sensor/HeatSensor.java +++ b/chapter13/heat-sensor-service/src/main/java/chapter13/sensor/HeatSensor.java @@ -57,7 +57,8 @@ public void start(Promise startPromise) { vertx.createHttpServer() .requestHandler(router) - .listen(httpPort, ar -> { + .listen(httpPort) + .onComplete(ar -> { if (ar.succeeded()) { logger.info("HTTP server listening on port {}", httpPort); startPromise.complete(); @@ -97,7 +98,7 @@ public static void main(String[] args) throws UnknownHostException { .setEventBusOptions(new EventBusOptions() .setHost(ipv4) .setClusterPublicHost(ipv4)); - Vertx.clusteredVertx(options, ar -> { + Vertx.clusteredVertx(options).onComplete(ar -> { if (ar.succeeded()) { ar.result().deployVerticle(new HeatSensor()); } else { diff --git a/chapter13/pom.xml b/chapter13/pom.xml index 4db3d49..f3e06eb 100644 --- a/chapter13/pom.xml +++ b/chapter13/pom.xml @@ -16,10 +16,9 @@ - 1.8 - 1.8 + 11 UTF-8 - 4.0.3 + 5.0.0.CR3 1.2.3 2.4.0 2.0.1 diff --git a/chapter13/sensor-gateway/pom.xml b/chapter13/sensor-gateway/pom.xml index 2899c61..76efbf8 100644 --- a/chapter13/sensor-gateway/pom.xml +++ b/chapter13/sensor-gateway/pom.xml @@ -47,6 +47,7 @@ + diff --git a/chapter13/sensor-gateway/src/main/java/chapter13/gateway/Gateway.java b/chapter13/sensor-gateway/src/main/java/chapter13/gateway/Gateway.java index a239197..a4f6aff 100644 --- a/chapter13/sensor-gateway/src/main/java/chapter13/gateway/Gateway.java +++ b/chapter13/sensor-gateway/src/main/java/chapter13/gateway/Gateway.java @@ -52,7 +52,8 @@ public void start(Promise startPromise) throws Exception { vertx.createHttpServer() .requestHandler(router) - .listen(httpPort, ar -> { + .listen(httpPort) + .onComplete(ar -> { if (ar.succeeded()) { logger.info("HTTP server listening on port {}", httpPort); startPromise.complete(); @@ -94,7 +95,7 @@ public static void main(String[] args) throws UnknownHostException { .setPublishQuantiles(true) .setEnabled(true)) .setEnabled(true)); - Vertx.clusteredVertx(options, ar -> { + Vertx.clusteredVertx(options).onComplete(ar -> { if (ar.succeeded()) { ar.result().deployVerticle(new Gateway()); } else { diff --git a/chapter2/pom.xml b/chapter2/pom.xml index 4fed18e..fec9c69 100644 --- a/chapter2/pom.xml +++ b/chapter2/pom.xml @@ -8,10 +8,9 @@ 1.0 - 1.8 - 1.8 + 11 UTF-8 - 4.0.3 + 5.0.0.CR3 1.2.3 1.6.0 chapter2.hello.HelloVerticle diff --git a/chapter2/src/main/java/chapter2/deploy/Deployer.java b/chapter2/src/main/java/chapter2/deploy/Deployer.java index 7dee98f..01164bf 100644 --- a/chapter2/src/main/java/chapter2/deploy/Deployer.java +++ b/chapter2/src/main/java/chapter2/deploy/Deployer.java @@ -18,7 +18,7 @@ public void start() { } private void deploy() { - vertx.deployVerticle(new EmptyVerticle(), ar -> { // <2> + vertx.deployVerticle(new EmptyVerticle()).onComplete(ar -> { // <2> if (ar.succeeded()) { String id = ar.result(); logger.info("Successfully deployed {}", id); @@ -30,7 +30,7 @@ private void deploy() { } private void undeployLater(String id) { - vertx.undeploy(id, ar -> { // <4> + vertx.undeploy(id).onComplete(ar -> { // <4> if (ar.succeeded()) { logger.info("{} was undeployed", id); } else { diff --git a/chapter2/src/main/java/chapter2/execblocking/Offload.java b/chapter2/src/main/java/chapter2/execblocking/Offload.java index c34a9ba..d52f478 100644 --- a/chapter2/src/main/java/chapter2/execblocking/Offload.java +++ b/chapter2/src/main/java/chapter2/execblocking/Offload.java @@ -12,19 +12,15 @@ public class Offload extends AbstractVerticle { public void start() { vertx.setPeriodic(5000, id -> { logger.info("Tick"); - vertx.executeBlocking(this::blockingCode, this::resultHandler); + vertx.executeBlocking(this::blockingCode).onComplete(this::resultHandler); }); } - private void blockingCode(Promise promise) { + private String blockingCode() throws Exception { logger.info("Blocking code running"); - try { - Thread.sleep(4000); - logger.info("Done!"); - promise.complete("Ok!"); - } catch (InterruptedException e) { - promise.fail(e); - } + Thread.sleep(4000); + logger.info("Done!"); + return "Ok!"; } private void resultHandler(AsyncResult ar) { diff --git a/chapter2/src/main/java/chapter2/future/SomeVerticle.java b/chapter2/src/main/java/chapter2/future/SomeVerticle.java index b6135dc..a0683d7 100644 --- a/chapter2/src/main/java/chapter2/future/SomeVerticle.java +++ b/chapter2/src/main/java/chapter2/future/SomeVerticle.java @@ -11,7 +11,8 @@ public class SomeVerticle extends AbstractVerticle { public void start(Promise promise) { // <1> vertx.createHttpServer() .requestHandler(req -> req.response().end("Ok")) - .listen(8080, ar -> { + .listen(8080) + .onComplete(ar -> { if (ar.succeeded()) { // <2> promise.complete(); // <3> } else { diff --git a/chapter2/src/main/java/chapter2/worker/WorkerVerticle.java b/chapter2/src/main/java/chapter2/worker/WorkerVerticle.java index 6a6443a..86c6ec1 100644 --- a/chapter2/src/main/java/chapter2/worker/WorkerVerticle.java +++ b/chapter2/src/main/java/chapter2/worker/WorkerVerticle.java @@ -3,6 +3,7 @@ import io.vertx.core.AbstractVerticle; import io.vertx.core.DeploymentOptions; import io.vertx.core.Vertx; +import io.vertx.core.ThreadingModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +28,7 @@ public static void main(String[] args) { Vertx vertx = Vertx.vertx(); DeploymentOptions opts = new DeploymentOptions() .setInstances(2) - .setWorker(true); + .setThreadingModel(ThreadingModel.WORKER); vertx.deployVerticle("chapter2.worker.WorkerVerticle", opts); } } diff --git a/chapter3/pom.xml b/chapter3/pom.xml index a577f92..339a0aa 100644 --- a/chapter3/pom.xml +++ b/chapter3/pom.xml @@ -8,10 +8,9 @@ 1.0 - 1.8 - 1.8 + 11 UTF-8 - 4.0.3 + 5.0.0.CR3 1.2.3 1.6.0 5.7.1 diff --git a/chapter3/src/main/java/chapter3/HttpServer.java b/chapter3/src/main/java/chapter3/HttpServer.java index b857eed..61a158c 100644 --- a/chapter3/src/main/java/chapter3/HttpServer.java +++ b/chapter3/src/main/java/chapter3/HttpServer.java @@ -1,7 +1,6 @@ package chapter3; import io.vertx.core.AbstractVerticle; -import io.vertx.core.TimeoutStream; import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; @@ -40,9 +39,8 @@ private void sse(HttpServerRequest request) { }); - TimeoutStream ticks = vertx.periodicStream(1000); - ticks.handler(id -> { - vertx.eventBus().request("sensor.average", "", reply -> { + long periodic = vertx.setPeriodic(1000, id -> { + vertx.eventBus().request("sensor.average", "").onComplete(reply -> { if (reply.succeeded()) { response.write("event: average\n"); response.write("data: " + reply.result().body().encode() + "\n\n"); @@ -52,7 +50,7 @@ private void sse(HttpServerRequest request) { response.endHandler(v -> { consumer.unregister(); - ticks.cancel(); + vertx.cancelTimer(periodic); }); } } diff --git a/chapter3/src/main/java/chapter3/cluster/FirstInstance.java b/chapter3/src/main/java/chapter3/cluster/FirstInstance.java index e32fb93..c8b09b5 100644 --- a/chapter3/src/main/java/chapter3/cluster/FirstInstance.java +++ b/chapter3/src/main/java/chapter3/cluster/FirstInstance.java @@ -11,7 +11,7 @@ public class FirstInstance { private static final Logger logger = LoggerFactory.getLogger(FirstInstance.class); public static void main(String[] args) { - Vertx.clusteredVertx(new VertxOptions(), ar -> { + Vertx.clusteredVertx(new VertxOptions()).onComplete(ar -> { if (ar.succeeded()) { logger.info("First instance has been started"); Vertx vertx = ar.result(); diff --git a/chapter3/src/main/java/chapter3/cluster/SecondInstance.java b/chapter3/src/main/java/chapter3/cluster/SecondInstance.java index 0b633f2..07892ae 100644 --- a/chapter3/src/main/java/chapter3/cluster/SecondInstance.java +++ b/chapter3/src/main/java/chapter3/cluster/SecondInstance.java @@ -13,7 +13,7 @@ public class SecondInstance { private static final Logger logger = LoggerFactory.getLogger(SecondInstance.class); public static void main(String[] args) { - Vertx.clusteredVertx(new VertxOptions(), ar -> { + Vertx.clusteredVertx(new VertxOptions()).onComplete(ar -> { if (ar.succeeded()) { logger.info("Second instance has been started"); Vertx vertx = ar.result(); diff --git a/chapter3/src/test/java/chapter3/SensorDataTest.java b/chapter3/src/test/java/chapter3/SensorDataTest.java index af0cf0b..164bf1e 100644 --- a/chapter3/src/test/java/chapter3/SensorDataTest.java +++ b/chapter3/src/test/java/chapter3/SensorDataTest.java @@ -16,12 +16,12 @@ class SensorDataTest { @Test void testAverage(Vertx vertx, VertxTestContext ctx) { EventBus bus = vertx.eventBus(); - vertx.deployVerticle(new SensorData(), ctx.succeeding(id -> { + vertx.deployVerticle(new SensorData()).onComplete(ctx.succeeding(id -> { bus.publish("sensor.updates", new JsonObject() .put("id", "a").put("temp", 20.0d)); bus.publish("sensor.updates", new JsonObject() .put("id", "b").put("temp", 22.0d)); - bus.request("sensor.average", "", ctx.succeeding(reply -> ctx.verify(() -> { + bus.request("sensor.average", "").onComplete(ctx.succeeding(reply -> ctx.verify(() -> { JsonObject json = (JsonObject) reply.body(); assertEquals(21.0d, (double) json.getDouble("average")); ctx.completeNow(); diff --git a/chapter4/pom.xml b/chapter4/pom.xml index 604b120..b5af0e6 100644 --- a/chapter4/pom.xml +++ b/chapter4/pom.xml @@ -8,10 +8,9 @@ 1.0 - 1.8 - 1.8 + 11 UTF-8 - 4.0.3 + 5.0.0.CR3 1.2.3 1.6.0 chapter4.jukebox.Main diff --git a/chapter4/src/main/java/chapter4/jukebox/Jukebox.java b/chapter4/src/main/java/chapter4/jukebox/Jukebox.java index 9b79b29..6c906e6 100644 --- a/chapter4/src/main/java/chapter4/jukebox/Jukebox.java +++ b/chapter4/src/main/java/chapter4/jukebox/Jukebox.java @@ -49,7 +49,7 @@ private enum State {PLAYING, PAUSED} // --------------------------------------------------------------------------------- // private void list(Message request) { - vertx.fileSystem().readDir("tracks", ".*mp3$", ar -> { + vertx.fileSystem().readDir("tracks", ".*mp3$").onComplete(ar -> { if (ar.succeeded()) { List files = ar.result() .stream() @@ -127,7 +127,7 @@ private void download(String path, HttpServerRequest request) { return; } OpenOptions opts = new OpenOptions().setRead(true); - vertx.fileSystem().open(file, opts, ar -> { + vertx.fileSystem().open(file, opts).onComplete(ar -> { if (ar.succeeded()) { downloadFile(ar.result(), request); } else { @@ -178,7 +178,7 @@ private void streamAudioChunk(long id) { if (currentFile == null) { openNextFile(); } - currentFile.read(Buffer.buffer(4096), 0, positionInFile, 4096, ar -> { + currentFile.read(Buffer.buffer(4096), 0, positionInFile, 4096).onComplete(ar -> { if (ar.succeeded()) { processReadBuffer(ar.result()); } else { diff --git a/chapter4/src/main/java/chapter4/jukebox/NetControl.java b/chapter4/src/main/java/chapter4/jukebox/NetControl.java index 9ca7937..34a8f1d 100644 --- a/chapter4/src/main/java/chapter4/jukebox/NetControl.java +++ b/chapter4/src/main/java/chapter4/jukebox/NetControl.java @@ -61,7 +61,7 @@ private void schedule(String command) { } private void listCommand(NetSocket socket) { - vertx.eventBus().request("jukebox.list", "", reply -> { + vertx.eventBus().request("jukebox.list", "").onComplete(reply -> { if (reply.succeeded()) { JsonObject data = (JsonObject) reply.result().body(); data.getJsonArray("files") diff --git a/chapter4/src/main/java/chapter4/parsetools/SampleDatabaseWriter.java b/chapter4/src/main/java/chapter4/parsetools/SampleDatabaseWriter.java index 0d270a5..33720a2 100644 --- a/chapter4/src/main/java/chapter4/parsetools/SampleDatabaseWriter.java +++ b/chapter4/src/main/java/chapter4/parsetools/SampleDatabaseWriter.java @@ -41,6 +41,6 @@ public static void main(String[] args) { .appendInt(value.length()) .appendString(value); - file.end(buffer, ar -> vertx.close()); + file.end(buffer).onComplete(ar -> vertx.close()); } } diff --git a/chapter4/src/main/java/chapter4/streamapis/VertxStreams.java b/chapter4/src/main/java/chapter4/streamapis/VertxStreams.java index 773bb7d..178dc3b 100644 --- a/chapter4/src/main/java/chapter4/streamapis/VertxStreams.java +++ b/chapter4/src/main/java/chapter4/streamapis/VertxStreams.java @@ -9,7 +9,7 @@ public class VertxStreams { public static void main(String[] args) { Vertx vertx = Vertx.vertx(); OpenOptions opts = new OpenOptions().setRead(true); - vertx.fileSystem().open("build.gradle.kts", opts, ar -> { + vertx.fileSystem().open("build.gradle.kts", opts).onComplete(ar -> { if (ar.succeeded()) { AsyncFile file = ar.result(); file.handler(System.out::println) diff --git a/chapter5/pom.xml b/chapter5/pom.xml index 100234a..18ac79f 100644 --- a/chapter5/pom.xml +++ b/chapter5/pom.xml @@ -8,12 +8,11 @@ 1.0 - 1.8 - 1.8 + 11 UTF-8 - 4.0.3 + 5.0.0.CR3 1.2.3 - 1.3.61 + 2.0.0 3.8.1 1.6.0 chapter5.callbacks.Main @@ -82,7 +81,7 @@ compile - 1.8 + 11 enable ${project.basedir}/src/main/kotlin diff --git a/chapter5/src/main/java/chapter5/callbacks/CollectorService.java b/chapter5/src/main/java/chapter5/callbacks/CollectorService.java index 01895bb..5da7c17 100644 --- a/chapter5/src/main/java/chapter5/callbacks/CollectorService.java +++ b/chapter5/src/main/java/chapter5/callbacks/CollectorService.java @@ -1,11 +1,11 @@ package chapter5.callbacks; import io.vertx.core.AbstractVerticle; +import io.vertx.core.http.HttpResponseExpectation; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.client.predicate.ResponsePredicate; import io.vertx.ext.web.codec.BodyCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,9 +33,10 @@ private void handleRequest(HttpServerRequest request) { for (int i = 0; i < 3; i++) { webClient .get(3000 + i, "localhost", "/") - .expect(ResponsePredicate.SC_SUCCESS) .as(BodyCodec.jsonObject()) - .send(ar -> { + .send() + .expecting(HttpResponseExpectation.SC_OK) + .onComplete(ar -> { if (ar.succeeded()) { responses.add(ar.result().body()); } else { @@ -53,8 +54,9 @@ private void handleRequest(HttpServerRequest request) { private void sendToSnapshot(HttpServerRequest request, JsonObject data) { webClient .post(4000, "localhost", "/") - .expect(ResponsePredicate.SC_SUCCESS) - .sendJsonObject(data, ar -> { + .sendJsonObject(data) + .expecting(HttpResponseExpectation.SC_OK) + .onComplete(ar -> { if (ar.succeeded()) { sendResponse(request, data); } else { diff --git a/chapter5/src/main/java/chapter5/callbacks/CollectorServiceCBH.java b/chapter5/src/main/java/chapter5/callbacks/CollectorServiceCBH.java index 59c5119..7c62896 100644 --- a/chapter5/src/main/java/chapter5/callbacks/CollectorServiceCBH.java +++ b/chapter5/src/main/java/chapter5/callbacks/CollectorServiceCBH.java @@ -1,11 +1,11 @@ package chapter5.callbacks; import io.vertx.core.AbstractVerticle; +import io.vertx.core.http.HttpResponseExpectation; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.client.predicate.ResponsePredicate; import io.vertx.ext.web.codec.BodyCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,9 +33,10 @@ private void handleRequest(HttpServerRequest request) { for (int i = 0; i < 3; i++) { webClient .get(3000 + i, "localhost", "/") - .expect(ResponsePredicate.SC_SUCCESS) .as(BodyCodec.jsonObject()) - .send(ar -> { + .send() + .expecting(HttpResponseExpectation.SC_OK) + .onComplete(ar -> { if (ar.succeeded()) { responses.add(ar.result().body()); } else { @@ -46,8 +47,9 @@ private void handleRequest(HttpServerRequest request) { .put("data", new JsonArray(responses)); webClient .post(4000, "localhost", "/") - .expect(ResponsePredicate.SC_SUCCESS) - .sendJsonObject(data, ar1 -> { + .sendJsonObject(data) + .expecting(HttpResponseExpectation.SC_OK) + .onComplete(ar1 -> { if (ar1.succeeded()) { request.response() .putHeader("Content-Type", "application/json") diff --git a/chapter5/src/main/java/chapter5/future/CollectorService.java b/chapter5/src/main/java/chapter5/future/CollectorService.java index 01aacfa..b2d3031 100644 --- a/chapter5/src/main/java/chapter5/future/CollectorService.java +++ b/chapter5/src/main/java/chapter5/future/CollectorService.java @@ -4,12 +4,12 @@ import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Promise; +import io.vertx.core.http.HttpResponseExpectation; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.client.predicate.ResponsePredicate; import io.vertx.ext.web.codec.BodyCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +35,7 @@ public void start(Promise promise) { } private void handleRequest(HttpServerRequest request) { - CompositeFuture.all( + Future.all( fetchTemperature(3000), fetchTemperature(3001), fetchTemperature(3002)) @@ -58,17 +58,17 @@ private Future sendToSnapshot(CompositeFuture temps) { .add(tempData.get(2))); return webClient .post(4000, "localhost", "/") - .expect(ResponsePredicate.SC_SUCCESS) .sendJson(data) + .expecting(HttpResponseExpectation.SC_OK) .map(response -> data); } private Future fetchTemperature(int port) { return webClient .get(port, "localhost", "/") - .expect(ResponsePredicate.SC_SUCCESS) .as(BodyCodec.jsonObject()) .send() + .expecting(HttpResponseExpectation.SC_OK) .map(HttpResponse::body); } } diff --git a/chapter5/src/main/java/chapter5/reactivex/CollectorService.java b/chapter5/src/main/java/chapter5/reactivex/CollectorService.java index 449f692..a85ab5f 100644 --- a/chapter5/src/main/java/chapter5/reactivex/CollectorService.java +++ b/chapter5/src/main/java/chapter5/reactivex/CollectorService.java @@ -5,10 +5,10 @@ import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.core.http.HttpResponseExpectation; import io.vertx.reactivex.core.http.HttpServerRequest; import io.vertx.reactivex.ext.web.client.HttpResponse; import io.vertx.reactivex.ext.web.client.WebClient; -import io.vertx.reactivex.ext.web.client.predicate.ResponsePredicate; import io.vertx.reactivex.ext.web.codec.BodyCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,16 +57,16 @@ private Single collectTemperatures() { private Single> fetchTemperature(int port) { return webClient .get(port, "localhost", "/") - .expect(ResponsePredicate.SC_SUCCESS) .as(BodyCodec.jsonObject()) - .rxSend(); + .rxSend() + .compose(HttpResponseExpectation.status(200)); } private Single sendToSnapshot(Single data) { return data.flatMap(json -> webClient .post(4000, "localhost", "") - .expect(ResponsePredicate.SC_SUCCESS) .rxSendJsonObject(json) + .compose(HttpResponseExpectation.status(200)) .flatMap(resp -> Single.just(json))); } } diff --git a/chapter5/src/main/kotlin/chapter5/kotlin/coroutines/CollectorService.kt b/chapter5/src/main/kotlin/chapter5/kotlin/coroutines/CollectorService.kt index 3de2495..663392f 100644 --- a/chapter5/src/main/kotlin/chapter5/kotlin/coroutines/CollectorService.kt +++ b/chapter5/src/main/kotlin/chapter5/kotlin/coroutines/CollectorService.kt @@ -1,18 +1,16 @@ package chapter5.kotlin.coroutines +import io.vertx.core.http.HttpResponseExpectation import io.vertx.core.http.HttpServerRequest import io.vertx.core.json.JsonObject import io.vertx.ext.web.client.WebClient -import io.vertx.ext.web.client.predicate.ResponsePredicate import io.vertx.ext.web.codec.BodyCodec -import io.vertx.kotlin.core.http.listenAwait import io.vertx.kotlin.core.json.Json import io.vertx.kotlin.core.json.array import io.vertx.kotlin.core.json.json import io.vertx.kotlin.core.json.obj import io.vertx.kotlin.coroutines.CoroutineVerticle -import io.vertx.kotlin.ext.web.client.sendAwait -import io.vertx.kotlin.ext.web.client.sendJsonAwait +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.async import kotlinx.coroutines.launch import org.slf4j.LoggerFactory @@ -27,7 +25,8 @@ class CollectorService : CoroutineVerticle() { webClient = WebClient.create(vertx) vertx.createHttpServer() .requestHandler(this::handleRequest) - .listenAwait(8080) + .listen(8080) + .coAwait() } private fun handleRequest(request: HttpServerRequest) { @@ -55,16 +54,18 @@ class CollectorService : CoroutineVerticle() { private suspend fun fetchTemperature(port: Int): JsonObject { return webClient .get(port, "localhost", "/") - .expect(ResponsePredicate.SC_SUCCESS) .`as`(BodyCodec.jsonObject()) - .sendAwait() + .send() + .expecting(HttpResponseExpectation.SC_SUCCESS) + .coAwait() .body() } private suspend fun sendToSnapshot(json: JsonObject) { webClient .post(4000, "localhost", "/") - .expect(ResponsePredicate.SC_SUCCESS) - .sendJsonAwait(json) + .sendJson(json) + .expecting(HttpResponseExpectation.SC_SUCCESS) + .coAwait() } } diff --git a/chapter6/pom.xml b/chapter6/pom.xml index fbd22c5..290d870 100644 --- a/chapter6/pom.xml +++ b/chapter6/pom.xml @@ -8,10 +8,9 @@ 1.0 - 1.8 - 1.8 + 11 UTF-8 - 4.0.3 + 5.0.0.CR3 1.2.3 1.6.0 5.7.1 diff --git a/chapter6/src/main/java/chapter6/SensorDataService.java b/chapter6/src/main/java/chapter6/SensorDataService.java index 394f18b..5aabf86 100644 --- a/chapter6/src/main/java/chapter6/SensorDataService.java +++ b/chapter6/src/main/java/chapter6/SensorDataService.java @@ -2,8 +2,7 @@ import io.vertx.codegen.annotations.ProxyGen; import io.vertx.codegen.annotations.VertxGen; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; @@ -19,7 +18,7 @@ static SensorDataService createProxy(Vertx vertx, String address) { return new SensorDataServiceVertxEBProxy(vertx, address); } - void valueFor(String sensorId, Handler> handler); + Future valueFor(String sensorId); - void average(Handler> handler); + Future average(); } diff --git a/chapter6/src/main/java/chapter6/SensorDataServiceImpl.java b/chapter6/src/main/java/chapter6/SensorDataServiceImpl.java index fdc7dae..d9e848d 100644 --- a/chapter6/src/main/java/chapter6/SensorDataServiceImpl.java +++ b/chapter6/src/main/java/chapter6/SensorDataServiceImpl.java @@ -21,22 +21,22 @@ class SensorDataServiceImpl implements SensorDataService { } @Override - public void valueFor(String sensorId, Handler> handler) { + public Future valueFor(String sensorId) { if (lastValues.containsKey(sensorId)) { JsonObject data = new JsonObject() .put("sensorId", sensorId) .put("value", lastValues.get(sensorId)); - handler.handle(Future.succeededFuture(data)); + return Future.succeededFuture(data); } else { - handler.handle(Future.failedFuture("No value has been observed for " + sensorId)); + return Future.failedFuture("No value has been observed for " + sensorId); } } @Override - public void average(Handler> handler) { + public Future average() { double avg = lastValues.values().stream() .collect(Collectors.averagingDouble(Double::doubleValue)); JsonObject data = new JsonObject().put("average", avg); - handler.handle(Future.succeededFuture(data)); + return Future.succeededFuture(data); } } diff --git a/chapter6/src/test/java/chapter6/SensorDataServiceTest.java b/chapter6/src/test/java/chapter6/SensorDataServiceTest.java index 8648219..1a2d714 100644 --- a/chapter6/src/test/java/chapter6/SensorDataServiceTest.java +++ b/chapter6/src/test/java/chapter6/SensorDataServiceTest.java @@ -19,7 +19,7 @@ class SensorDataServiceTest { @BeforeEach void prepare(Vertx vertx, VertxTestContext ctx) { - vertx.deployVerticle(new DataVerticle(), ctx.succeeding(id -> { + vertx.deployVerticle(new DataVerticle()).onComplete(ctx.succeeding(id -> { dataService = SensorDataService.createProxy(vertx, "sensor.data-service"); ctx.completeNow(); })); @@ -30,12 +30,12 @@ void noSensor(VertxTestContext ctx) { Checkpoint failsToGet = ctx.checkpoint(); Checkpoint zeroAvg = ctx.checkpoint(); - dataService.valueFor("abc", ctx.failing(err -> ctx.verify(() -> { + dataService.valueFor("abc").onComplete(ctx.failing(err -> ctx.verify(() -> { assertThat(err.getMessage()).startsWith("No value has been observed"); failsToGet.flag(); }))); - dataService.average(ctx.succeeding(data -> ctx.verify(() -> { + dataService.average().onComplete(ctx.succeeding(data -> ctx.verify(() -> { double avg = data.getDouble("average"); assertThat(avg).isCloseTo(0.0d, withPercentage(1.0d)); zeroAvg.flag(); @@ -55,13 +55,13 @@ void withSensors(Vertx vertx, VertxTestContext ctx) { .publish("sensor.updates", m1) .publish("sensor.updates", m2); - dataService.valueFor("abc", ctx.succeeding(data -> ctx.verify(() -> { + dataService.valueFor("abc").onComplete(ctx.succeeding(data -> ctx.verify(() -> { assertThat(data.getString("sensorId")).isEqualTo("abc"); assertThat(data.getDouble("value")).isEqualTo(21.0d); getValue.flag(); }))); - dataService.average(ctx.succeeding(data -> ctx.verify(() -> { + dataService.average().onComplete(ctx.succeeding(data -> ctx.verify(() -> { assertThat(data.getDouble("average")).isCloseTo(22.0, withPercentage(1.0d)); goodAvg.flag(); })));