From 01748982b240a75892a51bebbc32143697c77fce Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 28 Nov 2024 16:18:54 +0100 Subject: [PATCH] Guard Executor/Scheduler unwrapping against implementations returning themselves. If unwrap isn't able to unwrap then it would return itself causing a cast exception. [resolves #220] Signed-off-by: Mark Paluch --- .../java/io/r2dbc/pool/ConnectionPool.java | 35 +- .../r2dbc/pool/ConnectionPoolUnitTests.java | 328 ++++++++++-------- 2 files changed, 212 insertions(+), 151 deletions(-) diff --git a/src/main/java/io/r2dbc/pool/ConnectionPool.java b/src/main/java/io/r2dbc/pool/ConnectionPool.java index 9930583..90d32d0 100644 --- a/src/main/java/io/r2dbc/pool/ConnectionPool.java +++ b/src/main/java/io/r2dbc/pool/ConnectionPool.java @@ -108,22 +108,10 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) { Connection connection = ref.poolable(); Scheduler scheduler = null; - Executor executor = null; Mono conn; if (connection instanceof Wrapped) { - - Wrapped wrapped = (Wrapped) connection; - - scheduler = wrapped.unwrap(Scheduler.class); - - if (scheduler == null) { - executor = wrapped.unwrap(Executor.class); - } - - if (executor != null) { - scheduler = Schedulers.fromExecutor(executor); - } + scheduler = findScheduler((Wrapped) connection); } if (scheduler != null) { @@ -167,6 +155,27 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) { this.create = configuration.getAcquireRetry() > 0 ? create.retry(configuration.getAcquireRetry()) : create; } + @Nullable + @SuppressWarnings("DataFlowIssue") + private static Scheduler findScheduler(Wrapped connection) { + + Object unwrapped = connection.unwrap(Scheduler.class); + + if (!(unwrapped instanceof Scheduler)) { + unwrapped = connection.unwrap(Executor.class); + } + + if (unwrapped instanceof Executor) { + unwrapped = Schedulers.fromExecutor((Executor) unwrapped); + } + + if (unwrapped instanceof Scheduler) { + return (Scheduler) unwrapped; + } + + return null; + } + private Mono prepareConnection(ConnectionPoolConfiguration configuration, PooledRef ref, Connection connection, Function> allocateValidation) { Mono prepare = null; diff --git a/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java b/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java index d7b46cd..004ce27 100644 --- a/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java +++ b/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java @@ -118,7 +118,7 @@ void shouldCreateConnection() { @Test @SuppressWarnings("unchecked") - void shouldCreateConnectionAndeOffloadPreparation() { + void shouldCreateConnectionAndOffloadPreparation() { ConnectionFactory connectionFactoryMock = mock(ConnectionFactory.class); Connection connectionMock = mock(Connection.class, withSettings().extraInterfaces(Wrapped.class)); @@ -141,6 +141,58 @@ void shouldCreateConnectionAndeOffloadPreparation() { verify(connectionFactoryMock).create(); } + @Test + @SuppressWarnings("unchecked") + void shouldCreateConnectionAndOffloadPreparationToExecutor() { + + ConnectionFactory connectionFactoryMock = mock(ConnectionFactory.class); + Connection connectionMock = mock(Connection.class, withSettings().extraInterfaces(Wrapped.class)); + Wrapped wrapped = (Wrapped) connectionMock; + + when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.just(connectionMock)); + when(connectionMock.validate(any())).thenReturn(Mono.empty()); + when(wrapped.unwrap(Executor.class)).thenReturn(ForkJoinPool.commonPool()); + + ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).build(); + ConnectionPool pool = new ConnectionPool(configuration); + + pool.create().as(StepVerifier::create).consumeNextWith(actual -> { + + assertThat(actual).isInstanceOf(PooledConnection.class); + assertThat(((Wrapped) actual).unwrap()).isSameAs(connectionMock); + assertThat(Thread.currentThread().getName()).startsWith("ForkJoinPool"); + }).verifyComplete(); + + verify(connectionFactoryMock).create(); + } + + @Test + @SuppressWarnings("unchecked") + void wrappedReturnsItself() { + + ConnectionFactory connectionFactoryMock = mock(ConnectionFactory.class); + Connection connectionMock = mock(Connection.class, withSettings().extraInterfaces(Wrapped.class)); + Wrapped wrapped = (Wrapped) connectionMock; + + when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.just(connectionMock)); + when(connectionMock.validate(any())).thenReturn(Mono.empty()); + when((Object) wrapped.unwrap(Scheduler.class)).thenReturn(connectionMock); + + ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).build(); + ConnectionPool pool = new ConnectionPool(configuration); + + Thread mainThread = Thread.currentThread(); + + pool.create().as(StepVerifier::create).consumeNextWith(actual -> { + + assertThat(actual).isInstanceOf(PooledConnection.class); + assertThat(((Wrapped) actual).unwrap()).isSameAs(connectionMock); + assertThat(mainThread.getName()).isEqualTo(mainThread.getName()); + }).verifyComplete(); + + verify(connectionFactoryMock).create(); + } + @Test @SuppressWarnings("unchecked") void shouldConsiderInitialSize() { @@ -264,20 +316,20 @@ void shouldTimeoutCreateConnection() { ConnectionFactory connectionFactoryMock = mock(ConnectionFactory.class); Connection connectionMock = mock(Connection.class); when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.defer(() -> - Mono.delay(Duration.ofSeconds(5)).thenReturn(connectionMock)) + Mono.delay(Duration.ofSeconds(5)).thenReturn(connectionMock)) ); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .maxCreateConnectionTime(Duration.ofSeconds(1)) - .build(); + .maxCreateConnectionTime(Duration.ofSeconds(1)) + .build(); ConnectionPool pool = new ConnectionPool(configuration); StepVerifier.withVirtualTime(pool::create) - .expectSubscription() - .thenAwait(Duration.ofSeconds(2)) - .expectError(TimeoutException.class) - .verify(); + .expectSubscription() + .thenAwait(Duration.ofSeconds(2)) + .expectError(TimeoutException.class) + .verify(); verify(connectionFactoryMock).create(); } @@ -289,20 +341,20 @@ void shouldTimeoutCreateConnectionUsingZeroTimeout() { ConnectionFactory connectionFactoryMock = mock(ConnectionFactory.class); Connection connectionMock = mock(Connection.class); when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.defer(() -> - Mono.delay(Duration.ofSeconds(15)).thenReturn(connectionMock)) + Mono.delay(Duration.ofSeconds(15)).thenReturn(connectionMock)) ); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .maxCreateConnectionTime(Duration.ZERO) - .build(); + .maxCreateConnectionTime(Duration.ZERO) + .build(); ConnectionPool pool = new ConnectionPool(configuration); StepVerifier.withVirtualTime(pool::create) - .expectSubscription() - .thenAwait(Duration.ofSeconds(11)) - .expectError(TimeoutException.class) - .verify(); + .expectSubscription() + .thenAwait(Duration.ofSeconds(11)) + .expectError(TimeoutException.class) + .verify(); verify(connectionFactoryMock).create(); } @@ -316,20 +368,20 @@ void shouldTimeoutAcquireConnection() { // acquire time should also consider the time to obtain an actual connection when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.defer(() -> - Mono.delay(Duration.ofSeconds(15)).thenReturn(connectionMock)) + Mono.delay(Duration.ofSeconds(15)).thenReturn(connectionMock)) ); when(connectionMock.validate(any())).thenReturn(Mono.empty()); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .acquireRetry(0) - .maxAcquireTime(Duration.ofSeconds(1)) - .build(); + .acquireRetry(0) + .maxAcquireTime(Duration.ofSeconds(1)) + .build(); StepVerifier.withVirtualTime(() -> new ConnectionPool(configuration).create()) - .expectSubscription() - .thenAwait(Duration.ofSeconds(11)) - .expectError(R2dbcTimeoutException.class) - .verify(); + .expectSubscription() + .thenAwait(Duration.ofSeconds(11)) + .expectError(R2dbcTimeoutException.class) + .verify(); verify(connectionFactoryMock).create(); } @@ -342,28 +394,28 @@ void shouldNotTimeoutAcquireConnectionWhenPooled() { Connection connectionMock = mock(Connection.class); when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.defer(() -> - Mono.delay(Duration.ofMillis(100)).thenReturn(connectionMock)) + Mono.delay(Duration.ofMillis(100)).thenReturn(connectionMock)) ); when(connectionMock.validate(any())).thenReturn(Mono.empty()); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .initialSize(1) - .maxAcquireTime(Duration.ofMillis(10)) - .build(); + .initialSize(1) + .maxAcquireTime(Duration.ofMillis(10)) + .build(); ConnectionPool pool = new ConnectionPool(configuration); pool.warmup() - .as(StepVerifier::create) - .expectNext(1) - .verifyComplete(); + .as(StepVerifier::create) + .expectNext(1) + .verifyComplete(); // When initial size of the pool is non-zero, even though creating connection is slow, // once connection is in pool, acquiring a connection from pool is fast. // Therefore, it should not timeout for acquiring a connection from pool. pool.create().as(StepVerifier::create) - .expectNextCount(1) - .verifyComplete(); + .expectNextCount(1) + .verifyComplete(); verify(connectionFactoryMock).create(); } @@ -376,24 +428,24 @@ void shouldTimeoutValidation() { Connection connectionMock = mock(Connection.class); when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.defer(() -> - Mono.delay(Duration.ofSeconds(1)).thenReturn(connectionMock)) + Mono.delay(Duration.ofSeconds(1)).thenReturn(connectionMock)) ); when(connectionMock.validate(any())).thenReturn(Mono.defer(() -> - Mono.delay(Duration.ofSeconds(10)).thenReturn(false)) + Mono.delay(Duration.ofSeconds(10)).thenReturn(false)) ); when(connectionMock.close()).thenReturn(Mono.empty()); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .acquireRetry(0) - .maxValidationTime(Duration.ofSeconds(5)) - .maxAcquireTime(Duration.ofSeconds(15)) - .build(); + .acquireRetry(0) + .maxValidationTime(Duration.ofSeconds(5)) + .maxAcquireTime(Duration.ofSeconds(15)) + .build(); StepVerifier.withVirtualTime(() -> new ConnectionPool(configuration).create()) - .expectSubscription() - .thenAwait(Duration.ofSeconds(7)) - .expectError(R2dbcTimeoutException.class) - .verify(); + .expectSubscription() + .thenAwait(Duration.ofSeconds(7)) + .expectError(R2dbcTimeoutException.class) + .verify(); verify(connectionFactoryMock).create(); verify(connectionMock).close(); @@ -421,26 +473,26 @@ void shouldReusePooledConnectionAfterTimeout() { when(connectionFactoryMock.create()).thenReturn((Publisher) connectionPublisher); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .allocatorSubscribeOn(Schedulers.immediate()) - .acquireRetry(0) - .initialSize(0) - .maxAcquireTime(Duration.ofMillis(70)) - .build(); + .allocatorSubscribeOn(Schedulers.immediate()) + .acquireRetry(0) + .initialSize(0) + .maxAcquireTime(Duration.ofMillis(70)) + .build(); ConnectionPool pool = new ConnectionPool(configuration); AtomicReference firstConnectionHolder = new AtomicReference<>(); // fast connection retrieval, do not close the connection yet, so that next call will create a new connection pool.create() - .as(StepVerifier::create) - .consumeNextWith(firstConnectionHolder::set) - .verifyComplete(); + .as(StepVerifier::create) + .consumeNextWith(firstConnectionHolder::set) + .verifyComplete(); // slow connection retrieval pool.create() - .as(StepVerifier::create) - .expectError(R2dbcTimeoutException.class) - .verify(); + .as(StepVerifier::create) + .expectError(R2dbcTimeoutException.class) + .verify(); assertThat(counter).hasValue(2); @@ -449,10 +501,10 @@ void shouldReusePooledConnectionAfterTimeout() { // This should retrieve from pool, not fetching from the connection publisher. pool.create() - .as(StepVerifier::create) - .assertNext(actual -> { - StepVerifier.create(actual.close()).verifyComplete(); - }).verifyComplete(); + .as(StepVerifier::create) + .assertNext(actual -> { + StepVerifier.create(actual.close()).verifyComplete(); + }).verifyComplete(); assertThat(counter).hasValue(2); } @@ -468,11 +520,11 @@ void shouldConsiderMaxIdleTime() { CountingConnectionFactory connectionFactory = new CountingConnectionFactory(firstConnection, secondConnection); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory) - .clock(delayClock) - .initialSize(0) - .metricsRecorder(metricsRecorder) - .maxIdleTime(Duration.ofDays(2)) // set idle to 2 days - .build(); + .clock(delayClock) + .initialSize(0) + .metricsRecorder(metricsRecorder) + .maxIdleTime(Duration.ofDays(2)) // set idle to 2 days + .build(); ConnectionPool pool = new ConnectionPool(configuration); assertPoolCreatesConnectionSuccessfully(pool, firstConnection); @@ -500,10 +552,10 @@ void shouldConsiderMaxIdleTimeWithDefault() { CountingConnectionFactory connectionFactory = new CountingConnectionFactory(firstConnection, secondConnection); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory) - .clock(delayClock) - .initialSize(0) - .metricsRecorder(metricsRecorder) - .build(); + .clock(delayClock) + .initialSize(0) + .metricsRecorder(metricsRecorder) + .build(); ConnectionPool pool = new ConnectionPool(configuration); assertPoolCreatesConnectionSuccessfully(pool, firstConnection); @@ -526,9 +578,9 @@ void shouldConsiderNegativeMaxIdleTime() { CountingConnectionFactory connectionFactory = new CountingConnectionFactory(firstConnection, secondConnection); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory) - .initialSize(0) - .maxIdleTime(Duration.ofNanos(-1)) - .build(); + .initialSize(0) + .maxIdleTime(Duration.ofNanos(-1)) + .build(); ConnectionPool pool = new ConnectionPool(configuration); assertPoolCreatesConnectionSuccessfully(pool, firstConnection); @@ -545,10 +597,10 @@ void shouldConsiderBackgroundEvictionInterval() { CountingConnectionFactory connectionFactory = new CountingConnectionFactory(firstConnection); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory) - .initialSize(0) - .maxIdleTime(Duration.ofMillis(200)) - .backgroundEvictionInterval(Duration.ofMillis(100)) - .build(); + .initialSize(0) + .maxIdleTime(Duration.ofMillis(200)) + .backgroundEvictionInterval(Duration.ofMillis(100)) + .build(); ConnectionPool pool = new ConnectionPool(configuration); assertPoolCreatesConnectionSuccessfully(pool, firstConnection); @@ -564,9 +616,9 @@ void shouldConsiderMaxIdleWithBackgroundEviction() { CountingConnectionFactory connectionFactory = new CountingConnectionFactory(firstConnection); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory) - .initialSize(0) - .maxIdleTime(Duration.ofMillis(200)) - .build(); + .initialSize(0) + .maxIdleTime(Duration.ofMillis(200)) + .build(); ConnectionPool pool = new ConnectionPool(configuration); assertPoolCreatesConnectionSuccessfully(pool, firstConnection); @@ -583,10 +635,10 @@ void shouldConsiderDisabledBackgroundEvictionInterval() throws InterruptedExcept CountingConnectionFactory connectionFactory = new CountingConnectionFactory(firstConnection); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory) - .initialSize(0) - .maxIdleTime(Duration.ofMillis(200)) - .backgroundEvictionInterval(Duration.ZERO) - .build(); + .initialSize(0) + .maxIdleTime(Duration.ofMillis(200)) + .backgroundEvictionInterval(Duration.ZERO) + .build(); ConnectionPool pool = new ConnectionPool(configuration); assertPoolCreatesConnectionSuccessfully(pool, firstConnection); @@ -611,11 +663,11 @@ void shouldConsiderMaxLifetime() { CountingConnectionFactory connectionFactory = new CountingConnectionFactory(firstConnection, secondConnection); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory) - .clock(delayClock) - .initialSize(0) - .metricsRecorder(metricsRecorder) - .maxLifeTime(Duration.ofDays(1)) - .build(); + .clock(delayClock) + .initialSize(0) + .metricsRecorder(metricsRecorder) + .maxLifeTime(Duration.ofDays(1)) + .build(); ConnectionPool pool = new ConnectionPool(configuration); @@ -645,11 +697,11 @@ void shouldConsiderMaxLifetimeWithDefault() { CountingConnectionFactory connectionFactory = new CountingConnectionFactory(firstConnection); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory) - .clock(delayClock) - .initialSize(0) - .metricsRecorder(metricsRecorder) - .maxIdleTime(Duration.ofSeconds(-1)) // do not evict by idle time - .build(); + .clock(delayClock) + .initialSize(0) + .metricsRecorder(metricsRecorder) + .maxIdleTime(Duration.ofSeconds(-1)) // do not evict by idle time + .build(); ConnectionPool pool = new ConnectionPool(configuration); @@ -676,9 +728,9 @@ void shouldReportMetrics() { ConnectionPool pool = new ConnectionPool(configuration); pool.warmup() - .as(StepVerifier::create) - .expectNext(10) - .verifyComplete(); + .as(StepVerifier::create) + .expectNext(10) + .verifyComplete(); assertThat(pool.getMetrics()).isPresent().hasValueSatisfying(actual -> { @@ -712,9 +764,9 @@ void shouldRegisterToJmx() { when(connectionFactoryMock.create()).thenAnswer(it -> Mono.just(connectionMock)); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .name("my-pool") - .registerJmx(true) - .build(); + .name("my-pool") + .registerJmx(true) + .build(); ConnectionPool pool = new ConnectionPool(configuration); MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); @@ -725,9 +777,9 @@ void shouldRegisterToJmx() { assertThat(poolObjectNames).hasSize(1); ObjectName objectName = poolObjectNames.get(0); assertThat(objectName.getKeyPropertyList()) - .hasSize(2) - .containsEntry("name", "my-pool") - .containsEntry("type", ConnectionPool.class.getSimpleName()); + .hasSize(2) + .containsEntry("name", "my-pool") + .containsEntry("type", ConnectionPool.class.getSimpleName()); } @Test @@ -740,8 +792,8 @@ void shouldNotRegisterToJmx() { when(connectionFactoryMock.create()).thenAnswer(it -> Mono.just(connectionMock)); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .registerJmx(false) - .build(); + .registerJmx(false) + .build(); ConnectionPool pool = new ConnectionPool(configuration); MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); @@ -763,9 +815,9 @@ void shouldMBeanUnregisteredAtPoolDisposal() { when(connectionMock.close()).thenReturn(Mono.empty()); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .registerJmx(true) - .name("my-pool") - .build(); + .registerJmx(true) + .name("my-pool") + .build(); ConnectionPool pool = new ConnectionPool(configuration); MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); @@ -783,9 +835,9 @@ void shouldPropagateGracefullyDestroyHandlerFailure() { Connection connectionMock = mock(Connection.class); ConnectionPool pool = createConnectionPoolForDisposeTest(connectionMock); pool.warmup() - .as(StepVerifier::create) - .expectNext(10) - .verifyComplete(); + .as(StepVerifier::create) + .expectNext(10) + .verifyComplete(); IllegalArgumentException iae = new IllegalArgumentException(); @@ -807,9 +859,9 @@ void shouldPropagateGracefullyDestroyHandlerFailureOnDisposeLater() { ConnectionPool pool = createConnectionPoolForDisposeTest(connectionMock); pool.warmup() - .as(StepVerifier::create) - .expectNext(10) - .verifyComplete(); + .as(StepVerifier::create) + .expectNext(10) + .verifyComplete(); IllegalArgumentException iae = new IllegalArgumentException(); @@ -834,9 +886,9 @@ void disposedPoolShouldNoOpOnClose() { ConnectionPool pool = createConnectionPoolForDisposeTest(connectionMock); pool.warmup() - .as(StepVerifier::create) - .expectNext(10) - .verifyComplete(); + .as(StepVerifier::create) + .expectNext(10) + .verifyComplete(); pool.close().as(StepVerifier::create).verifyComplete(); @@ -861,11 +913,11 @@ void shouldDropConnectionOnFailedValidation() { when(connectionMock.validate(ValidationDepth.LOCAL)).thenReturn(Mono.just(false), Mono.empty()); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .allocatorSubscribeOn(Schedulers.immediate()) - .acquireRetry(0) - .initialSize(0) - .maxSize(2) - .build(); + .allocatorSubscribeOn(Schedulers.immediate()) + .acquireRetry(0) + .initialSize(0) + .maxSize(2) + .build(); ConnectionPool pool = new ConnectionPool(configuration); @@ -888,11 +940,11 @@ void shouldDropConnectionOnFailedValidationWithRetry() { when(connectionMock.validate(ValidationDepth.LOCAL)).thenReturn(Mono.just(false), Mono.just(false), Mono.empty()); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .allocatorSubscribeOn(Schedulers.immediate()) - .acquireRetry(1) - .initialSize(0) - .maxSize(2) - .build(); + .allocatorSubscribeOn(Schedulers.immediate()) + .acquireRetry(1) + .initialSize(0) + .maxSize(2) + .build(); ConnectionPool pool = new ConnectionPool(configuration); @@ -924,8 +976,8 @@ void shouldReportPoolInPool() { when(connectionFactoryMock.create()).thenReturn(Mono.empty()); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .initialSize(0) - .build(); + .initialSize(0) + .build(); new ConnectionPool(configuration); } @@ -943,7 +995,7 @@ void shouldInvokeLifecyclePostAllocate() { when(connectionMock.postAllocate()).thenReturn(Mono.fromRunnable(() -> wasCalled.set(true))); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .build(); + .build(); ConnectionPool pool = new ConnectionPool(configuration); pool.create().as(StepVerifier::create).expectNextCount(1).verifyComplete(); @@ -963,8 +1015,8 @@ void shouldInvokePostAllocate() { when(connectionMock.validate(any())).thenReturn(Mono.empty()); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .postAllocate(connection -> Mono.fromRunnable(() -> wasCalled.set(true))) - .build(); + .postAllocate(connection -> Mono.fromRunnable(() -> wasCalled.set(true))) + .build(); ConnectionPool pool = new ConnectionPool(configuration); pool.create().as(StepVerifier::create).expectNextCount(1).verifyComplete(); @@ -988,11 +1040,11 @@ void shouldInvokePostAllocateInOrder() { }); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .postAllocate(connection -> { - order.add("postAllocate"); - return Mono.fromRunnable(() -> order.add("postAllocate.subscribe")); - }) - .build(); + .postAllocate(connection -> { + order.add("postAllocate"); + return Mono.fromRunnable(() -> order.add("postAllocate.subscribe")); + }) + .build(); ConnectionPool pool = new ConnectionPool(configuration); pool.create().as(StepVerifier::create).expectNextCount(1).verifyComplete(); @@ -1021,7 +1073,7 @@ void cancelDuringAllocationShouldCompleteAtomically() throws InterruptedExceptio when(connectionMock.postAllocate()).thenReturn(prepare); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) - .build(); + .build(); ConnectionPool pool = new ConnectionPool(configuration); Disposable subscribe = pool.create().subscribe(); @@ -1067,7 +1119,7 @@ public void cancel() { when(connectionMock.validate(any())).thenReturn(validationPublisher); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).initialSize(0).maxAcquireTime(Duration.ofMillis(150)).acquireRetry(0) - .build(); + .build(); ConnectionPool pool = new ConnectionPool(configuration); CompletableFuture future = pool.create().toFuture(); @@ -1116,7 +1168,7 @@ public void cancel() { when(connectionMock.validate(any())).thenReturn(Mono.just(true)); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).initialSize(0).maxCreateConnectionTime(Duration.ofMillis(1)).acquireRetry(0) - .build(); + .build(); ConnectionPool pool = new ConnectionPool(configuration); CompletableFuture future = pool.create().toFuture(); @@ -1212,8 +1264,8 @@ public Publisher create() { int count = this.createCounter.getAndIncrement(); if (this.connections.size() <= count) { return Mono.error(new RuntimeException( - format("ConnectionFactory#create is called %d times which is more than given connection size %d.", - count + 1, this.connections.size()))); + format("ConnectionFactory#create is called %d times which is more than given connection size %d.", + count + 1, this.connections.size()))); } return Mono.just(this.connections.get(count)); });