Skip to content

Commit

Permalink
[INLONG-11516][Agent] Accelerate the process exit speed
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang committed Nov 21, 2024
1 parent 22d9a33 commit 5da5e9e
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public abstract class Instance extends AbstractStateWrapper {
*/
public abstract void destroy();

/**
* notify destroy instance.
*/
public abstract void notifyDestroy();

/**
* get instance profile
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,29 @@ public boolean init(Object srcManager, InstanceProfile srcProfile) {

@Override
public void destroy() {
if (!inited) {
return;
}
doChangeState(State.SUCCEEDED);
Long start = AgentUtils.getCurrentTime();
notifyDestroy();
while (running) {
AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
}
LOGGER.info("destroy instance wait run elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
start = AgentUtils.getCurrentTime();
this.source.destroy();
LOGGER.info("destroy instance wait source elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
start = AgentUtils.getCurrentTime();
this.sink.destroy();
LOGGER.info("destroy instance wait sink elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
}

@Override
public void notifyDestroy() {
if (!inited) {
return;
}
doChangeState(State.SUCCEEDED);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class SenderManager {

private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
public static final int RESEND_QUEUE_WAIT_MS = 10;
// cache for group and sender list, share the map cross agent lifecycle.
private DefaultMessageSender sender;
private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
Expand Down Expand Up @@ -172,9 +173,12 @@ public void Stop() {
}

private void closeMessageSender() {
Long start = AgentUtils.getCurrentTime();
if (sender != null) {
sender.close();
}
LOGGER.info("close sender elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
}

private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) {
Expand Down Expand Up @@ -286,7 +290,7 @@ private Runnable flushResendQueue() {
resendRunning = true;
while (!shutdown) {
try {
AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS);
AgentSenderCallback callback = resendQueue.poll(RESEND_QUEUE_WAIT_MS, TimeUnit.MILLISECONDS);
if (callback != null) {
SenderMessage message = callback.message;
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, message.getGroupId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ protected class SourceData {
protected final Integer BATCH_READ_LINE_COUNT = 10000;
protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
protected final Integer READ_WAIT_TIMEOUT_MS = 10;
private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
protected final Integer WAIT_TIMEOUT_MS = 10;
private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60 * 100;
private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
protected BlockingQueue<SourceData> queue;

Expand Down Expand Up @@ -172,7 +172,7 @@ private void doRun() {
emptyCount = 0;
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
AgentUtils.silenceSleepInSeconds(1);
AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
continue;
}
emptyCount = 0;
Expand Down Expand Up @@ -231,7 +231,7 @@ private boolean waitForPermit(String permitName, int permitLen) {
if (!isRunnable()) {
return false;
}
AgentUtils.silenceSleepInSeconds(1);
AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
}
}
return true;
Expand All @@ -247,7 +247,7 @@ private void putIntoQueue(SourceData sourceData) {
try {
boolean offerSuc = false;
while (isRunnable() && !offerSuc) {
offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
offerSuc = queue.offer(sourceData, WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
if (!offerSuc) {
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length);
Expand Down Expand Up @@ -338,7 +338,7 @@ private boolean filterSourceData(Message msg) {
private SourceData readFromQueue() {
SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("poll {} data get interrupted.", instanceId);
}
Expand Down Expand Up @@ -405,7 +405,7 @@ private void clearQueue(BlockingQueue<SourceData> queue) {
while (queue != null && !queue.isEmpty()) {
SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("poll {} data get interrupted.", instanceId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public void destroy() {
destroyTime = index.getAndAdd(1);
}

@Override
public void notifyDestroy() {

}

@Override
public InstanceProfile getProfile() {
return profile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private LogFileSource getSource(int taskId, long offset) {
Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 0);
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2);
Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10);
if (offset > 0) {
OffsetProfile offsetProfile = new OffsetProfile(instanceProfile.getTaskId(),
instanceProfile.getInstanceId(),
Expand Down

0 comments on commit 5da5e9e

Please sign in to comment.