Skip to content

Commit

Permalink
Add option to only index workflows on Postgres when their status chan…
Browse files Browse the repository at this point in the history
…ges (#83)
  • Loading branch information
bjpirt authored Mar 5, 2024
1 parent ce21854 commit e195ef6
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class PostgresProperties {

private Integer experimentalQueueNotifyStalePeriod = 5000;

private boolean onlyIndexOnStatusChange = false;

public String schema = "public";

public boolean allowFullTextQueries = true;
Expand Down Expand Up @@ -73,6 +75,14 @@ public void setTaskDefCacheRefreshInterval(Duration taskDefCacheRefreshInterval)
this.taskDefCacheRefreshInterval = taskDefCacheRefreshInterval;
}

public boolean getOnlyIndexOnStatusChange() {
return onlyIndexOnStatusChange;
}

public void setOnlyIndexOnStatusChange(boolean onlyIndexOnStatusChange) {
this.onlyIndexOnStatusChange = onlyIndexOnStatusChange;
}

public Integer getDeadlockRetryMax() {
return deadlockRetryMax;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,16 @@ public class PostgresIndexDAO extends PostgresBaseDAO implements IndexDAO {
private static final int CORE_POOL_SIZE = 6;
private static final long KEEP_ALIVE_TIME = 1L;

private boolean onlyIndexOnStatusChange;

public PostgresIndexDAO(
RetryTemplate retryTemplate,
ObjectMapper objectMapper,
DataSource dataSource,
PostgresProperties properties) {
super(retryTemplate, objectMapper, dataSource);
this.properties = properties;
this.onlyIndexOnStatusChange = properties.getOnlyIndexOnStatusChange();

int maximumPoolSize = properties.getAsyncMaxPoolSize();
int workerQueueSize = properties.getAsyncWorkerQueueSize();
Expand Down Expand Up @@ -84,19 +87,25 @@ public void indexWorkflow(WorkflowSummary workflow) {
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data";

if (onlyIndexOnStatusChange) {
INSERT_WORKFLOW_INDEX_SQL += " WHERE workflow_index.status != EXCLUDED.status";
}

TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime());
Timestamp startTime = Timestamp.from(Instant.from(ta));

queryWithTransaction(
INSERT_WORKFLOW_INDEX_SQL,
q ->
q.addParameter(workflow.getWorkflowId())
.addParameter(workflow.getCorrelationId())
.addParameter(workflow.getWorkflowType())
.addParameter(startTime)
.addParameter(workflow.getStatus().toString())
.addJsonParameter(workflow)
.executeUpdate());
int rowsUpdated =
queryWithTransaction(
INSERT_WORKFLOW_INDEX_SQL,
q ->
q.addParameter(workflow.getWorkflowId())
.addParameter(workflow.getCorrelationId())
.addParameter(workflow.getWorkflowType())
.addParameter(startTime)
.addParameter(workflow.getStatus().toString())
.addJsonParameter(workflow)
.executeUpdate());
logger.debug("Postgres index workflow rows updated: {}", rowsUpdated);
}

@Override
Expand Down Expand Up @@ -128,24 +137,30 @@ public void indexTask(TaskSummary task) {
+ "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, "
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data";

if (onlyIndexOnStatusChange) {
INSERT_TASK_INDEX_SQL += " WHERE task_index.status != EXCLUDED.status";
}

TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime());
Timestamp updateTime = Timestamp.from(Instant.from(updateTa));

TemporalAccessor startTa = DateTimeFormatter.ISO_INSTANT.parse(task.getStartTime());
Timestamp startTime = Timestamp.from(Instant.from(startTa));

queryWithTransaction(
INSERT_TASK_INDEX_SQL,
q ->
q.addParameter(task.getTaskId())
.addParameter(task.getTaskType())
.addParameter(task.getTaskDefName())
.addParameter(task.getStatus().toString())
.addParameter(startTime)
.addParameter(updateTime)
.addParameter(task.getWorkflowType())
.addJsonParameter(task)
.executeUpdate());
int rowsUpdated =
queryWithTransaction(
INSERT_TASK_INDEX_SQL,
q ->
q.addParameter(task.getTaskId())
.addParameter(task.getTaskType())
.addParameter(task.getTaskDefName())
.addParameter(task.getStatus().toString())
.addParameter(startTime)
.addParameter(updateTime)
.addParameter(task.getWorkflowType())
.addJsonParameter(task)
.executeUpdate());
logger.debug("Postgres index task rows updated: {}", rowsUpdated);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright 2023 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.postgres.dao;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;

import javax.sql.DataSource;

import org.flywaydb.core.Flyway;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.postgres.config.PostgresConfiguration;
import com.netflix.conductor.postgres.util.Query;

import com.fasterxml.jackson.databind.ObjectMapper;

import static org.junit.Assert.assertEquals;

@ContextConfiguration(
classes = {
TestObjectMapperConfiguration.class,
PostgresConfiguration.class,
FlywayAutoConfiguration.class
})
@RunWith(SpringRunner.class)
@TestPropertySource(
properties = {
"conductor.app.asyncIndexingEnabled=false",
"conductor.elasticsearch.version=0",
"conductor.indexing.type=postgres",
"conductor.postgres.onlyIndexOnStatusChange=true",
"spring.flyway.clean-disabled=false"
})
@SpringBootTest
public class PostgresIndexDAOStatusChangeOnlyTest {

@Autowired private PostgresIndexDAO indexDAO;

@Autowired private ObjectMapper objectMapper;

@Qualifier("dataSource")
@Autowired
private DataSource dataSource;

@Autowired Flyway flyway;

// clean the database between tests.
@Before
public void before() {
flyway.migrate();
}

private WorkflowSummary getMockWorkflowSummary(String id) {
WorkflowSummary wfs = new WorkflowSummary();
wfs.setWorkflowId(id);
wfs.setCorrelationId("correlation-id");
wfs.setWorkflowType("workflow-type");
wfs.setStartTime("2023-02-07T08:42:45Z");
wfs.setStatus(Workflow.WorkflowStatus.RUNNING);
return wfs;
}

private TaskSummary getMockTaskSummary(String taskId) {
TaskSummary ts = new TaskSummary();
ts.setTaskId(taskId);
ts.setTaskType("task-type");
ts.setTaskDefName("task-def-name");
ts.setStatus(Task.Status.SCHEDULED);
ts.setStartTime("2023-02-07T09:41:45Z");
ts.setUpdateTime("2023-02-07T09:42:45Z");
ts.setWorkflowType("workflow-type");
return ts;
}

private List<Map<String, Object>> queryDb(String query) throws SQLException {
try (Connection c = dataSource.getConnection()) {
try (Query q = new Query(objectMapper, c, query)) {
return q.executeAndFetchMap();
}
}
}

public void checkWorkflow(String workflowId, String status, String correlationId)
throws SQLException {
List<Map<String, Object>> result =
queryDb(
String.format(
"SELECT * FROM workflow_index WHERE workflow_id = '%s'",
workflowId));
assertEquals("Wrong number of rows returned", 1, result.size());
assertEquals("Wrong status returned", status, result.get(0).get("status"));
assertEquals(
"Correlation id does not match",
correlationId,
result.get(0).get("correlation_id"));
}

public void checkTask(String taskId, String status, String updateTime) throws SQLException {
List<Map<String, Object>> result =
queryDb(String.format("SELECT * FROM task_index WHERE task_id = '%s'", taskId));
assertEquals("Wrong number of rows returned", 1, result.size());
assertEquals("Wrong status returned", status, result.get(0).get("status"));
assertEquals(
"Update time does not match",
updateTime,
result.get(0).get("update_time").toString());
}

@Test
public void testIndexWorkflowOnlyStatusChange() throws SQLException {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it exists
checkWorkflow("workflow-id", "RUNNING", "correlation-id");

// Change the record, but not the status, and re-index
wfs.setCorrelationId("new-correlation-id");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it hasn't changed
checkWorkflow("workflow-id", "RUNNING", "correlation-id");

// Change the status and re-index
wfs.setStatus(Workflow.WorkflowStatus.FAILED);
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it has changed
checkWorkflow("workflow-id", "FAILED", "new-correlation-id");
}

@Test
public void testIndexTaskOnlyStatusChange() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id");

indexDAO.indexTask(ts);

// retrieve the record, make sure it exists
checkTask("task-id", "SCHEDULED", "2023-02-07 09:42:45.0");

// Change the record, but not the status
ts.setUpdateTime("2023-02-07T10:42:45Z");
indexDAO.indexTask(ts);

// retrieve the record, make sure it hasn't changed
checkTask("task-id", "SCHEDULED", "2023-02-07 09:42:45.0");

// Change the status and re-index
ts.setStatus(Task.Status.FAILED);
indexDAO.indexTask(ts);

// retrieve the record, make sure it has changed
checkTask("task-id", "FAILED", "2023-02-07 10:42:45.0");
}
}

0 comments on commit e195ef6

Please sign in to comment.