From 100e7c3c9fa7bfc74d525e973c14637aba69bb26 Mon Sep 17 00:00:00 2001 From: Prudhviraj Karumanchi Date: Mon, 9 Dec 2024 11:53:37 -0800 Subject: [PATCH] Changes: Delay of 3s per ASG during boot up. Long term fix is to change this to completableFuture. --- .../java/com/netflix/evcache/EVCacheImpl.java | 2 +- .../metrics/EVCacheMetricsFactory.java | 3 +- .../evcache/pool/EVCacheClientPool.java | 104 ++++++++++++------ 3 files changed, 73 insertions(+), 36 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index b17450b3..cb24023c 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -176,7 +176,7 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean { this._pool = poolManager.getEVCacheClientPool(_appName); }); - _pool.pingServers(); + _pool.pingServers(true); setupMonitoring(); } diff --git a/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java b/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java index 47dc890d..a57ad347 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java +++ b/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java @@ -288,6 +288,8 @@ public String getStatusCode(StatusCode sc) { public static final String INTERNAL_POOL_SG_CONFIG = "internal.evc.client.pool.asg.config"; public static final String INTERNAL_POOL_CONFIG = "internal.evc.client.pool.config"; public static final String INTERNAL_POOL_REFRESH = "internal.evc.client.pool.refresh"; + public static final String INTERNAL_PING_SERVER = "internal.evc.client.ping.server"; + public static final String INTERNAL_PING_SERVER_FAILURES = "internal.evc.client.ping.server.failures"; public static final String INTERNAL_BOOTSTRAP_EUREKA = "internal.evc.client.pool.bootstrap.eureka"; @@ -309,7 +311,6 @@ public String getStatusCode(StatusCode sc) { public static final String POOL_REFRESH_ASYNC = "refreshAsync"; public static final String POOL_OPERATIONS = "operations"; - /** * Metric Tags Names */ diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java index 11d77a52..94fb464d 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClientPool.java @@ -16,10 +16,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -979,7 +976,7 @@ private void updateMemcachedReadInstancesByZone() { } private void cleanupMemcachedInstances(boolean force) { - pingServers(); + pingServers(false); for (Iterator>> it = memcachedInstancesByServerGroup.entrySet().iterator(); it.hasNext();) { final Entry> serverGroupEntry = it.next(); final List instancesInAServerGroup = serverGroupEntry.getValue(); @@ -1109,7 +1106,7 @@ private synchronized void refresh(boolean force) throws IOException { } updateMemcachedReadInstancesByZone(); updateQueueStats(); - if (_pingServers.get()) pingServers(); + if (_pingServers.get()) pingServers(false); } catch (Throwable t) { log.error("Exception while refreshing the Server list", t); } finally { @@ -1167,52 +1164,91 @@ private void updateQueueStats() { } } - public void pingServers() { + public void pingServers(Boolean bootTimeCheck) { + final long start = System.currentTimeMillis(); + try { final Map> allServers = getAllInstancesByZone(); + for (Entry> entry : allServers.entrySet()) { final List listOfClients = entry.getValue(); for (EVCacheClient client : listOfClients) { - - int maxRetries = 10; - long retryDelayMs = 1000; - for (int i = 0; i < maxRetries; i++) { - final Map versions = client.getVersions(); - boolean allNodesOk = true; - - for (Entry vEntry : versions.entrySet()) { - String version = vEntry.getValue(); - // Only accept version in format like "1.6.15" - if (!version.matches("\\d+\\.\\d+\\.\\d+")) { - allNodesOk = false; - log.warn("Node not ready or invalid version: {}, response: {}, attempt {}", - vEntry.getKey(), version, i + 1); - break; - } - } - - if (allNodesOk) { + if (!bootTimeCheck) { + // Just log versions and continue if not a boot time check + try { + final Map versions = client.getVersions(); if (log.isDebugEnabled()) { for (Entry vEntry : versions.entrySet()) { log.debug("Host : {} Version : {}", vEntry.getKey(), vEntry.getValue()); } } - break; + continue; + } catch (Exception e) { + log.warn("Error getting versions for client: {}", client, e); + continue; } + } - if (i < maxRetries - 1) { - Thread.sleep(retryDelayMs); - } else { - log.error("Some nodes not ready after max retries for client: {}", client); + long startTime = System.currentTimeMillis(); + long timeoutMs = 3000; // 3 seconds + boolean success = false; + + while (System.currentTimeMillis() - startTime < timeoutMs && !success) { + try { + final Map versions = client.getVersions(); + boolean allNodesOk = true; + + for (Entry vEntry : versions.entrySet()) { + String version = vEntry.getValue(); + if (!version.matches("\\d+\\.\\d+\\.\\d+")) { + allNodesOk = false; + log.warn("Node not ready or invalid version: {}, response: {}", + vEntry.getKey(), version); + break; + } + } + + if (allNodesOk) { + if (log.isDebugEnabled()) { + for (Entry vEntry : versions.entrySet()) { + log.debug("Host : {} Version : {}", vEntry.getKey(), vEntry.getValue()); + } + } + success = true; + break; + } + + Thread.sleep(100); // 100ms delay between retries + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while pinging servers for client: {}", client); + break; + } catch (Exception e) { + log.warn("Error while pinging server for client: {}", client, e); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } } } + + if (!success && bootTimeCheck) { + log.warn("Failed to get valid version from client {} within timeout", client); + EVCacheMetricsFactory.getInstance().getCounter(EVCacheMetricsFactory.INTERNAL_PING_SERVER_FAILURES,tagList).increment(); + } } } - if (duetClientPool != null) - duetClientPool.pingServers(); + if (duetClientPool != null) { + duetClientPool.pingServers(false); + } } catch (Throwable t) { - log.error("Error while pinging the servers", t); + log.warn("Error while pinging the servers", t); + EVCacheMetricsFactory.getInstance().getCounter(EVCacheMetricsFactory.INTERNAL_PING_SERVER_FAILURES,tagList).increment(); + } finally { + EVCacheMetricsFactory.getInstance().getPercentileTimer(EVCacheMetricsFactory.INTERNAL_PING_SERVER, tagList, Duration.ofMillis(100)).record(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS); } }