diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java index 0d43587f6eb..e67d5438821 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java @@ -39,6 +39,11 @@ public abstract class Instance extends AbstractStateWrapper { */ public abstract void destroy(); + /** + * notify destroy instance. + */ + public abstract void notifyDestroy(); + /** * get instance profile */ diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java index 415b05825af..7267066aeee 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java @@ -95,15 +95,29 @@ public boolean init(Object srcManager, InstanceProfile srcProfile) { @Override public void destroy() { - if (!inited) { - return; - } - doChangeState(State.SUCCEEDED); + Long start = AgentUtils.getCurrentTime(); + notifyDestroy(); while (running) { AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS); } + LOGGER.info("destroy instance wait run elapse {} ms instance {}", AgentUtils.getCurrentTime() - start, + profile.getInstanceId()); + start = AgentUtils.getCurrentTime(); this.source.destroy(); + LOGGER.info("destroy instance wait source elapse {} ms instance {}", AgentUtils.getCurrentTime() - start, + profile.getInstanceId()); + start = AgentUtils.getCurrentTime(); this.sink.destroy(); + LOGGER.info("destroy instance wait sink elapse {} ms instance {}", AgentUtils.getCurrentTime() - start, + profile.getInstanceId()); + } + + @Override + public void notifyDestroy() { + if (!inited) { + return; + } + doChangeState(State.SUCCEEDED); } @Override diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index a37a171a372..ec4502a7fb7 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -70,6 +70,7 @@ public class SenderManager { private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class); private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance(); + public static final int RESEND_QUEUE_WAIT_MS = 10; // cache for group and sender list, share the map cross agent lifecycle. private DefaultMessageSender sender; private LinkedBlockingQueue resendQueue; @@ -172,9 +173,12 @@ public void Stop() { } private void closeMessageSender() { + Long start = AgentUtils.getCurrentTime(); if (sender != null) { sender.close(); } + LOGGER.info("close sender elapse {} ms instance {}", AgentUtils.getCurrentTime() - start, + profile.getInstanceId()); } private AgentMetricItem getMetricItem(Map otherDimensions) { @@ -286,7 +290,7 @@ private Runnable flushResendQueue() { resendRunning = true; while (!shutdown) { try { - AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS); + AgentSenderCallback callback = resendQueue.poll(RESEND_QUEUE_WAIT_MS, TimeUnit.MILLISECONDS); if (callback != null) { SenderMessage message = callback.message; AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, message.getGroupId(), diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index 8929b33d019..803b9235d2c 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -80,8 +80,8 @@ protected class SourceData { protected final Integer BATCH_READ_LINE_COUNT = 10000; protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024; protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT; - protected final Integer READ_WAIT_TIMEOUT_MS = 10; - private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60; + protected final Integer WAIT_TIMEOUT_MS = 10; + private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60 * 100; private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000; protected BlockingQueue queue; @@ -172,7 +172,7 @@ private void doRun() { emptyCount = 0; } MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); - AgentUtils.silenceSleepInSeconds(1); + AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS); continue; } emptyCount = 0; @@ -231,7 +231,7 @@ private boolean waitForPermit(String permitName, int permitLen) { if (!isRunnable()) { return false; } - AgentUtils.silenceSleepInSeconds(1); + AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS); } } return true; @@ -247,7 +247,7 @@ private void putIntoQueue(SourceData sourceData) { try { boolean offerSuc = false; while (isRunnable() && !offerSuc) { - offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS); + offerSuc = queue.offer(sourceData, WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } if (!offerSuc) { MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length); @@ -338,7 +338,7 @@ private boolean filterSourceData(Message msg) { private SourceData readFromQueue() { SourceData sourceData = null; try { - sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.warn("poll {} data get interrupted.", instanceId); } @@ -405,7 +405,7 @@ private void clearQueue(BlockingQueue queue) { while (queue != null && !queue.isEmpty()) { SourceData sourceData = null; try { - sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.warn("poll {} data get interrupted.", instanceId, e); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java index d3dc67df5ca..278a9298f96 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java @@ -55,6 +55,11 @@ public void destroy() { destroyTime = index.getAndAdd(1); } + @Override + public void notifyDestroy() { + + } + @Override public InstanceProfile getProfile() { return profile; diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 6ee892c914a..5d6871fecbc 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -90,7 +90,7 @@ private LogFileSource getSource(int taskId, long offset) { Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 0); Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2); Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3); - Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10); + Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10); if (offset > 0) { OffsetProfile offsetProfile = new OffsetProfile(instanceProfile.getTaskId(), instanceProfile.getInstanceId(),