From ace6cfe9b0c3b614ecd000f1e4fbd4b495b7fef0 Mon Sep 17 00:00:00 2001 From: Johannes Edmeier Date: Fri, 3 May 2024 14:55:47 +0200 Subject: [PATCH] PostgresChannelMessageTableSubscriber: Renew connection only if invalid Fixes: #9111 An evolution of the #9061: renew the connection only when we need to. (cherry picked from commit da29e2da6a21d1ab1e6d23883f7ac424f2200149) --- ...PostgresChannelMessageTableSubscriber.java | 5 +++-- ...resChannelMessageTableSubscriberTests.java | 20 ++++++++++++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java index 9075ebedc31..68ae5110a11 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java @@ -236,9 +236,10 @@ private void doStart(CountDownLatch startingLatch) { if (!isActive()) { return; } - if (notifications == null || notifications.length == 0) { + if ((notifications == null || notifications.length == 0) && !conn.isValid(1)) { //We did not receive any notifications within the timeout period. - //We will close the connection and re-establish it. + //If the connection is still valid, we will continue polling + //Otherwise, we will close the connection and re-establish it. break; } for (PGNotification notification : notifications) { diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java index 7a74cd72024..d028e7ee820 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java @@ -23,8 +23,10 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import javax.sql.DataSource; @@ -270,7 +272,18 @@ public void testRenewConnection() throws Exception { CountDownLatch latch = new CountDownLatch(2); List payloads = new ArrayList<>(); CountDownLatch connectionLatch = new CountDownLatch(2); - connectionSupplier.onGetConnection = connectionLatch::countDown; + AtomicBoolean connectionCloseState = new AtomicBoolean(); + connectionSupplier.onGetConnection = conn -> { + connectionLatch.countDown(); + if (connectionCloseState.compareAndSet(false, true)) { + try { + conn.close(); + } + catch (Exception e) { + //nop + } + } + }; postgresChannelMessageTableSubscriber.start(); postgresSubscribableChannel.subscribe(message -> { payloads.add(message.getPayload()); @@ -326,7 +339,7 @@ public JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) { private static class ConnectionSupplier implements PgConnectionSupplier { - Runnable onGetConnection; + Consumer onGetConnection; @Override public PgConnection get() throws SQLException { @@ -335,10 +348,11 @@ public PgConnection get() throws SQLException { POSTGRES_CONTAINER.getPassword()) .unwrap(PgConnection.class); if (this.onGetConnection != null) { - this.onGetConnection.run(); + this.onGetConnection.accept(conn); } return conn; } } + }