Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Object model separation (#2702)
Browse files Browse the repository at this point in the history
* introduce a domain specific object model

* service layer wip

* core and contribs wip

* remove repeated external storage access in state machine

* cassandra module changes

* redis persistence changes

* mysql persistence changes

* postgres persistence changes

* redis concurrency limit module changes

* updated IndexDAO APIs and corresponding ES implementations

* more changes per indexDAO api

* fix tests in es modules due to new contract

* fix unit tests as per new models

* rename objects and classes

* fix integ tests
  • Loading branch information
apanicker-nflx authored Jan 31, 2022
1 parent ac678a1 commit cd528e1
Show file tree
Hide file tree
Showing 279 changed files with 5,165 additions and 4,032 deletions.
11 changes: 11 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,14 @@ configure(allprojects - project(':conductor-grpc')) {
}
}
}

["cassandra-persistence", "core", "redis-concurrency-limit", "test-harness"].each {
configure(project(":conductor-$it")) {
spotless {
groovy {
importOrder('java', 'javax', 'org', 'com.netflix', '', '\\#com.netflix', '\\#')
licenseHeaderFile("$rootDir/licenseheader.txt")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -27,15 +27,15 @@
import com.netflix.conductor.cassandra.config.CassandraProperties;
import com.netflix.conductor.cassandra.util.Statements;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
Expand All @@ -56,7 +56,6 @@
import static com.netflix.conductor.cassandra.util.Constants.TOTAL_PARTITIONS_KEY;
import static com.netflix.conductor.cassandra.util.Constants.TOTAL_TASKS_KEY;
import static com.netflix.conductor.cassandra.util.Constants.WORKFLOW_ID_KEY;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;

@Trace
public class CassandraExecutionDAO extends CassandraBaseDAO
Expand Down Expand Up @@ -172,11 +171,11 @@ public CassandraExecutionDAO(
}

@Override
public List<Task> getPendingTasksByWorkflow(String taskName, String workflowId) {
List<Task> tasks = getTasksForWorkflow(workflowId);
public List<TaskModel> getPendingTasksByWorkflow(String taskName, String workflowId) {
List<TaskModel> tasks = getTasksForWorkflow(workflowId);
return tasks.stream()
.filter(task -> taskName.equals(task.getTaskType()))
.filter(task -> IN_PROGRESS.equals(task.getStatus()))
.filter(task -> TaskModel.Status.IN_PROGRESS.equals(task.getStatus()))
.collect(Collectors.toList());
}

Expand All @@ -185,7 +184,7 @@ public List<Task> getPendingTasksByWorkflow(String taskName, String workflowId)
* Conductor
*/
@Override
public List<Task> getTasks(String taskType, String startKey, int count) {
public List<TaskModel> getTasks(String taskType, String startKey, int count) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
Expand All @@ -198,7 +197,7 @@ public List<Task> getTasks(String taskType, String startKey, int count) {
* @param tasks tasks to be created
*/
@Override
public List<Task> createTasks(List<Task> tasks) {
public List<TaskModel> createTasks(List<TaskModel> tasks) {
validateTasks(tasks);
String workflowId = tasks.get(0).getWorkflowInstanceId();
try {
Expand Down Expand Up @@ -259,7 +258,7 @@ public List<Task> createTasks(List<Task> tasks) {
}

@Override
public void updateTask(Task task) {
public void updateTask(TaskModel task) {
try {
// TODO: calculate the shard number the task belongs to
String taskPayload = toJson(task);
Expand All @@ -276,7 +275,7 @@ public void updateTask(Task task) {
&& task.getTaskDefinition().get().concurrencyLimit() > 0) {
if (task.getStatus().isTerminal()) {
removeTaskFromLimit(task);
} else if (task.getStatus() == IN_PROGRESS) {
} else if (task.getStatus() == TaskModel.Status.IN_PROGRESS) {
addTaskToLimit(task);
}
}
Expand All @@ -296,7 +295,7 @@ public void updateTask(Task task) {
* Conductor
*/
@Override
public boolean exceedsLimit(Task task) {
public boolean exceedsLimit(TaskModel task) {
Optional<TaskDef> taskDefinition = task.getTaskDefinition();
if (taskDefinition.isEmpty()) {
return false;
Expand Down Expand Up @@ -342,7 +341,7 @@ public boolean exceedsLimit(Task task) {

@Override
public boolean removeTask(String taskId) {
Task task = getTask(taskId);
TaskModel task = getTask(taskId);
if (task == null) {
LOGGER.warn("No such task found by id {}", taskId);
return false;
Expand All @@ -351,7 +350,7 @@ public boolean removeTask(String taskId) {
}

@Override
public Task getTask(String taskId) {
public TaskModel getTask(String taskId) {
try {
String workflowId = lookupWorkflowIdFromTaskId(taskId);
if (workflowId == null) {
Expand All @@ -366,7 +365,8 @@ public Task getTask(String taskId) {
return Optional.ofNullable(resultSet.one())
.map(
row -> {
Task task = readValue(row.getString(PAYLOAD_KEY), Task.class);
TaskModel task =
readValue(row.getString(PAYLOAD_KEY), TaskModel.class);
recordCassandraDaoRequests(
"getTask", task.getTaskType(), task.getWorkflowType());
recordCassandraDaoPayloadSize(
Expand All @@ -388,7 +388,7 @@ public Task getTask(String taskId) {
}

@Override
public List<Task> getTasks(List<String> taskIds) {
public List<TaskModel> getTasks(List<String> taskIds) {
Preconditions.checkNotNull(taskIds);
Preconditions.checkArgument(taskIds.size() > 0, "Task ids list cannot be empty");
String workflowId = lookupWorkflowIdFromTaskId(taskIds.get(0));
Expand All @@ -405,20 +405,20 @@ public List<Task> getTasks(List<String> taskIds) {
* Conductor
*/
@Override
public List<Task> getPendingTasksForTaskType(String taskType) {
public List<TaskModel> getPendingTasksForTaskType(String taskType) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}

@Override
public List<Task> getTasksForWorkflow(String workflowId) {
public List<TaskModel> getTasksForWorkflow(String workflowId) {
return getWorkflow(workflowId, true).getTasks();
}

@Override
public String createWorkflow(Workflow workflow) {
public String createWorkflow(WorkflowModel workflow) {
try {
List<Task> tasks = workflow.getTasks();
List<TaskModel> tasks = workflow.getTasks();
workflow.setTasks(new LinkedList<>());
String payload = toJson(workflow);

Expand All @@ -441,9 +441,9 @@ public String createWorkflow(Workflow workflow) {
}

@Override
public String updateWorkflow(Workflow workflow) {
public String updateWorkflow(WorkflowModel workflow) {
try {
List<Task> tasks = workflow.getTasks();
List<TaskModel> tasks = workflow.getTasks();
workflow.setTasks(new LinkedList<>());
String payload = toJson(workflow);
recordCassandraDaoRequests("updateWorkflow", "n/a", workflow.getWorkflowName());
Expand All @@ -465,7 +465,7 @@ public String updateWorkflow(Workflow workflow) {

@Override
public boolean removeWorkflow(String workflowId) {
Workflow workflow = getWorkflow(workflowId, true);
WorkflowModel workflow = getWorkflow(workflowId, true);
boolean removed = false;
// TODO: calculate number of shards and iterate
if (workflow != null) {
Expand Down Expand Up @@ -508,21 +508,21 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) {
}

@Override
public Workflow getWorkflow(String workflowId) {
public WorkflowModel getWorkflow(String workflowId) {
return getWorkflow(workflowId, true);
}

@Override
public Workflow getWorkflow(String workflowId, boolean includeTasks) {
Workflow workflow = null;
public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {
WorkflowModel workflow = null;
try {
ResultSet resultSet;
if (includeTasks) {
resultSet =
session.execute(
selectWorkflowWithTasksStatement.bind(
UUID.fromString(workflowId), DEFAULT_SHARD_ID));
List<Task> tasks = new ArrayList<>();
List<TaskModel> tasks = new ArrayList<>();

List<Row> rows = resultSet.all();
if (rows.size() == 0) {
Expand All @@ -532,9 +532,9 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
for (Row row : rows) {
String entityKey = row.getString(ENTITY_KEY);
if (ENTITY_TYPE_WORKFLOW.equals(entityKey)) {
workflow = readValue(row.getString(PAYLOAD_KEY), Workflow.class);
workflow = readValue(row.getString(PAYLOAD_KEY), WorkflowModel.class);
} else if (ENTITY_TYPE_TASK.equals(entityKey)) {
Task task = readValue(row.getString(PAYLOAD_KEY), Task.class);
TaskModel task = readValue(row.getString(PAYLOAD_KEY), TaskModel.class);
tasks.add(task);
} else {
throw new ApplicationException(
Expand All @@ -547,7 +547,7 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {

if (workflow != null) {
recordCassandraDaoRequests("getWorkflow", "n/a", workflow.getWorkflowName());
tasks.sort(Comparator.comparingInt(Task::getSeq));
tasks.sort(Comparator.comparingInt(TaskModel::getSeq));
workflow.setTasks(tasks);
}
} else {
Expand All @@ -557,10 +557,10 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
Optional.ofNullable(resultSet.one())
.map(
row -> {
Workflow wf =
WorkflowModel wf =
readValue(
row.getString(PAYLOAD_KEY),
Workflow.class);
WorkflowModel.class);
recordCassandraDaoRequests(
"getWorkflow", "n/a", wf.getWorkflowName());
return wf;
Expand Down Expand Up @@ -598,7 +598,7 @@ public List<String> getRunningWorkflowIds(String workflowName, int version) {
* Conductor
*/
@Override
public List<Workflow> getPendingWorkflowsByType(String workflowName, int version) {
public List<WorkflowModel> getPendingWorkflowsByType(String workflowName, int version) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
Expand Down Expand Up @@ -628,7 +628,8 @@ public long getInProgressTaskCount(String taskDefName) {
* Conductor
*/
@Override
public List<Workflow> getWorkflowsByType(String workflowName, Long startTime, Long endTime) {
public List<WorkflowModel> getWorkflowsByType(
String workflowName, Long startTime, Long endTime) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
Expand All @@ -638,7 +639,7 @@ public List<Workflow> getWorkflowsByType(String workflowName, Long startTime, Lo
* Conductor
*/
@Override
public List<Workflow> getWorkflowsByCorrelationId(
public List<WorkflowModel> getWorkflowsByCorrelationId(
String workflowName, String correlationId, boolean includeTasks) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
Expand Down Expand Up @@ -741,7 +742,7 @@ List<EventExecution> getEventExecutions(
}

@Override
public void addTaskToLimit(Task task) {
public void addTaskToLimit(TaskModel task) {
try {
recordCassandraDaoRequests(
"addTaskToLimit", task.getTaskType(), task.getWorkflowType());
Expand Down Expand Up @@ -770,7 +771,7 @@ public void addTaskToLimit(Task task) {
}

@Override
public void removeTaskFromLimit(Task task) {
public void removeTaskFromLimit(TaskModel task) {
try {
recordCassandraDaoRequests(
"removeTaskFromLimit", task.getTaskType(), task.getWorkflowType());
Expand All @@ -797,7 +798,7 @@ public void removeTaskFromLimit(Task task) {
}
}

private boolean removeTask(Task task) {
private boolean removeTask(TaskModel task) {
// TODO: calculate shard number based on seq and maxTasksPerShard
try {
// get total tasks for this workflow
Expand Down Expand Up @@ -834,7 +835,7 @@ private boolean removeTask(Task task) {
}
}

private void removeTaskLookup(Task task) {
private void removeTaskLookup(TaskModel task) {
try {
recordCassandraDaoRequests(
"removeTaskLookup", task.getTaskType(), task.getWorkflowType());
Expand All @@ -854,7 +855,7 @@ private void removeTaskLookup(Task task) {
}

@VisibleForTesting
void validateTasks(List<Task> tasks) {
void validateTasks(List<TaskModel> tasks) {
Preconditions.checkNotNull(tasks, "Tasks object cannot be null");
Preconditions.checkArgument(!tasks.isEmpty(), "Tasks object cannot be empty");
tasks.forEach(
Expand All @@ -868,7 +869,7 @@ void validateTasks(List<Task> tasks) {
});

String workflowId = tasks.get(0).getWorkflowInstanceId();
Optional<Task> optionalTask =
Optional<TaskModel> optionalTask =
tasks.stream()
.filter(task -> !workflowId.equals(task.getWorkflowInstanceId()))
.findAny();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Netflix, Inc.
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Loading

0 comments on commit cd528e1

Please sign in to comment.