Skip to content

Commit 768d46a

Browse files
authored
Merge branch 'master' into brig/v2-log-yaml
2 parents 97e7cde + d1a81d1 commit 768d46a

File tree

7 files changed

+96
-74
lines changed

7 files changed

+96
-74
lines changed

server/db/src/main/resources/com/walmartlabs/concord/server/db/v2.10.0.xml

+6
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,10 @@
119119
onDelete="SET NULL"/>
120120
</changeSet>
121121

122+
<changeSet id="210000" author="[email protected]" runInTransaction="false">
123+
<sql>
124+
create index concurrently IDX_WAIT_CONDITIONS on PROCESS_WAIT_CONDITIONS using gin (WAIT_CONDITIONS)
125+
</sql>
126+
</changeSet>
127+
122128
</databaseChangeLog>

server/impl/src/main/java/com/walmartlabs/concord/server/org/project/ProjectManager.java

+3
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ private Map<String, ProcessDefinition> loadProcessDefinitions(UUID orgId, UUID p
213213

214214
Map<String, ProcessDefinition> result = new HashMap<>();
215215
for (Map.Entry<String, RepositoryEntry> e : projectEntry.getRepositories().entrySet()) {
216+
if (e.getValue().isDisabled()) {
217+
continue;
218+
}
216219
ProcessDefinition processDefinition = projectRepositoryManager.processDefinition(orgId, projectId, e.getValue());
217220
result.put(e.getKey(), processDefinition);
218221
}

server/impl/src/main/java/com/walmartlabs/concord/server/org/project/ProjectRepositoryManager.java

+1-10
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141

4242
import javax.annotation.Nullable;
4343
import javax.inject.Inject;
44-
import javax.ws.rs.WebApplicationException;
4544
import javax.ws.rs.core.Response;
4645
import java.util.*;
4746

@@ -120,7 +119,7 @@ public void replace(DSLContext tx, UUID orgId, UUID projectId, Collection<Reposi
120119

121120
for (RepositoryEntry re : repos) {
122121
SecretEntry secret = assertSecret(orgId, re);
123-
insertOrUpdate(tx, projectId, null, re, secret, assertProcessDefinition(re.getName(), processDefinitions));
122+
insertOrUpdate(tx, projectId, null, re, secret, processDefinitions.get(re.getName()));
124123
}
125124
}
126125

@@ -209,14 +208,6 @@ private SecretEntry assertSecret(UUID orgId, RepositoryEntry entry) {
209208
return secretManager.assertAccess(orgId, entry.getSecretId(), entry.getSecretName(), ResourceAccessLevel.READER, false);
210209
}
211210

212-
private ProcessDefinition assertProcessDefinition(String name, Map<String, ProcessDefinition> processDefinitions) {
213-
ProcessDefinition result = processDefinitions.get(name);
214-
if (result != null) {
215-
return result;
216-
}
217-
throw new WebApplicationException("Process definition not found: " + name, Response.Status.INTERNAL_SERVER_ERROR);
218-
}
219-
220211
private static String trim(String s) {
221212
if (s == null) {
222213
return null;

server/impl/src/main/java/com/walmartlabs/concord/server/org/project/RepositoryEntry.java

+12
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,18 @@ public boolean isTriggersDisabled() {
175175
return triggersDisabled;
176176
}
177177

178+
public RepositoryEntry withBranch(String branch) {
179+
return new RepositoryEntry(id, projectId, name, url, branch, commitId, path, disabled, secretId, secretName, secretStoreType, meta, triggersDisabled);
180+
}
181+
182+
public RepositoryEntry withPath(String path) {
183+
return new RepositoryEntry(id, projectId, name, url, branch, commitId, path, disabled, secretId, secretName, secretStoreType, meta, triggersDisabled);
184+
}
185+
186+
public RepositoryEntry withDisabled(boolean disabled) {
187+
return new RepositoryEntry(id, projectId, name, url, branch, commitId, path, disabled, secretId, secretName, secretStoreType, meta, triggersDisabled);
188+
}
189+
178190
@Override
179191
public String toString() {
180192
return "RepositoryEntry{" +

server/impl/src/main/java/com/walmartlabs/concord/server/process/ProcessModule.java

+1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public void configure(Binder binder) {
8484

8585
newSetBinder(binder, ProcessStatusListener.class).addBinding().to(WaitProcessStatusListener.class);
8686
newSetBinder(binder, ProcessStatusListener.class).addBinding().to(ExternalProcessListenerHandler.class);
87+
newSetBinder(binder, ProcessStatusListener.class).addBinding().to(WaitConditionUpdater.class);
8788

8889
newSetBinder(binder, Filter.class).addBinding().to(ConcurrentProcessFilter.class);
8990
newSetBinder(binder, Filter.class).addBinding().to(ExclusiveProcessFilter.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.walmartlabs.concord.server.process.waits;
2+
3+
/*-
4+
* *****
5+
* Concord
6+
* -----
7+
* Copyright (C) 2017 - 2024 Walmart Inc.
8+
* -----
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* =====
21+
*/
22+
23+
import com.walmartlabs.concord.server.process.queue.ProcessStatusListener;
24+
import com.walmartlabs.concord.server.sdk.ProcessKey;
25+
import com.walmartlabs.concord.server.sdk.ProcessStatus;
26+
import org.jooq.DSLContext;
27+
28+
public class WaitConditionUpdater implements ProcessStatusListener {
29+
30+
@Override
31+
public void onStatusChange(DSLContext tx, ProcessKey processKey, ProcessStatus status) {
32+
switch (status) {
33+
case SUSPENDED:
34+
case FINISHED:
35+
case FAILED:
36+
case CANCELLED:
37+
case TIMED_OUT:
38+
updateWaitConditions(tx, processKey, status);
39+
break;
40+
default:
41+
// do nothing
42+
}
43+
}
44+
45+
private static void updateWaitConditions(DSLContext tx, ProcessKey processKey, ProcessStatus status) {
46+
String sql = """
47+
UPDATE process_wait_conditions
48+
SET wait_conditions =
49+
(SELECT jsonb_agg(
50+
CASE
51+
WHEN obj->>'type' = 'PROCESS_COMPLETION' and obj->>'completeCondition' = 'ALL'
52+
THEN jsonb_set(obj, '{processes}', (obj->'processes') - ?)
53+
WHEN obj->>'type' = 'PROCESS_COMPLETION' and obj->>'completeCondition' = 'ONE_OF' and obj->'processes' ?? ?
54+
THEN jsonb_set(obj, '{processes}', '[]')
55+
ELSE obj
56+
END
57+
)
58+
FROM jsonb_array_elements(wait_conditions) obj),
59+
version = version + 1
60+
WHERE wait_conditions @> ?::jsonb;
61+
""";
62+
63+
String jsonMatch = String.format("[{\"type\": \"PROCESS_COMPLETION\", \"finalStatuses\": [\"%s\"], \"processes\": [\"%s\"]}]", status.name(), processKey.getInstanceId().toString());
64+
65+
tx.execute(sql, processKey.getInstanceId().toString(), processKey.getInstanceId().toString(), jsonMatch);
66+
}
67+
}

server/impl/src/main/java/com/walmartlabs/concord/server/process/waits/WaitProcessFinishHandler.java

+6-64
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
* =====
2121
*/
2222

23-
import com.walmartlabs.concord.db.AbstractDao;
2423
import com.walmartlabs.concord.db.MainDB;
25-
import com.walmartlabs.concord.server.jooq.tables.ProcessQueue;
2624
import com.walmartlabs.concord.server.process.queue.ProcessQueueManager;
2725
import com.walmartlabs.concord.server.sdk.ProcessKey;
2826
import com.walmartlabs.concord.server.sdk.ProcessStatus;
@@ -31,26 +29,19 @@
3129

3230
import javax.inject.Inject;
3331
import javax.inject.Singleton;
34-
import java.util.ArrayList;
35-
import java.util.List;
3632
import java.util.Set;
3733
import java.util.UUID;
3834

39-
import static com.walmartlabs.concord.server.jooq.tables.ProcessQueue.PROCESS_QUEUE;
40-
import static com.walmartlabs.concord.server.process.waits.ProcessCompletionCondition.CompleteCondition;
41-
4235
/**
4336
* Handles the processes that are waiting for other processes to finish.
4437
*/
4538
@Singleton
4639
public class WaitProcessFinishHandler implements ProcessWaitHandler<ProcessCompletionCondition> {
4740

48-
private final Dao dao;
4941
private final ProcessQueueManager processQueueManager;
5042

5143
@Inject
52-
public WaitProcessFinishHandler(@MainDB Configuration dbCfg, ProcessQueueManager processQueueManager) {
53-
this.dao = new Dao(dbCfg);
44+
public WaitProcessFinishHandler(ProcessQueueManager processQueueManager) {
5445
this.processQueueManager = processQueueManager;
5546
}
5647

@@ -62,64 +53,15 @@ public WaitType getType() {
6253
@Override
6354
@WithTimer
6455
public Result<ProcessCompletionCondition> process(ProcessKey processKey, ProcessCompletionCondition wait) {
65-
Set<ProcessStatus> finishedStatuses = wait.finalStatuses();
6656
Set<UUID> awaitProcesses = wait.processes();
67-
if (awaitProcesses.isEmpty()) {
68-
return Result.resume(wait.resumeEvent());
69-
}
70-
71-
Set<UUID> finishedProcesses = dao.findFinished(awaitProcesses, finishedStatuses);
72-
if (finishedProcesses.isEmpty()) {
57+
if (!awaitProcesses.isEmpty()) {
7358
return Result.of(wait);
7459
}
7560

76-
boolean completed = isCompleted(wait.completeCondition(), awaitProcesses, finishedProcesses);
77-
if (completed) {
78-
if (wait.resumeEvent() != null) {
79-
return Result.resume(wait.resumeEvent());
80-
} else {
81-
return Result.action(tx -> processQueueManager.updateExpectedStatus(tx, processKey, ProcessStatus.WAITING, ProcessStatus.ENQUEUED));
82-
}
83-
}
84-
85-
List<UUID> processes = new ArrayList<>(awaitProcesses);
86-
processes.removeAll(finishedProcesses);
87-
88-
return Result.of(
89-
ProcessCompletionCondition.builder().from(wait)
90-
.processes(processes)
91-
.reason(wait.reason())
92-
.build());
93-
}
94-
95-
private static boolean isCompleted(CompleteCondition condition, Set<UUID> awaitProcesses, Set<UUID> finishedProcesses) {
96-
switch (condition) {
97-
case ALL: {
98-
return awaitProcesses.size() == finishedProcesses.size();
99-
}
100-
case ONE_OF: {
101-
return !finishedProcesses.isEmpty();
102-
}
103-
default:
104-
throw new IllegalArgumentException("Unknown condition type: " + condition);
105-
}
106-
}
107-
108-
private static final class Dao extends AbstractDao {
109-
110-
private Dao(@MainDB Configuration cfg) {
111-
super(cfg);
112-
}
113-
114-
public Set<UUID> findFinished(Set<UUID> awaitProcesses, Set<ProcessStatus> finishedStatuses) {
115-
return txResult(tx -> {
116-
ProcessQueue q = PROCESS_QUEUE.as("q");
117-
return tx.select(q.INSTANCE_ID)
118-
.from(q)
119-
.where(q.INSTANCE_ID.in(awaitProcesses)
120-
.and(q.CURRENT_STATUS.in(finishedStatuses)))
121-
.fetchSet(q.INSTANCE_ID);
122-
});
61+
if (wait.resumeEvent() != null) {
62+
return Result.resume(wait.resumeEvent());
63+
} else {
64+
return Result.action(tx -> processQueueManager.updateExpectedStatus(tx, processKey, ProcessStatus.WAITING, ProcessStatus.ENQUEUED));
12365
}
12466
}
12567
}

0 commit comments

Comments
 (0)