Skip to content

Commit

Permalink
server: resume event assert (#838)
Browse files Browse the repository at this point in the history
  • Loading branch information
brig authored Dec 1, 2023
1 parent 21dc130 commit 7f8ae35
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.io.Serializable;
import java.io.*;
import java.util.*;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -312,6 +310,22 @@ public ProcessEntry assertProcess(UUID instanceId) {
return p;
}

public void assertResumeEvents(ProcessKey processKey, Set<String> events) {
if (events.isEmpty()) {
throw new ConcordApplicationException("Empty resume events", Status.BAD_REQUEST);
}

Set<String> expectedEvents = getResumeEvents(processKey);

Set<String> unexpectedEvents = new HashSet<>(events);
unexpectedEvents.removeAll(expectedEvents);

if (!unexpectedEvents.isEmpty()) {
logManager.warn(processKey, "Unexpected 'resume' events: {}, expected: {}", unexpectedEvents, expectedEvents);
throw new ConcordApplicationException("Unexpected 'resume' events: " + unexpectedEvents, Status.BAD_REQUEST);
}
}

public void updateExclusive(DSLContext tx, ProcessKey processKey, ExclusiveMode exclusive) {
queueDao.updateExclusive(tx, processKey, exclusive);
}
Expand Down Expand Up @@ -419,6 +433,28 @@ public void auditLogOnCancelled(ProcessEntry p) {
.log();
}

private Set<String> getResumeEvents(ProcessKey processKey) {
String path = ProcessStateManager.path(Constants.Files.JOB_ATTACHMENTS_DIR_NAME,
Constants.Files.JOB_STATE_DIR_NAME,
Constants.Files.SUSPEND_MARKER_FILE_NAME);

return stateManager.get(processKey, path, ProcessManager::deserialize)
.orElse(Set.of());
}

private static Optional<Set<String>> deserialize(InputStream in) {
Set<String> result = new HashSet<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = reader.readLine()) != null) {
result.add(line);
}
return Optional.of(result);
} catch (IOException e) {
throw new RuntimeException("Error while deserializing a resume events: " + e.getMessage(), e);
}
}

private static List<ProcessEntry> filterProcesses(List<ProcessEntry> l, List<ProcessStatus> expected) {
return l.stream()
.filter(r -> expected.contains(r.status()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,9 @@ public ResumeProcessResponse resume(@PathParam("id") UUID instanceId,
@QueryParam("saveAs") String saveAs,
Map<String, Object> req) {

PartialProcessKey processKey = PartialProcessKey.from(instanceId);
ProcessKey processKey = assertProcessKey(instanceId);

processManager.assertResumeEvents(processKey, Set.of(eventName));

if (saveAs != null && !saveAs.isEmpty() && req != null) {
req = ConfigurationUtils.toNested(saveAs, req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ public void process(Payload payload, Exception e) {
return;
}

logManager.error(processKey, "Process failed: {}", e.getMessage());

if (!(e instanceof InvalidProcessStateException)) {
logManager.error(processKey, "Process failed: {}", e.getMessage());
queueManager.updateStatus(processKey, ProcessStatus.FAILED);
} else {
logManager.warn(processKey, "Invalid process state: {}", e.getMessage());
}
}
}

0 comments on commit 7f8ae35

Please sign in to comment.