From fd66d5bbb178a1ed8a7823d3603767482992e92d Mon Sep 17 00:00:00 2001 From: Yury Brigadirenko Date: Sun, 6 Mar 2022 23:15:53 +0300 Subject: [PATCH] concord-server: implement a better way to kill processes (#572) --- .../server/process/ProcessManager.java | 97 +++++++++++-------- 1 file changed, 57 insertions(+), 40 deletions(-) diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/ProcessManager.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/ProcessManager.java index 4871241569..4434286f91 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/process/ProcessManager.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/ProcessManager.java @@ -171,58 +171,67 @@ public void kill(ProcessKey processKey) { } public void kill(DSLContext tx, ProcessKey processKey) { - ProcessEntry process = queueDao.get(tx, processKey, Collections.emptySet()); - if (process == null) { - throw new ProcessException(null, "Process not found: " + processKey, Status.NOT_FOUND); - } + ProcessEntry process = assertProcess(tx, processKey); assertKillOrDisableRights(process); - kill(tx, process); - } - - public void kill(DSLContext tx, ProcessEntry process) { - ProcessStatus s = process.status(); - if (TERMINATED_PROCESS_STATUSES.contains(s)) { - return; - } - boolean cancelled = false; - boolean isServerProcess = SERVER_PROCESS_STATUSES.contains(s); - if (isServerProcess) { - cancelled = queueManager.updateExpectedStatus(tx, new ProcessKey(process.instanceId(), process.createdAt()), process.status(), ProcessStatus.CANCELLED); + while (!cancelled) { + if (TERMINATED_PROCESS_STATUSES.contains(process.status())) { + return; + } + + boolean isServerProcess = SERVER_PROCESS_STATUSES.contains(process.status()); + if (isServerProcess) { + cancelled = queueManager.updateExpectedStatus(tx, processKey, process.status(), ProcessStatus.CANCELLED); + } + + if (!cancelled && process.lastAgentId() != null) { + agentManager.killProcess(tx, processKey, process.lastAgentId()); + cancelled = true; + } + + if (cancelled) { + auditLogOnCancelled(process); + } else { + process = assertProcess(tx, processKey); + } } - - if (!cancelled) { - agentManager.killProcess(tx, new ProcessKey(process.instanceId(), process.createdAt()), process.lastAgentId()); - } - - auditLogOnCancelled(process); } public void kill(DSLContext tx, List processKeys) { - // TODO: better way - List processes = processKeys.stream() - .map(k -> queueDao.get(tx, k, Collections.emptySet())) - .collect(Collectors.toList()); - - List serverProcesses = filterProcesses(processes, SERVER_PROCESS_STATUSES); - if (!serverProcesses.isEmpty()) { - List serverProcessKeys = serverProcesses.stream() - .map(p -> new ProcessKey(p.instanceId(), p.createdAt())) + List keys = new ArrayList<>(processKeys); + while(!keys.isEmpty()) { + // TODO: better way + List processes = keys.stream() + .map(k -> queueDao.get(tx, k, Collections.emptySet())) .collect(Collectors.toList()); - List updated = queueManager.updateExpectedStatus(tx, serverProcessKeys, SERVER_PROCESS_STATUSES, ProcessStatus.CANCELLED); - serverProcesses.stream() - .filter(p -> updated.contains(new ProcessKey(p.instanceId(), p.createdAt()))) - .forEach(this::auditLogOnCancelled); - } + List terminatedProcesses = filterProcesses(processes, TERMINATED_PROCESS_STATUSES); + terminatedProcesses.forEach(p -> keys.remove(new ProcessKey(p.instanceId(), p.createdAt()))); + + List serverProcesses = filterProcesses(processes, SERVER_PROCESS_STATUSES); + if (!serverProcesses.isEmpty()) { + List serverProcessKeys = serverProcesses.stream() + .map(p -> new ProcessKey(p.instanceId(), p.createdAt())) + .collect(Collectors.toList()); + + List updated = queueManager.updateExpectedStatus(tx, serverProcessKeys, SERVER_PROCESS_STATUSES, ProcessStatus.CANCELLED); + serverProcesses.stream() + .filter(p -> updated.contains(new ProcessKey(p.instanceId(), p.createdAt()))) + .forEach(this::auditLogOnCancelled); + + keys.removeAll(updated); + } - List agentProcesses = filterProcesses(processes, AGENT_PROCESS_STATUSES); - if (!agentProcesses.isEmpty()) { - agentManager.killProcess(agentProcesses.stream().map(p -> new KeyAndAgent(new ProcessKey(p.instanceId(), p.createdAt()), p.lastAgentId())).collect(Collectors.toList())); + List agentProcesses = filterProcesses(processes, AGENT_PROCESS_STATUSES); + if (!agentProcesses.isEmpty()) { + agentManager.killProcess(agentProcesses.stream().map(p -> new KeyAndAgent(new ProcessKey(p.instanceId(), p.createdAt()), p.lastAgentId())).collect(Collectors.toList())); - agentProcesses.forEach(this::auditLogOnCancelled); + agentProcesses.forEach(this::auditLogOnCancelled); + + agentProcesses.forEach(p -> keys.remove(new ProcessKey(p.instanceId(), p.createdAt()))); + } } } @@ -307,6 +316,14 @@ public void updateExclusive(DSLContext tx, ProcessKey processKey, ExclusiveMode queueDao.updateExclusive(tx, processKey, exclusive); } + private ProcessEntry assertProcess(DSLContext tx, ProcessKey processKey) { + ProcessEntry process = queueDao.get(tx, processKey, Collections.emptySet()); + if (process != null) { + return process; + } + throw new ProcessException(null, "Process not found: " + processKey, Status.NOT_FOUND); + } + private boolean isSuspended(ProcessKey processKey) { String resource = path(Constants.Files.JOB_ATTACHMENTS_DIR_NAME, Constants.Files.JOB_STATE_DIR_NAME,