Skip to content

Commit

Permalink
concord-server: implement a better way to kill processes (#572)
Browse files Browse the repository at this point in the history
  • Loading branch information
brig authored Mar 6, 2022
1 parent da9732f commit fd66d5b
Showing 1 changed file with 57 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,58 +171,67 @@ public void kill(ProcessKey processKey) {
}

public void kill(DSLContext tx, ProcessKey processKey) {
ProcessEntry process = queueDao.get(tx, processKey, Collections.emptySet());
if (process == null) {
throw new ProcessException(null, "Process not found: " + processKey, Status.NOT_FOUND);
}
ProcessEntry process = assertProcess(tx, processKey);

assertKillOrDisableRights(process);

kill(tx, process);
}

public void kill(DSLContext tx, ProcessEntry process) {
ProcessStatus s = process.status();
if (TERMINATED_PROCESS_STATUSES.contains(s)) {
return;
}

boolean cancelled = false;
boolean isServerProcess = SERVER_PROCESS_STATUSES.contains(s);
if (isServerProcess) {
cancelled = queueManager.updateExpectedStatus(tx, new ProcessKey(process.instanceId(), process.createdAt()), process.status(), ProcessStatus.CANCELLED);
while (!cancelled) {
if (TERMINATED_PROCESS_STATUSES.contains(process.status())) {
return;
}

boolean isServerProcess = SERVER_PROCESS_STATUSES.contains(process.status());
if (isServerProcess) {
cancelled = queueManager.updateExpectedStatus(tx, processKey, process.status(), ProcessStatus.CANCELLED);
}

if (!cancelled && process.lastAgentId() != null) {
agentManager.killProcess(tx, processKey, process.lastAgentId());
cancelled = true;
}

if (cancelled) {
auditLogOnCancelled(process);
} else {
process = assertProcess(tx, processKey);
}
}

if (!cancelled) {
agentManager.killProcess(tx, new ProcessKey(process.instanceId(), process.createdAt()), process.lastAgentId());
}

auditLogOnCancelled(process);
}

public void kill(DSLContext tx, List<ProcessKey> processKeys) {
// TODO: better way
List<ProcessEntry> processes = processKeys.stream()
.map(k -> queueDao.get(tx, k, Collections.emptySet()))
.collect(Collectors.toList());

List<ProcessEntry> serverProcesses = filterProcesses(processes, SERVER_PROCESS_STATUSES);
if (!serverProcesses.isEmpty()) {
List<ProcessKey> serverProcessKeys = serverProcesses.stream()
.map(p -> new ProcessKey(p.instanceId(), p.createdAt()))
List<ProcessKey> keys = new ArrayList<>(processKeys);
while(!keys.isEmpty()) {
// TODO: better way
List<ProcessEntry> processes = keys.stream()
.map(k -> queueDao.get(tx, k, Collections.emptySet()))
.collect(Collectors.toList());

List<ProcessKey> updated = queueManager.updateExpectedStatus(tx, serverProcessKeys, SERVER_PROCESS_STATUSES, ProcessStatus.CANCELLED);
serverProcesses.stream()
.filter(p -> updated.contains(new ProcessKey(p.instanceId(), p.createdAt())))
.forEach(this::auditLogOnCancelled);
}
List<ProcessEntry> terminatedProcesses = filterProcesses(processes, TERMINATED_PROCESS_STATUSES);
terminatedProcesses.forEach(p -> keys.remove(new ProcessKey(p.instanceId(), p.createdAt())));

List<ProcessEntry> serverProcesses = filterProcesses(processes, SERVER_PROCESS_STATUSES);
if (!serverProcesses.isEmpty()) {
List<ProcessKey> serverProcessKeys = serverProcesses.stream()
.map(p -> new ProcessKey(p.instanceId(), p.createdAt()))
.collect(Collectors.toList());

List<ProcessKey> updated = queueManager.updateExpectedStatus(tx, serverProcessKeys, SERVER_PROCESS_STATUSES, ProcessStatus.CANCELLED);
serverProcesses.stream()
.filter(p -> updated.contains(new ProcessKey(p.instanceId(), p.createdAt())))
.forEach(this::auditLogOnCancelled);

keys.removeAll(updated);
}

List<ProcessEntry> agentProcesses = filterProcesses(processes, AGENT_PROCESS_STATUSES);
if (!agentProcesses.isEmpty()) {
agentManager.killProcess(agentProcesses.stream().map(p -> new KeyAndAgent(new ProcessKey(p.instanceId(), p.createdAt()), p.lastAgentId())).collect(Collectors.toList()));
List<ProcessEntry> agentProcesses = filterProcesses(processes, AGENT_PROCESS_STATUSES);
if (!agentProcesses.isEmpty()) {
agentManager.killProcess(agentProcesses.stream().map(p -> new KeyAndAgent(new ProcessKey(p.instanceId(), p.createdAt()), p.lastAgentId())).collect(Collectors.toList()));

agentProcesses.forEach(this::auditLogOnCancelled);
agentProcesses.forEach(this::auditLogOnCancelled);

agentProcesses.forEach(p -> keys.remove(new ProcessKey(p.instanceId(), p.createdAt())));
}
}
}

Expand Down Expand Up @@ -307,6 +316,14 @@ public void updateExclusive(DSLContext tx, ProcessKey processKey, ExclusiveMode
queueDao.updateExclusive(tx, processKey, exclusive);
}

private ProcessEntry assertProcess(DSLContext tx, ProcessKey processKey) {
ProcessEntry process = queueDao.get(tx, processKey, Collections.emptySet());
if (process != null) {
return process;
}
throw new ProcessException(null, "Process not found: " + processKey, Status.NOT_FOUND);
}

private boolean isSuspended(ProcessKey processKey) {
String resource = path(Constants.Files.JOB_ATTACHMENTS_DIR_NAME,
Constants.Files.JOB_STATE_DIR_NAME,
Expand Down

0 comments on commit fd66d5b

Please sign in to comment.