Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3670 from alexmay48/alexmay48/new-workflow-metada…
Browse files Browse the repository at this point in the history
…ta-endpoint

New workflow metadata endpoint
  • Loading branch information
v1r3n authored Jul 8, 2023
2 parents 690968b + ebf3919 commit f013a53
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public List<WorkflowDef> getAllWorkflowDefs() {
return cassandraMetadataDAO.getAllWorkflowDefs();
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
return cassandraMetadataDAO.getAllWorkflowDefsLatestVersions();
}

private List<TaskDef> refreshTaskDefsCache() {
try {
Cache taskDefsCache = cacheManager.getCache(TASK_DEF_CACHE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.ImmutablePair;
Expand Down Expand Up @@ -60,8 +63,10 @@ public class CassandraMetadataDAO extends CassandraBaseDAO implements MetadataDA
private final PreparedStatement insertTaskDefStatement;

private final PreparedStatement selectWorkflowDefStatement;

private final PreparedStatement selectAllWorkflowDefVersionsByNameStatement;
private final PreparedStatement selectAllWorkflowDefsStatement;
private final PreparedStatement selectAllWorkflowDefsLatestVersionsStatement;
private final PreparedStatement selectTaskDefStatement;
private final PreparedStatement selectAllTaskDefsStatement;

Expand Down Expand Up @@ -97,6 +102,9 @@ public CassandraMetadataDAO(
this.selectAllWorkflowDefsStatement =
session.prepare(statements.getSelectAllWorkflowDefsStatement())
.setConsistencyLevel(properties.getReadConsistencyLevel());
this.selectAllWorkflowDefsLatestVersionsStatement =
session.prepare(statements.getSelectAllWorkflowDefsLatestVersionsStatement())
.setConsistencyLevel(properties.getReadConsistencyLevel());
this.selectTaskDefStatement =
session.prepare(statements.getSelectTaskDefStatement())
.setConsistencyLevel(properties.getReadConsistencyLevel());
Expand Down Expand Up @@ -289,6 +297,48 @@ public List<WorkflowDef> getAllWorkflowDefs() {
}
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
try {
ResultSet resultSet =
session.execute(
selectAllWorkflowDefsLatestVersionsStatement.bind(
WORKFLOW_DEF_INDEX_KEY));
List<Row> rows = resultSet.all();
if (rows.size() == 0) {
LOGGER.info("No workflow definitions were found.");
return Collections.EMPTY_LIST;
}
Map<String, PriorityQueue<WorkflowDef>> allWorkflowDefs = new HashMap<>();

for (Row row : rows) {
String defNameVersion = row.getString(WORKFLOW_DEF_NAME_VERSION_KEY);
var nameVersion = getWorkflowNameAndVersion(defNameVersion);
WorkflowDef def =
getWorkflowDef(nameVersion.getLeft(), nameVersion.getRight()).orElse(null);
if (def == null) {
continue;
}
if (allWorkflowDefs.get(def.getName()) == null) {
allWorkflowDefs.put(
def.getName(),
new PriorityQueue<>(
(WorkflowDef w1, WorkflowDef w2) ->
Integer.compare(w2.getVersion(), w1.getVersion())));
}
allWorkflowDefs.get(def.getName()).add(def);
}
return allWorkflowDefs.values().stream()
.map(PriorityQueue::poll)
.collect(Collectors.toList());
} catch (DriverException e) {
Monitors.error(CLASS_NAME, "getAllWorkflowDefsLatestVersions");
String errorMsg = "Error retrieving all workflow defs latest versions";
LOGGER.error(errorMsg, e);
throw new TransientException(errorMsg, e);
}
}

private TaskDef getTaskDefFromDB(String name) {
try {
ResultSet resultSet = session.execute(selectTaskDefStatement.bind(name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ public String getSelectAllWorkflowDefsStatement() {
.getQueryString();
}

public String getSelectAllWorkflowDefsLatestVersionsStatement() {
return QueryBuilder.select()
.all()
.from(keyspace, TABLE_WORKFLOW_DEFS_INDEX)
.where(eq(WORKFLOW_DEF_INDEX_KEY, bindMarker()))
.getQueryString();
}

/**
* @return cql query statement to fetch a task definition by name from the "task_definitions"
* table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,36 @@ class CassandraMetadataDAOSpec extends CassandraSpec {

}

def "Get All WorkflowDef"() {
when:
metadataDAO.removeWorkflowDef("workflow_def_1", 1)
WorkflowDef workflowDef = new WorkflowDef()
workflowDef.setName("workflow_def_1")
workflowDef.setVersion(1)
workflowDef.setOwnerEmail("[email protected]")
metadataDAO.createWorkflowDef(workflowDef)

workflowDef.setName("workflow_def_2")
metadataDAO.createWorkflowDef(workflowDef)
workflowDef.setVersion(2)
metadataDAO.createWorkflowDef(workflowDef)

workflowDef.setName("workflow_def_3")
workflowDef.setVersion(1)
metadataDAO.createWorkflowDef(workflowDef)
workflowDef.setVersion(2)
metadataDAO.createWorkflowDef(workflowDef)
workflowDef.setVersion(3)
metadataDAO.createWorkflowDef(workflowDef)

then: // fetch the workflow definition
def allDefsLatestVersions = metadataDAO.getAllWorkflowDefsLatestVersions()
Map<String, WorkflowDef> allDefsMap = allDefsLatestVersions.collectEntries {wfDef -> [wfDef.getName(), wfDef]}
allDefsMap.get("workflow_def_1").getVersion() == 1
allDefsMap.get("workflow_def_2").getVersion() == 2
allDefsMap.get("workflow_def_3").getVersion() == 3
}

def "parse index string"() {
expect:
def pair = metadataDAO.getWorkflowNameAndVersion(nameVersionStr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;

import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;

public class MetadataClient extends ClientBase {

private static final GenericType<List<WorkflowDef>> workflowDefList =
new GenericType<List<WorkflowDef>>() {};

/** Creates a default metadata client */
public MetadataClient() {
this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null);
Expand Down Expand Up @@ -122,6 +126,12 @@ public WorkflowDef getWorkflowDef(String name, Integer version) {
name);
}

/** */
public List<WorkflowDef> getAllWorkflowsWithLatestVersions() {
return getForEntity(
"metadata/workflow/latest-versions", null, workflowDefList, (Object) null);
}

/**
* Removes the workflow definition of a workflow from the conductor server. It does not remove
* associated workflows. Use with caution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
package com.netflix.conductor.client.http

import com.netflix.conductor.client.exception.ConductorClientException
import com.netflix.conductor.common.metadata.workflow.WorkflowDef

import com.sun.jersey.api.client.ClientResponse
import spock.lang.Subject

class MetadataClientSpec extends ClientSpecification {
Expand Down Expand Up @@ -75,4 +77,18 @@ class MetadataClientSpec extends ClientSpecification {
then:
thrown(IllegalArgumentException.class)
}

def "workflow get all definitions latest version"() {
given:
List<WorkflowDef> result = new ArrayList<WorkflowDef>()
URI uri = createURI("metadata/workflow/latest-versions")

when:
metadataClient.getAllWorkflowsWithLatestVersions()

then:
1 * requestHandler.get(uri) >> Mock(ClientResponse.class) {
getEntity(_) >> result
}
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/MetadataDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ public interface MetadataDAO {
* @return List of all the workflow definitions
*/
List<WorkflowDef> getAllWorkflowDefs();

/**
* @return List the latest versions of the workflow definitions
*/
List<WorkflowDef> getAllWorkflowDefsLatestVersions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,6 @@ void removeEventHandlerStatus(
List<EventHandler> getEventHandlersForEvent(
@NotEmpty(message = "EventName cannot be null or empty") String event,
boolean activeOnly);

List<WorkflowDef> getWorkflowDefsLatestVersions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ public List<EventHandler> getEventHandlersForEvent(String event, boolean activeO
return eventHandlerDAO.getEventHandlersForEvent(event, activeOnly);
}

@Override
public List<WorkflowDef> getWorkflowDefsLatestVersions() {
return metadataDAO.getAllWorkflowDefsLatestVersions();
}

public Map<String, ? extends Iterable<WorkflowDefSummary>> getWorkflowNamesAndVersions() {
List<WorkflowDef> workflowDefs = metadataDAO.getAllWorkflowDefs();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,27 @@ public List<WorkflowDef> getAllWorkflowDefs() {
return workflows;
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
List<WorkflowDef> workflows = new LinkedList<>();

// Get all definitions latest versions from WORKFLOW_DEF_NAMES
recordRedisDaoRequests("getAllWorkflowLatestVersionsDefs");
Set<String> wfNames = jedisProxy.smembers(nsKey(WORKFLOW_DEF_NAMES));
int size = 0;
// Place all workflows into the Priority Queue. The PQ will allow us to grab the latest
// version of the workflows.
for (String wfName : wfNames) {
WorkflowDef def = getLatestWorkflowDef(wfName).orElse(null);
if (def != null) {
workflows.add(def);
size += def.toString().length();
}
}
recordRedisDaoPayloadSize("getAllWorkflowLatestVersionsDefs", size, "n/a", "n/a");
return workflows;
}

private void _createOrUpdate(WorkflowDef workflowDef) {
// First set the workflow def
jedisProxy.hset(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.junit.Before;
Expand Down Expand Up @@ -160,6 +162,45 @@ public void testWorkflowDefOperations() {
assertEquals(workflow.getVersion(), 3);
}

@Test
public void testGetAllWorkflowDefsLatestVersions() {
WorkflowDef def = new WorkflowDef();
def.setName("test1");
def.setVersion(1);
def.setDescription("description");
def.setCreatedBy("unit_test");
def.setCreateTime(1L);
def.setOwnerApp("ownerApp");
def.setUpdatedBy("unit_test2");
def.setUpdateTime(2L);
redisMetadataDAO.createWorkflowDef(def);

def.setName("test2");
redisMetadataDAO.createWorkflowDef(def);
def.setVersion(2);
redisMetadataDAO.createWorkflowDef(def);

def.setName("test3");
def.setVersion(1);
redisMetadataDAO.createWorkflowDef(def);
def.setVersion(2);
redisMetadataDAO.createWorkflowDef(def);
def.setVersion(3);
redisMetadataDAO.createWorkflowDef(def);

// Placed the values in a map because they might not be stored in order of defName.
// To test, needed to confirm that the versions are correct for the definitions.
Map<String, WorkflowDef> allMap =
redisMetadataDAO.getAllWorkflowDefsLatestVersions().stream()
.collect(Collectors.toMap(WorkflowDef::getName, Function.identity()));

assertNotNull(allMap);
assertEquals(3, allMap.size());
assertEquals(1, allMap.get("test1").getVersion());
assertEquals(2, allMap.get("test2").getVersion());
assertEquals(3, allMap.get("test3").getVersion());
}

@Test(expected = NotFoundException.class)
public void removeInvalidWorkflowDef() {
redisMetadataDAO.removeWorkflowDef("hello", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ public List<WorkflowDef> getAll() {
return metadataService.getWorkflowNamesAndVersions();
}

@Operation(summary = "Returns only the latest version of all workflow definitions")
@GetMapping("/workflow/latest-versions")
public List<WorkflowDef> getAllWorkflowsWithLatestVersions() {
return metadataService.getWorkflowDefsLatestVersions();
}

@DeleteMapping("/workflow/{name}/{version}")
@Operation(
summary =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ public void testGetAllWorkflowDef() {
assertEquals(listOfWorkflowDef, metadataResource.getAll());
}

@Test
public void testGetAllWorkflowDefLatestVersions() {
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("test");
workflowDef.setVersion(1);
workflowDef.setDescription("test");

List<WorkflowDef> listOfWorkflowDef = new ArrayList<>();
listOfWorkflowDef.add(workflowDef);

when(mockMetadataService.getWorkflowDefsLatestVersions()).thenReturn(listOfWorkflowDef);
assertEquals(listOfWorkflowDef, metadataResource.getAllWorkflowsWithLatestVersions());
}

@Test
public void testUnregisterWorkflowDef() throws Exception {
metadataResource.unregisterWorkflowDef("test", 1);
Expand Down

0 comments on commit f013a53

Please sign in to comment.