From cd528e126ed36abedf38ac0eab07e18e1d476dad Mon Sep 17 00:00:00 2001
From: Anoop Panicker <34087882+apanicker-nflx@users.noreply.github.com>
Date: Mon, 31 Jan 2022 15:46:34 -0800
Subject: [PATCH] Object model separation (#2702)
* introduce a domain specific object model
* service layer wip
* core and contribs wip
* remove repeated external storage access in state machine
* cassandra module changes
* redis persistence changes
* mysql persistence changes
* postgres persistence changes
* redis concurrency limit module changes
* updated IndexDAO APIs and corresponding ES implementations
* more changes per indexDAO api
* fix tests in es modules due to new contract
* fix unit tests as per new models
* rename objects and classes
* fix integ tests
---
build.gradle | 11 +
.../config/CassandraConfiguration.java | 2 +-
.../cassandra/config/CassandraProperties.java | 2 +-
.../cassandra/dao/CassandraBaseDAO.java | 2 +-
.../dao/CassandraEventHandlerDAO.java | 2 +-
.../cassandra/dao/CassandraExecutionDAO.java | 83 +-
.../cassandra/dao/CassandraMetadataDAO.java | 2 +-
.../conductor/cassandra/util/Constants.java | 2 +-
.../conductor/cassandra/util/Statements.java | 2 +-
.../dao/CassandraEventHandlerDAOSpec.groovy | 22 +-
.../dao/CassandraExecutionDAOSpec.groovy | 96 +--
.../dao/CassandraMetadataDAOSpec.groovy | 22 +-
.../cassandra/dao/CassandraSpec.groovy | 41 +-
.../cassandra/util/StatementsSpec.groovy | 21 +-
.../conductor/client/http/TaskClient.java | 6 +-
.../conductor/common/metadata/tasks/Task.java | 2 +-
.../common/metadata/tasks/TaskResult.java | 2 +-
.../conductor/common/run/Workflow.java | 2 +-
.../contribs/dao/index/NoopIndexDAO.java | 14 +-
.../dao/index/NoopIndexDAOConfiguration.java | 2 +-
...rchivingWithTTLWorkflowStatusListener.java | 14 +-
...rchivingWorkflowListenerConfiguration.java | 4 +-
.../ArchivingWorkflowListenerProperties.java | 2 +-
.../ArchivingWorkflowStatusListener.java | 10 +-
.../ConductorQueueStatusPublisher.java | 18 +-
...ctorQueueStatusPublisherConfiguration.java | 6 +-
...nductorQueueStatusPublisherProperties.java | 2 +-
.../queue/sqs/SQSObservableQueue.java | 2 +-
.../config/SQSEventQueueConfiguration.java | 1 -
.../contribs/storage/S3PayloadStorage.java | 6 +-
.../storage/config/S3Configuration.java | 2 +-
.../http/DefaultRestTemplateProvider.java | 2 +-
.../contribs/tasks/http/HttpTask.java | 35 +-
.../tasks/http/RestTemplateProvider.java | 2 +-
.../contribs/tasks/json/JsonJqTransform.java | 14 +-
.../tasks/kafka/KafkaProducerManager.java | 2 +-
.../tasks/kafka/KafkaPublishTask.java | 22 +-
.../ArchivingWorkflowStatusListenerTest.java | 14 +-
.../sqs/DefaultEventQueueProcessorTest.java | 7 +-
.../http/DefaultRestTemplateProviderTest.java | 2 +-
.../contribs/tasks/http/HttpTaskTest.java | 685 ++++++++--------
.../tasks/json/JsonJqTransformTest.java | 18 +-
.../tasks/kafka/KafkaPublishTaskTest.java | 52 +-
.../netflix/conductor/annotations/Audit.java | 2 +-
.../netflix/conductor/annotations/Trace.java | 2 +-
.../core/LifecycleAwareComponent.java | 2 +-
.../conductor/core/WorkflowContext.java | 4 +-
.../ExecutionDAOFacade.java | 145 ++--
.../conductor/core/dal/ModelMapper.java | 192 +++++
.../core/events/ActionProcessor.java | 2 +-
.../core/events/DefaultEventProcessor.java | 2 +-
.../core/events/DefaultEventQueueManager.java | 3 +-
.../core/events/EventQueueManager.java | 2 +-
.../core/events/EventQueueProvider.java | 2 +-
.../conductor/core/events/EventQueues.java | 2 +-
.../core/events/ScriptEvaluator.java | 2 +-
.../core/events/SimpleActionProcessor.java | 26 +-
.../queue/ConductorEventQueueProvider.java | 3 +-
.../queue/ConductorObservableQueue.java | 2 +-
.../queue/DefaultEventQueueProcessor.java | 13 +-
.../conductor/core/events/queue/Message.java | 2 +-
.../core/events/queue/ObservableQueue.java | 2 +-
.../core/exception/ApplicationException.java | 5 +-
.../exception/TerminateWorkflowException.java | 23 +-
.../execution/AsyncSystemTaskExecutor.java | 49 +-
.../core/execution/DeciderService.java | 265 +++----
.../core/execution/WorkflowExecutor.java | 413 +++++-----
.../core/execution/evaluators/Evaluator.java | 2 +-
.../evaluators/JavascriptEvaluator.java | 3 +-
.../evaluators/ValueParamEvaluator.java | 4 +-
.../execution/mapper/DecisionTaskMapper.java | 31 +-
.../execution/mapper/DoWhileTaskMapper.java | 31 +-
.../execution/mapper/DynamicTaskMapper.java | 25 +-
.../execution/mapper/EventTaskMapper.java | 14 +-
.../mapper/ExclusiveJoinTaskMapper.java | 14 +-
.../mapper/ForkJoinDynamicTaskMapper.java | 87 ++-
.../execution/mapper/ForkJoinTaskMapper.java | 26 +-
.../core/execution/mapper/HTTPTaskMapper.java | 37 +-
.../execution/mapper/InlineTaskMapper.java | 27 +-
.../core/execution/mapper/JoinTaskMapper.java | 22 +-
.../mapper/JsonJQTransformTaskMapper.java | 21 +-
.../mapper/KafkaPublishTaskMapper.java | 26 +-
.../execution/mapper/LambdaTaskMapper.java | 21 +-
.../mapper/SetVariableTaskMapper.java | 14 +-
.../execution/mapper/SimpleTaskMapper.java | 20 +-
.../mapper/SubWorkflowTaskMapper.java | 24 +-
.../execution/mapper/SwitchTaskMapper.java | 31 +-
.../core/execution/mapper/TaskMapper.java | 6 +-
.../execution/mapper/TaskMapperContext.java | 12 +-
.../execution/mapper/TerminateTaskMapper.java | 14 +-
.../mapper/UserDefinedTaskMapper.java | 22 +-
.../core/execution/mapper/WaitTaskMapper.java | 17 +-
.../core/execution/tasks/Decision.java | 12 +-
.../core/execution/tasks/DoWhile.java | 52 +-
.../conductor/core/execution/tasks/Event.java | 24 +-
.../core/execution/tasks/ExclusiveJoin.java | 22 +-
.../core/execution/tasks/ExecutionConfig.java | 2 +-
.../conductor/core/execution/tasks/Fork.java | 2 +-
.../core/execution/tasks/Inline.java | 13 +-
.../tasks/IsolatedTaskQueueProducer.java | 2 +-
.../conductor/core/execution/tasks/Join.java | 18 +-
.../core/execution/tasks/Lambda.java | 15 +-
.../core/execution/tasks/SetVariable.java | 14 +-
.../core/execution/tasks/SubWorkflow.java | 45 +-
.../core/execution/tasks/Switch.java | 12 +-
.../execution/tasks/SystemTaskRegistry.java | 2 +-
.../execution/tasks/SystemTaskWorker.java | 2 +-
.../tasks/SystemTaskWorkerCoordinator.java | 2 +-
.../core/execution/tasks/Terminate.java | 13 +-
.../conductor/core/execution/tasks/Wait.java | 18 +-
.../execution/tasks/WorkflowSystemTask.java | 19 +-
.../core/listener/WorkflowStatusListener.java | 16 +-
.../listener/WorkflowStatusListenerStub.java | 10 +-
.../core/metadata/MetadataMapperService.java | 10 +-
.../reconciliation/WorkflowReconciler.java | 5 +-
.../reconciliation/WorkflowRepairService.java | 37 +-
.../core/reconciliation/WorkflowSweeper.java | 5 +-
.../core/storage/DummyPayloadStorage.java | 2 +-
.../com/netflix/conductor/core/sync/Lock.java | 6 +-
.../netflix/conductor/core/sync/NoopLock.java | 2 +-
.../utils/ExternalPayloadStorageUtils.java | 71 +-
.../conductor/core/utils/IDGenerator.java | 2 +-
.../conductor/core/utils/JsonUtils.java | 2 +-
.../conductor/core/utils/ParametersUtils.java | 77 +-
.../conductor/core/utils/QueueUtils.java | 11 +-
.../conductor/core/utils/SemaphoreUtil.java | 2 +-
.../netflix/conductor/core/utils/Utils.java | 4 +-
.../dao/ConcurrentExecutionLimitDAO.java | 10 +-
.../conductor/dao/EventHandlerDAO.java | 2 +-
.../netflix/conductor/dao/ExecutionDAO.java | 40 +-
.../com/netflix/conductor/dao/IndexDAO.java | 14 +-
.../netflix/conductor/dao/MetadataDAO.java | 2 +-
.../netflix/conductor/dao/PollDataDAO.java | 2 +-
.../com/netflix/conductor/dao/QueueDAO.java | 2 +-
.../conductor/dao/RateLimitingDAO.java | 10 +-
.../netflix/conductor/metrics/Monitors.java | 18 +-
.../conductor/metrics/WorkflowMonitor.java | 4 +-
.../netflix/conductor/model/TaskModel.java | 735 ++++++++++++++++++
.../conductor/model/WorkflowModel.java | 448 +++++++++++
.../conductor/service/AdminService.java | 6 +-
.../conductor/service/AdminServiceImpl.java | 7 +-
.../conductor/service/EventService.java | 2 +-
.../conductor/service/EventServiceImpl.java | 2 +-
.../service/ExecutionLockService.java | 2 +-
.../conductor/service/ExecutionService.java | 72 +-
.../conductor/service/MetadataService.java | 2 +-
.../service/MetadataServiceImpl.java | 3 +-
.../conductor/service/TaskService.java | 2 +-
.../conductor/service/TaskServiceImpl.java | 39 +-
.../service/WorkflowBulkService.java | 2 +-
.../service/WorkflowBulkServiceImpl.java | 2 +-
.../conductor/service/WorkflowService.java | 2 +-
.../service/WorkflowServiceImpl.java | 22 +-
.../validations/ValidationContext.java | 2 +-
.../WorkflowTaskTypeConstraint.java | 7 +-
.../WorkflowTaskValidConstraint.java | 6 +-
.../conductor/core/dal/ModelMapperSpec.groovy | 156 ++++
.../AsyncSystemTaskExecutorTest.groovy | 158 ++--
.../core/execution/tasks/EventSpec.groovy | 88 +--
.../IsolatedTaskQueueProducerSpec.groovy | 29 +-
.../ExecutionDAOFacadeTest.java | 65 +-
.../events/TestDefaultEventProcessor.java | 34 +-
.../events/TestSimpleActionProcessor.java | 24 +-
.../core/execution/TestDeciderOutcomes.java | 75 +-
.../core/execution/TestDeciderService.java | 347 ++++-----
.../core/execution/TestWorkflowExecutor.java | 731 ++++++++---------
.../execution/WorkflowSystemTaskStub.java | 11 +-
.../mapper/DecisionTaskMapperTest.java | 18 +-
.../mapper/DoWhileTaskMapperTest.java | 22 +-
.../mapper/DynamicTaskMapperTest.java | 14 +-
.../execution/mapper/EventTaskMapperTest.java | 14 +-
.../mapper/ForkJoinDynamicTaskMapperTest.java | 46 +-
.../mapper/ForkJoinTaskMapperTest.java | 20 +-
.../execution/mapper/HTTPTaskMapperTest.java | 14 +-
.../mapper/InlineTaskMapperTest.java | 14 +-
.../execution/mapper/JoinTaskMapperTest.java | 14 +-
.../mapper/JsonJQTransformTaskMapperTest.java | 14 +-
.../mapper/KafkaPublishTaskMapperTest.java | 14 +-
.../mapper/LambdaTaskMapperTest.java | 14 +-
.../mapper/SetVariableTaskMapperTest.java | 10 +-
.../mapper/SimpleTaskMapperTest.java | 12 +-
.../mapper/SubWorkflowTaskMapperTest.java | 30 +-
.../mapper/SwitchTaskMapperTest.java | 18 +-
.../mapper/TerminateTaskMapperTest.java | 10 +-
.../mapper/UserDefinedTaskMapperTest.java | 12 +-
.../execution/mapper/WaitTaskMapperTest.java | 10 +-
.../tasks/EventQueueResolutionTest.java | 31 +-
.../core/execution/tasks/InlineTest.java | 31 +-
.../core/execution/tasks/TestDoWhile.java | 65 +-
.../core/execution/tasks/TestLambda.java | 20 +-
.../core/execution/tasks/TestSubWorkflow.java | 108 +--
.../core/execution/tasks/TestTerminate.java | 54 +-
.../TestWorkflowRepairService.java | 57 +-
.../ExternalPayloadStorageUtilsTest.java | 34 +-
.../conductor/dao/ExecutionDAOTest.java | 111 ++-
.../service/ExecutionServiceTest.java | 67 +-
.../es6/config/ElasticSearchConditions.java | 2 +-
.../es6/config/ElasticSearchProperties.java | 2 +-
.../config/ElasticSearchV6Configuration.java | 2 +-
.../es6/dao/index/ElasticSearchDAOV6.java | 36 +-
.../es6/dao/index/ElasticSearchRestDAOV6.java | 44 +-
.../es6/dao/index/TestElasticSearchDAOV6.java | 79 +-
.../index/TestElasticSearchDAOV6Batch.java | 45 +-
.../dao/index/TestElasticSearchRestDAOV6.java | 110 ++-
.../TestElasticSearchRestDAOV6Batch.java | 45 +-
.../conductor/es6/utils/TestUtils.java | 20 +-
.../src/test/resources/task_summary.json | 17 +
.../src/test/resources/workflow.json | 77 --
.../src/test/resources/workflow_summary.json | 12 +
.../es7/dao/index/ElasticSearchRestDAOV7.java | 45 +-
.../dao/index/TestElasticSearchRestDAOV7.java | 137 ++--
.../TestElasticSearchRestDAOV7Batch.java | 44 +-
.../conductor/es7/utils/TestUtils.java | 22 +-
.../src/test/resources/task_summary.json | 17 +
.../src/test/resources/workflow.json | 77 --
.../src/test/resources/workflow_summary.json | 12 +
.../conductor/client/grpc/TaskClient.java | 2 +-
.../mysql/config/MySQLConfiguration.java | 3 +-
.../mysql/dao/MySQLExecutionDAO.java | 135 ++--
.../conductor/mysql/dao/MySQLMetadataDAO.java | 2 +-
.../conductor/mysql/dao/MySQLQueueDAO.java | 2 +-
.../mysql/dao/MySQLExecutionDAOTest.java | 8 +-
.../config/PostgresConfiguration.java | 3 +-
.../postgres/config/PostgresProperties.java | 2 +-
.../postgres/dao/PostgresBaseDAO.java | 2 +-
.../postgres/dao/PostgresExecutionDAO.java | 134 ++--
.../postgres/dao/PostgresMetadataDAO.java | 2 +-
.../postgres/dao/PostgresQueueDAO.java | 2 +-
.../dao/PostgresExecutionDAOTest.java | 10 +-
.../postgres/dao/PostgresMetadataDAOTest.java | 2 +-
.../postgres/dao/PostgresQueueDAOTest.java | 2 +-
.../RedisConcurrentExecutionLimitDAO.java | 24 +-
...edisConcurrentExecutionLimitDAOSpec.groovy | 43 +-
redis-lock/build.gradle | 1 -
.../conductor/redis/dao/BaseDynoDAO.java | 2 +-
.../conductor/redis/dao/DynoQueueDAO.java | 2 +-
.../redis/dao/RedisEventHandlerDAO.java | 2 +-
.../redis/dao/RedisExecutionDAO.java | 107 ++-
.../conductor/redis/dao/RedisMetadataDAO.java | 2 +-
.../conductor/redis/dao/RedisPollDataDAO.java | 2 +-
.../redis/dao/RedisRateLimitingDAO.java | 24 +-
.../dynoqueue/ConfigurationHostSupplier.java | 2 +-
.../dynoqueue/LocalhostHostSupplier.java | 2 +-
.../RedisQueuesShardingStrategyProvider.java | 2 +-
.../redis/dao/RedisExecutionDAOTest.java | 11 +-
.../redis/dao/RedisRateLimitDAOTest.java | 8 +-
test-harness/build.gradle | 7 -
.../AbstractResiliencySpecification.groovy | 2 +-
.../test/integration/DecisionTaskSpec.groovy | 2 +-
.../test/integration/DoWhileSpec.groovy | 4 +-
.../integration/DynamicForkJoinSpec.groovy | 2 +-
.../test/integration/EventTaskSpec.groovy | 2 +-
.../test/integration/ExclusiveJoinSpec.groovy | 2 +-
.../ExternalPayloadStorageSpec.groovy | 48 +-
.../test/integration/ForkJoinSpec.groovy | 2 +-
...rchicalForkJoinSubworkflowRerunSpec.groovy | 2 +-
...hicalForkJoinSubworkflowRestartSpec.groovy | 2 +-
...rchicalForkJoinSubworkflowRetrySpec.groovy | 2 +-
.../integration/JsonJQTransformSpec.groovy | 2 +-
.../integration/KafkaPublishTaskSpec.groovy | 2 +-
.../LambdaAndTerminateTaskSpec.groovy | 2 +-
.../NestedForkJoinSubWorkflowSpec.groovy | 2 +-
.../integration/SetVariableTaskSpec.groovy | 2 +-
.../integration/SimpleWorkflowSpec.groovy | 10 +-
.../integration/SubWorkflowRerunSpec.groovy | 2 +-
.../integration/SubWorkflowRestartSpec.groovy | 2 +-
.../integration/SubWorkflowRetrySpec.groovy | 2 +-
.../test/integration/SubWorkflowSpec.groovy | 2 +-
.../test/integration/SwitchTaskSpec.groovy | 2 +-
.../test/integration/SystemTaskSpec.groovy | 2 +-
.../integration/TaskLimitsWorkflowSpec.groovy | 5 +-
.../test/integration/WaitTaskSpec.groovy | 2 +-
.../WorkflowAndTaskConfigurationSpec.groovy | 12 +-
.../resiliency/QueueResiliencySpec.groovy | 2 +-
.../test/resiliency/TaskResiliencySpec.groovy | 2 +-
.../test/util/WorkflowTestUtil.groovy | 41 +-
...orkflowStatusPublisherIntegrationTest.java | 8 +-
.../conductor/test/utils/UserTask.java | 15 +-
zookeeper-lock/build.gradle | 1 -
279 files changed, 5165 insertions(+), 4032 deletions(-)
rename core/src/main/java/com/netflix/conductor/core/{orchestration => dal}/ExecutionDAOFacade.java (81%)
create mode 100644 core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java
create mode 100644 core/src/main/java/com/netflix/conductor/model/TaskModel.java
create mode 100644 core/src/main/java/com/netflix/conductor/model/WorkflowModel.java
create mode 100644 core/src/test/groovy/com/netflix/conductor/core/dal/ModelMapperSpec.groovy
rename core/src/test/java/com/netflix/conductor/core/{orchestration => dal}/ExecutionDAOFacadeTest.java (78%)
create mode 100644 es6-persistence/src/test/resources/task_summary.json
delete mode 100644 es6-persistence/src/test/resources/workflow.json
create mode 100644 es6-persistence/src/test/resources/workflow_summary.json
create mode 100644 es7-persistence/src/test/resources/task_summary.json
delete mode 100644 es7-persistence/src/test/resources/workflow.json
create mode 100644 es7-persistence/src/test/resources/workflow_summary.json
diff --git a/build.gradle b/build.gradle
index b350de13e6..75ea80bdca 100644
--- a/build.gradle
+++ b/build.gradle
@@ -184,3 +184,14 @@ configure(allprojects - project(':conductor-grpc')) {
}
}
}
+
+["cassandra-persistence", "core", "redis-concurrency-limit", "test-harness"].each {
+ configure(project(":conductor-$it")) {
+ spotless {
+ groovy {
+ importOrder('java', 'javax', 'org', 'com.netflix', '', '\\#com.netflix', '\\#')
+ licenseHeaderFile("$rootDir/licenseheader.txt")
+ }
+ }
+ }
+}
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraConfiguration.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraConfiguration.java
index 352e5cfec1..2d2725f3e3 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraConfiguration.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraProperties.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraProperties.java
index 19286cad45..28d3eee972 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraProperties.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/config/CassandraProperties.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java
index 70664c1694..c576be8e91 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java
index ea0b12de9f..ce797cea9e 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java
index 74e2016da8..4341186d70 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -27,15 +27,15 @@
import com.netflix.conductor.cassandra.config.CassandraProperties;
import com.netflix.conductor.cassandra.util.Statements;
import com.netflix.conductor.common.metadata.events.EventExecution;
-import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.metrics.Monitors;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
@@ -56,7 +56,6 @@
import static com.netflix.conductor.cassandra.util.Constants.TOTAL_PARTITIONS_KEY;
import static com.netflix.conductor.cassandra.util.Constants.TOTAL_TASKS_KEY;
import static com.netflix.conductor.cassandra.util.Constants.WORKFLOW_ID_KEY;
-import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;
@Trace
public class CassandraExecutionDAO extends CassandraBaseDAO
@@ -172,11 +171,11 @@ public CassandraExecutionDAO(
}
@Override
- public List getPendingTasksByWorkflow(String taskName, String workflowId) {
- List tasks = getTasksForWorkflow(workflowId);
+ public List getPendingTasksByWorkflow(String taskName, String workflowId) {
+ List tasks = getTasksForWorkflow(workflowId);
return tasks.stream()
.filter(task -> taskName.equals(task.getTaskType()))
- .filter(task -> IN_PROGRESS.equals(task.getStatus()))
+ .filter(task -> TaskModel.Status.IN_PROGRESS.equals(task.getStatus()))
.collect(Collectors.toList());
}
@@ -185,7 +184,7 @@ public List getPendingTasksByWorkflow(String taskName, String workflowId)
* Conductor
*/
@Override
- public List getTasks(String taskType, String startKey, int count) {
+ public List getTasks(String taskType, String startKey, int count) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
@@ -198,7 +197,7 @@ public List getTasks(String taskType, String startKey, int count) {
* @param tasks tasks to be created
*/
@Override
- public List createTasks(List tasks) {
+ public List createTasks(List tasks) {
validateTasks(tasks);
String workflowId = tasks.get(0).getWorkflowInstanceId();
try {
@@ -259,7 +258,7 @@ public List createTasks(List tasks) {
}
@Override
- public void updateTask(Task task) {
+ public void updateTask(TaskModel task) {
try {
// TODO: calculate the shard number the task belongs to
String taskPayload = toJson(task);
@@ -276,7 +275,7 @@ public void updateTask(Task task) {
&& task.getTaskDefinition().get().concurrencyLimit() > 0) {
if (task.getStatus().isTerminal()) {
removeTaskFromLimit(task);
- } else if (task.getStatus() == IN_PROGRESS) {
+ } else if (task.getStatus() == TaskModel.Status.IN_PROGRESS) {
addTaskToLimit(task);
}
}
@@ -296,7 +295,7 @@ public void updateTask(Task task) {
* Conductor
*/
@Override
- public boolean exceedsLimit(Task task) {
+ public boolean exceedsLimit(TaskModel task) {
Optional taskDefinition = task.getTaskDefinition();
if (taskDefinition.isEmpty()) {
return false;
@@ -342,7 +341,7 @@ public boolean exceedsLimit(Task task) {
@Override
public boolean removeTask(String taskId) {
- Task task = getTask(taskId);
+ TaskModel task = getTask(taskId);
if (task == null) {
LOGGER.warn("No such task found by id {}", taskId);
return false;
@@ -351,7 +350,7 @@ public boolean removeTask(String taskId) {
}
@Override
- public Task getTask(String taskId) {
+ public TaskModel getTask(String taskId) {
try {
String workflowId = lookupWorkflowIdFromTaskId(taskId);
if (workflowId == null) {
@@ -366,7 +365,8 @@ public Task getTask(String taskId) {
return Optional.ofNullable(resultSet.one())
.map(
row -> {
- Task task = readValue(row.getString(PAYLOAD_KEY), Task.class);
+ TaskModel task =
+ readValue(row.getString(PAYLOAD_KEY), TaskModel.class);
recordCassandraDaoRequests(
"getTask", task.getTaskType(), task.getWorkflowType());
recordCassandraDaoPayloadSize(
@@ -388,7 +388,7 @@ public Task getTask(String taskId) {
}
@Override
- public List getTasks(List taskIds) {
+ public List getTasks(List taskIds) {
Preconditions.checkNotNull(taskIds);
Preconditions.checkArgument(taskIds.size() > 0, "Task ids list cannot be empty");
String workflowId = lookupWorkflowIdFromTaskId(taskIds.get(0));
@@ -405,20 +405,20 @@ public List getTasks(List taskIds) {
* Conductor
*/
@Override
- public List getPendingTasksForTaskType(String taskType) {
+ public List getPendingTasksForTaskType(String taskType) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
@Override
- public List getTasksForWorkflow(String workflowId) {
+ public List getTasksForWorkflow(String workflowId) {
return getWorkflow(workflowId, true).getTasks();
}
@Override
- public String createWorkflow(Workflow workflow) {
+ public String createWorkflow(WorkflowModel workflow) {
try {
- List tasks = workflow.getTasks();
+ List tasks = workflow.getTasks();
workflow.setTasks(new LinkedList<>());
String payload = toJson(workflow);
@@ -441,9 +441,9 @@ public String createWorkflow(Workflow workflow) {
}
@Override
- public String updateWorkflow(Workflow workflow) {
+ public String updateWorkflow(WorkflowModel workflow) {
try {
- List tasks = workflow.getTasks();
+ List tasks = workflow.getTasks();
workflow.setTasks(new LinkedList<>());
String payload = toJson(workflow);
recordCassandraDaoRequests("updateWorkflow", "n/a", workflow.getWorkflowName());
@@ -465,7 +465,7 @@ public String updateWorkflow(Workflow workflow) {
@Override
public boolean removeWorkflow(String workflowId) {
- Workflow workflow = getWorkflow(workflowId, true);
+ WorkflowModel workflow = getWorkflow(workflowId, true);
boolean removed = false;
// TODO: calculate number of shards and iterate
if (workflow != null) {
@@ -508,13 +508,13 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) {
}
@Override
- public Workflow getWorkflow(String workflowId) {
+ public WorkflowModel getWorkflow(String workflowId) {
return getWorkflow(workflowId, true);
}
@Override
- public Workflow getWorkflow(String workflowId, boolean includeTasks) {
- Workflow workflow = null;
+ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {
+ WorkflowModel workflow = null;
try {
ResultSet resultSet;
if (includeTasks) {
@@ -522,7 +522,7 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
session.execute(
selectWorkflowWithTasksStatement.bind(
UUID.fromString(workflowId), DEFAULT_SHARD_ID));
- List tasks = new ArrayList<>();
+ List tasks = new ArrayList<>();
List rows = resultSet.all();
if (rows.size() == 0) {
@@ -532,9 +532,9 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
for (Row row : rows) {
String entityKey = row.getString(ENTITY_KEY);
if (ENTITY_TYPE_WORKFLOW.equals(entityKey)) {
- workflow = readValue(row.getString(PAYLOAD_KEY), Workflow.class);
+ workflow = readValue(row.getString(PAYLOAD_KEY), WorkflowModel.class);
} else if (ENTITY_TYPE_TASK.equals(entityKey)) {
- Task task = readValue(row.getString(PAYLOAD_KEY), Task.class);
+ TaskModel task = readValue(row.getString(PAYLOAD_KEY), TaskModel.class);
tasks.add(task);
} else {
throw new ApplicationException(
@@ -547,7 +547,7 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
if (workflow != null) {
recordCassandraDaoRequests("getWorkflow", "n/a", workflow.getWorkflowName());
- tasks.sort(Comparator.comparingInt(Task::getSeq));
+ tasks.sort(Comparator.comparingInt(TaskModel::getSeq));
workflow.setTasks(tasks);
}
} else {
@@ -557,10 +557,10 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
Optional.ofNullable(resultSet.one())
.map(
row -> {
- Workflow wf =
+ WorkflowModel wf =
readValue(
row.getString(PAYLOAD_KEY),
- Workflow.class);
+ WorkflowModel.class);
recordCassandraDaoRequests(
"getWorkflow", "n/a", wf.getWorkflowName());
return wf;
@@ -598,7 +598,7 @@ public List getRunningWorkflowIds(String workflowName, int version) {
* Conductor
*/
@Override
- public List getPendingWorkflowsByType(String workflowName, int version) {
+ public List getPendingWorkflowsByType(String workflowName, int version) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
@@ -628,7 +628,8 @@ public long getInProgressTaskCount(String taskDefName) {
* Conductor
*/
@Override
- public List getWorkflowsByType(String workflowName, Long startTime, Long endTime) {
+ public List getWorkflowsByType(
+ String workflowName, Long startTime, Long endTime) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
}
@@ -638,7 +639,7 @@ public List getWorkflowsByType(String workflowName, Long startTime, Lo
* Conductor
*/
@Override
- public List getWorkflowsByCorrelationId(
+ public List getWorkflowsByCorrelationId(
String workflowName, String correlationId, boolean includeTasks) {
throw new UnsupportedOperationException(
"This method is not implemented in CassandraExecutionDAO. Please use ExecutionDAOFacade instead.");
@@ -741,7 +742,7 @@ List getEventExecutions(
}
@Override
- public void addTaskToLimit(Task task) {
+ public void addTaskToLimit(TaskModel task) {
try {
recordCassandraDaoRequests(
"addTaskToLimit", task.getTaskType(), task.getWorkflowType());
@@ -770,7 +771,7 @@ public void addTaskToLimit(Task task) {
}
@Override
- public void removeTaskFromLimit(Task task) {
+ public void removeTaskFromLimit(TaskModel task) {
try {
recordCassandraDaoRequests(
"removeTaskFromLimit", task.getTaskType(), task.getWorkflowType());
@@ -797,7 +798,7 @@ public void removeTaskFromLimit(Task task) {
}
}
- private boolean removeTask(Task task) {
+ private boolean removeTask(TaskModel task) {
// TODO: calculate shard number based on seq and maxTasksPerShard
try {
// get total tasks for this workflow
@@ -834,7 +835,7 @@ private boolean removeTask(Task task) {
}
}
- private void removeTaskLookup(Task task) {
+ private void removeTaskLookup(TaskModel task) {
try {
recordCassandraDaoRequests(
"removeTaskLookup", task.getTaskType(), task.getWorkflowType());
@@ -854,7 +855,7 @@ private void removeTaskLookup(Task task) {
}
@VisibleForTesting
- void validateTasks(List tasks) {
+ void validateTasks(List tasks) {
Preconditions.checkNotNull(tasks, "Tasks object cannot be null");
Preconditions.checkArgument(!tasks.isEmpty(), "Tasks object cannot be empty");
tasks.forEach(
@@ -868,7 +869,7 @@ void validateTasks(List tasks) {
});
String workflowId = tasks.get(0).getWorkflowInstanceId();
- Optional optionalTask =
+ Optional optionalTask =
tasks.stream()
.filter(task -> !workflowId.equals(task.getWorkflowInstanceId()))
.findAny();
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java
index a9f9fe44c0..10bbe2dd51 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Constants.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Constants.java
index f5eb9f7dfe..473c23132c 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Constants.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Constants.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java
index 5c538c41e7..a6ea122538 100644
--- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java
+++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAOSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAOSpec.groovy
index 912d36c65c..214f3722de 100644
--- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAOSpec.groovy
+++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAOSpec.groovy
@@ -1,20 +1,20 @@
/*
- * Copyright 2021 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Copyright 2022 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
package com.netflix.conductor.cassandra.dao
import com.netflix.conductor.common.metadata.events.EventExecution
import com.netflix.conductor.common.metadata.events.EventHandler
+
import spock.lang.Subject
class CassandraEventHandlerDAOSpec extends CassandraSpec {
diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy
index 11727500ae..2a3b81c390 100644
--- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy
+++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy
@@ -1,26 +1,26 @@
/*
- * Copyright 2021 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License" you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Copyright 2022 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
package com.netflix.conductor.cassandra.dao
import com.netflix.conductor.common.metadata.events.EventExecution
-import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.metadata.tasks.TaskDef
import com.netflix.conductor.common.metadata.workflow.WorkflowDef
import com.netflix.conductor.common.metadata.workflow.WorkflowTask
-import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.core.exception.ApplicationException
import com.netflix.conductor.core.utils.IDGenerator
+import com.netflix.conductor.model.TaskModel
+import com.netflix.conductor.model.WorkflowModel
+
import spock.lang.Subject
import static com.netflix.conductor.common.metadata.events.EventExecution.Status.COMPLETED
@@ -40,8 +40,8 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
def tasks = []
// create tasks for a workflow and add to list
- Task task1 = new Task(workflowInstanceId: 'uuid', taskId: 'task1id', referenceTaskName: 'task1')
- Task task2 = new Task(workflowInstanceId: 'uuid', taskId: 'task2id', referenceTaskName: 'task2')
+ TaskModel task1 = new TaskModel(workflowInstanceId: 'uuid', taskId: 'task1id', referenceTaskName: 'task1')
+ TaskModel task2 = new TaskModel(workflowInstanceId: 'uuid', taskId: 'task2id', referenceTaskName: 'task2')
tasks << task1 << task2
when:
@@ -52,7 +52,7 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
and:
// add a task from a different workflow to the list
- Task task3 = new Task(workflowInstanceId: 'other-uuid', taskId: 'task3id', referenceTaskName: 'task3')
+ TaskModel task3 = new TaskModel(workflowInstanceId: 'other-uuid', taskId: 'task3id', referenceTaskName: 'task3')
tasks << task3
when:
@@ -69,11 +69,11 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
WorkflowDef workflowDef = new WorkflowDef()
workflowDef.name = "def1"
workflowDef.setVersion(1)
- Workflow workflow = new Workflow()
+ WorkflowModel workflow = new WorkflowModel()
workflow.setWorkflowDefinition(workflowDef)
workflow.setWorkflowId(workflowId)
workflow.setInput(new HashMap<>())
- workflow.setStatus(Workflow.WorkflowStatus.RUNNING)
+ workflow.setStatus(WorkflowModel.Status.RUNNING)
workflow.setCreateTime(System.currentTimeMillis())
when:
@@ -85,14 +85,14 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
when:
// read the workflow from the datastore
- Workflow found = executionDAO.getWorkflow(workflowId)
+ WorkflowModel found = executionDAO.getWorkflow(workflowId)
then:
workflow == found
and:
// update the workflow
- workflow.setStatus(Workflow.WorkflowStatus.COMPLETED)
+ workflow.setStatus(WorkflowModel.Status.COMPLETED)
executionDAO.updateWorkflow(workflow)
when:
@@ -120,18 +120,18 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
given: 'we create a workflow'
String workflowId = IDGenerator.generate()
WorkflowDef workflowDef = new WorkflowDef(name: 'def1', version: 1)
- Workflow workflow = new Workflow(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: Workflow.WorkflowStatus.RUNNING, createTime: System.currentTimeMillis())
+ WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createTime: System.currentTimeMillis())
executionDAO.createWorkflow(workflow)
and: 'create tasks for this workflow'
- Task task1 = new Task(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task2 = new Task(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task3 = new Task(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task1 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task2 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task3 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
def taskList = [task1, task2, task3]
when: 'add the tasks to the datastore'
- List tasks = executionDAO.createTasks(taskList)
+ List tasks = executionDAO.createTasks(taskList)
then:
tasks != null
@@ -177,7 +177,7 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
fetchedTasks != null && fetchedTasks.size() == 3
when: 'read workflow with tasks'
- Workflow found = executionDAO.getWorkflow(workflowId, true)
+ WorkflowModel found = executionDAO.getWorkflow(workflowId, true)
then:
found != null
@@ -192,21 +192,21 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
given: 'we create a workflow'
String workflowId = IDGenerator.generate()
WorkflowDef workflowDef = new WorkflowDef(name: 'def1', version: 1)
- Workflow workflow = new Workflow(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: Workflow.WorkflowStatus.RUNNING, createTime: System.currentTimeMillis())
+ WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createTime: System.currentTimeMillis())
executionDAO.createWorkflow(workflow)
and: 'create tasks for this workflow'
- Task task1 = new Task(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task2 = new Task(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task3 = new Task(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task1 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task2 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task3 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
and: 'add the tasks to the datastore'
executionDAO.createTasks([task1, task2, task3])
and: 'change the status of those tasks'
- task1.setStatus(Task.Status.IN_PROGRESS)
- task2.setStatus(Task.Status.COMPLETED)
- task3.setStatus(Task.Status.FAILED)
+ task1.setStatus(TaskModel.Status.IN_PROGRESS)
+ task2.setStatus(TaskModel.Status.COMPLETED)
+ task3.setStatus(TaskModel.Status.FAILED)
when: 'update the tasks'
executionDAO.updateTask(task1)
@@ -214,12 +214,12 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
executionDAO.updateTask(task3)
then:
- executionDAO.getTask(task1.taskId).status == Task.Status.IN_PROGRESS
- executionDAO.getTask(task2.taskId).status == Task.Status.COMPLETED
- executionDAO.getTask(task3.taskId).status == Task.Status.FAILED
+ executionDAO.getTask(task1.taskId).status == TaskModel.Status.IN_PROGRESS
+ executionDAO.getTask(task2.taskId).status == TaskModel.Status.COMPLETED
+ executionDAO.getTask(task3.taskId).status == TaskModel.Status.FAILED
when: 'get pending tasks for the workflow'
- List pendingTasks = executionDAO.getPendingTasksByWorkflow(task1.getTaskType(), workflowId)
+ List pendingTasks = executionDAO.getPendingTasksByWorkflow(task1.getTaskType(), workflowId)
then:
pendingTasks != null && pendingTasks.size() == 1
@@ -230,13 +230,13 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
given: 'we create a workflow'
String workflowId = IDGenerator.generate()
WorkflowDef workflowDef = new WorkflowDef(name: 'def1', version: 1)
- Workflow workflow = new Workflow(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: Workflow.WorkflowStatus.RUNNING, createTime: System.currentTimeMillis())
+ WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, input: new HashMap(), status: WorkflowModel.Status.RUNNING, createTime: System.currentTimeMillis())
executionDAO.createWorkflow(workflow)
and: 'create tasks for this workflow'
- Task task1 = new Task(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task2 = new Task(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
- Task task3 = new Task(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: Task.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task1 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task1', referenceTaskName: 'task1', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task2 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task2', referenceTaskName: 'task2', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
+ TaskModel task3 = new TaskModel(workflowInstanceId: workflowId, taskType: 'task3', referenceTaskName: 'task3', status: TaskModel.Status.SCHEDULED, taskId: IDGenerator.generate())
and: 'add the tasks to the datastore'
executionDAO.createTasks([task1, task2, task3])
@@ -284,23 +284,23 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
WorkflowTask workflowTask = new WorkflowTask(taskDefinition: taskDef)
workflowTask.setTaskDefinition(taskDef)
- Task task = new Task()
+ TaskModel task = new TaskModel()
task.taskDefName = taskDefName
task.taskId = taskId
task.workflowInstanceId = IDGenerator.generate()
task.setWorkflowTask(workflowTask)
task.setTaskType("test_task")
task.setWorkflowType("test_workflow")
- task.setStatus(Task.Status.SCHEDULED)
+ task.setStatus(TaskModel.Status.SCHEDULED)
- Task newTask = new Task()
+ TaskModel newTask = new TaskModel()
newTask.setTaskDefName(taskDefName)
newTask.setTaskId(IDGenerator.generate())
newTask.setWorkflowInstanceId(IDGenerator.generate())
newTask.setWorkflowTask(workflowTask)
newTask.setTaskType("test_task")
newTask.setWorkflowType("test_workflow")
- newTask.setStatus(Task.Status.SCHEDULED)
+ newTask.setStatus(TaskModel.Status.SCHEDULED)
when: // no tasks are IN_PROGRESS
executionDAO.addTaskToLimit(task)
@@ -309,7 +309,7 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
!executionDAO.exceedsLimit(task)
when: // set a task to IN_PROGRESS
- task.setStatus(Task.Status.IN_PROGRESS)
+ task.setStatus(TaskModel.Status.IN_PROGRESS)
executionDAO.addTaskToLimit(task)
then: // same task is checked
@@ -319,14 +319,14 @@ class CassandraExecutionDAOSpec extends CassandraSpec {
executionDAO.exceedsLimit(newTask)
when: // set IN_PROGRESS task to COMPLETED
- task.setStatus(Task.Status.COMPLETED)
+ task.setStatus(TaskModel.Status.COMPLETED)
executionDAO.removeTaskFromLimit(task)
then: // check new task again
!executionDAO.exceedsLimit(newTask)
when: // set new task to IN_PROGRESS
- newTask.setStatus(Task.Status.IN_PROGRESS)
+ newTask.setStatus(TaskModel.Status.IN_PROGRESS)
executionDAO.addTaskToLimit(newTask)
then: // check new task again
diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy
index 27049c2c21..5c06efe550 100644
--- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy
+++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraMetadataDAOSpec.groovy
@@ -1,20 +1,20 @@
/*
- * Copyright 2021 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Copyright 2022 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
package com.netflix.conductor.cassandra.dao
import com.netflix.conductor.common.metadata.tasks.TaskDef
import com.netflix.conductor.common.metadata.workflow.WorkflowDef
+
import spock.lang.Subject
class CassandraMetadataDAOSpec extends CassandraSpec {
diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraSpec.groovy
index d9531f2b36..a5393210bb 100644
--- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraSpec.groovy
+++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraSpec.groovy
@@ -1,34 +1,35 @@
/*
- * Copyright 2021 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Copyright 2022 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
package com.netflix.conductor.cassandra.dao
-import com.datastax.driver.core.ConsistencyLevel
-import com.datastax.driver.core.Session
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.netflix.conductor.cassandra.config.CassandraProperties
-import com.netflix.conductor.cassandra.util.Statements
-import com.netflix.conductor.common.config.TestObjectMapperConfiguration
-import groovy.transform.PackageScope
+import java.time.Duration
+
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
import org.testcontainers.containers.CassandraContainer
import org.testcontainers.spock.Testcontainers
+
+import com.netflix.conductor.cassandra.config.CassandraProperties
+import com.netflix.conductor.cassandra.util.Statements
+import com.netflix.conductor.common.config.TestObjectMapperConfiguration
+
+import com.datastax.driver.core.ConsistencyLevel
+import com.datastax.driver.core.Session
+import com.fasterxml.jackson.databind.ObjectMapper
+import groovy.transform.PackageScope
import spock.lang.Shared
import spock.lang.Specification
-import java.time.Duration
-
@ContextConfiguration(classes = [TestObjectMapperConfiguration.class])
@Testcontainers
@PackageScope
diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/util/StatementsSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/util/StatementsSpec.groovy
index f674688b9e..f826a36208 100644
--- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/util/StatementsSpec.groovy
+++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/util/StatementsSpec.groovy
@@ -1,16 +1,15 @@
/*
- * Copyright 2021 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Copyright 2022 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
package com.netflix.conductor.cassandra.util
import spock.lang.Specification
diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
index 3e7ca3699b..88700d2b83 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -248,12 +248,12 @@ public void evaluateAndUploadLargePayload(TaskResult taskResult, String taskType
MetricsContainer.recordTaskResultPayloadSize(taskType, taskResultSize);
long payloadSizeThreshold =
- conductorClientConfiguration.getTaskOutputPayloadThresholdKB() * 1024;
+ conductorClientConfiguration.getTaskOutputPayloadThresholdKB() * 1024L;
if (taskResultSize > payloadSizeThreshold) {
if (!conductorClientConfiguration.isExternalPayloadStorageEnabled()
|| taskResultSize
> conductorClientConfiguration.getTaskOutputMaxPayloadThresholdKB()
- * 1024) {
+ * 1024L) {
taskResult.setReasonForIncompletion(
String.format(
"The TaskResult payload size: %d is greater than the permissible %d MB",
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
index d2957f62b6..82a6a1af21 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
index cf4e5c246c..d449176203 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/common/src/main/java/com/netflix/conductor/common/run/Workflow.java b/common/src/main/java/com/netflix/conductor/common/run/Workflow.java
index dc9d410820..946ab90ed4 100644
--- a/common/src/main/java/com/netflix/conductor/common/run/Workflow.java
+++ b/common/src/main/java/com/netflix/conductor/common/run/Workflow.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java b/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java
index dbbcfbd670..4b5f22c6b2 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAO.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -17,10 +17,10 @@
import java.util.concurrent.CompletableFuture;
import com.netflix.conductor.common.metadata.events.EventExecution;
-import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.run.SearchResult;
-import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.common.run.TaskSummary;
+import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.dao.IndexDAO;
@@ -34,18 +34,18 @@ public class NoopIndexDAO implements IndexDAO {
public void setup() {}
@Override
- public void indexWorkflow(Workflow workflow) {}
+ public void indexWorkflow(WorkflowSummary workflowSummary) {}
@Override
- public CompletableFuture asyncIndexWorkflow(Workflow workflow) {
+ public CompletableFuture asyncIndexWorkflow(WorkflowSummary workflowSummary) {
return CompletableFuture.completedFuture(null);
}
@Override
- public void indexTask(Task task) {}
+ public void indexTask(TaskSummary taskSummary) {}
@Override
- public CompletableFuture asyncIndexTask(Task task) {
+ public CompletableFuture asyncIndexTask(TaskSummary taskSummary) {
return CompletableFuture.completedFuture(null);
}
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAOConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAOConfiguration.java
index 0835d50d0d..b9feec8de2 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAOConfiguration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/dao/index/NoopIndexDAOConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWithTTLWorkflowStatusListener.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWithTTLWorkflowStatusListener.java
index e80bfcd2da..69b50d6a38 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWithTTLWorkflowStatusListener.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWithTTLWorkflowStatusListener.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -20,10 +20,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
-import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
import com.netflix.conductor.metrics.Monitors;
+import com.netflix.conductor.model.WorkflowModel;
public class ArchivingWithTTLWorkflowStatusListener implements WorkflowStatusListener {
@@ -75,7 +75,7 @@ public void shutdownExecutorService() {
}
@Override
- public void onWorkflowCompleted(Workflow workflow) {
+ public void onWorkflowCompleted(WorkflowModel workflow) {
LOGGER.info("Archiving workflow {} on completion ", workflow.getWorkflowId());
if (delayArchiveSeconds > 0) {
scheduledThreadPoolExecutor.schedule(
@@ -90,7 +90,7 @@ public void onWorkflowCompleted(Workflow workflow) {
}
@Override
- public void onWorkflowTerminated(Workflow workflow) {
+ public void onWorkflowTerminated(WorkflowModel workflow) {
LOGGER.info("Archiving workflow {} on termination", workflow.getWorkflowId());
if (delayArchiveSeconds > 0) {
scheduledThreadPoolExecutor.schedule(
@@ -108,10 +108,10 @@ private class DelayArchiveWorkflow implements Runnable {
private final String workflowId;
private final String workflowName;
- private final Workflow.WorkflowStatus status;
+ private final WorkflowModel.Status status;
private final ExecutionDAOFacade executionDAOFacade;
- DelayArchiveWorkflow(Workflow workflow, ExecutionDAOFacade executionDAOFacade) {
+ DelayArchiveWorkflow(WorkflowModel workflow, ExecutionDAOFacade executionDAOFacade) {
this.workflowId = workflow.getWorkflowId();
this.workflowName = workflow.getWorkflowName();
this.status = workflow.getStatus();
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java
index 482e63d739..c98c0c173b 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -17,8 +17,8 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
-import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
@Configuration
@EnableConfigurationProperties(ArchivingWorkflowListenerProperties.class)
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java
index 90076089ff..dfd57d3017 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowStatusListener.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowStatusListener.java
index f84c99f705..f1fe98cca8 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowStatusListener.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowStatusListener.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -15,10 +15,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
-import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
import com.netflix.conductor.metrics.Monitors;
+import com.netflix.conductor.model.WorkflowModel;
/**
* Provides default implementation of workflow archiving immediately after workflow is completed or
@@ -37,14 +37,14 @@ public ArchivingWorkflowStatusListener(ExecutionDAOFacade executionDAOFacade) {
}
@Override
- public void onWorkflowCompleted(Workflow workflow) {
+ public void onWorkflowCompleted(WorkflowModel workflow) {
LOGGER.info("Archiving workflow {} on completion ", workflow.getWorkflowId());
this.executionDAOFacade.removeWorkflow(workflow.getWorkflowId(), true);
Monitors.recordWorkflowArchived(workflow.getWorkflowName(), workflow.getStatus());
}
@Override
- public void onWorkflowTerminated(Workflow workflow) {
+ public void onWorkflowTerminated(WorkflowModel workflow) {
LOGGER.info("Archiving workflow {} on termination", workflow.getWorkflowId());
this.executionDAOFacade.removeWorkflow(workflow.getWorkflowId(), true);
Monitors.recordWorkflowArchived(workflow.getWorkflowName(), workflow.getStatus());
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisher.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisher.java
index 45060f322d..c11ef06103 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisher.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisher.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -17,11 +17,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
+import com.netflix.conductor.core.dal.ModelMapper;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.dao.QueueDAO;
+import com.netflix.conductor.model.WorkflowModel;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,6 +36,7 @@ public class ConductorQueueStatusPublisher implements WorkflowStatusListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConductorQueueStatusPublisher.class);
private final QueueDAO queueDAO;
+ private final ModelMapper modelMapper;
private final ObjectMapper objectMapper;
private final String successStatusQueue;
@@ -43,9 +45,11 @@ public class ConductorQueueStatusPublisher implements WorkflowStatusListener {
public ConductorQueueStatusPublisher(
QueueDAO queueDAO,
+ ModelMapper modelMapper,
ObjectMapper objectMapper,
ConductorQueueStatusPublisherProperties properties) {
this.queueDAO = queueDAO;
+ this.modelMapper = modelMapper;
this.objectMapper = objectMapper;
this.successStatusQueue = properties.getSuccessQueue();
this.failureStatusQueue = properties.getFailureQueue();
@@ -53,26 +57,26 @@ public ConductorQueueStatusPublisher(
}
@Override
- public void onWorkflowCompleted(Workflow workflow) {
+ public void onWorkflowCompleted(WorkflowModel workflow) {
LOGGER.info("Publishing callback of workflow {} on completion ", workflow.getWorkflowId());
queueDAO.push(successStatusQueue, Collections.singletonList(workflowToMessage(workflow)));
}
@Override
- public void onWorkflowTerminated(Workflow workflow) {
+ public void onWorkflowTerminated(WorkflowModel workflow) {
LOGGER.info("Publishing callback of workflow {} on termination", workflow.getWorkflowId());
queueDAO.push(failureStatusQueue, Collections.singletonList(workflowToMessage(workflow)));
}
@Override
- public void onWorkflowFinalized(Workflow workflow) {
+ public void onWorkflowFinalized(WorkflowModel workflow) {
LOGGER.info("Publishing callback of workflow {} on finalization", workflow.getWorkflowId());
queueDAO.push(finalizeStatusQueue, Collections.singletonList(workflowToMessage(workflow)));
}
- private Message workflowToMessage(Workflow workflow) {
+ private Message workflowToMessage(WorkflowModel workflow) {
String jsonWfSummary;
- WorkflowSummary summary = new WorkflowSummary(workflow);
+ WorkflowSummary summary = new WorkflowSummary(modelMapper.getWorkflow(workflow));
try {
jsonWfSummary = objectMapper.writeValueAsString(summary);
} catch (JsonProcessingException e) {
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherConfiguration.java
index 13d8d0a934..12dda6c444 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherConfiguration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -17,6 +17,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import com.netflix.conductor.core.dal.ModelMapper;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.dao.QueueDAO;
@@ -32,8 +33,9 @@ public class ConductorQueueStatusPublisherConfiguration {
@Bean
public WorkflowStatusListener getWorkflowStatusListener(
QueueDAO queueDAO,
+ ModelMapper modelMapper,
ConductorQueueStatusPublisherProperties properties,
ObjectMapper objectMapper) {
- return new ConductorQueueStatusPublisher(queueDAO, objectMapper, properties);
+ return new ConductorQueueStatusPublisher(queueDAO, modelMapper, objectMapper, properties);
}
}
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherProperties.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherProperties.java
index e04d2fc2f0..ea9a53f743 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherProperties.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/conductorqueue/ConductorQueueStatusPublisherProperties.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java
index 8ff19a0617..e4603791de 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java
index 6de5573159..412bbb8cb9 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/config/SQSEventQueueConfiguration.java
@@ -32,7 +32,6 @@
import com.amazonaws.services.sqs.AmazonSQSClient;
import rx.Scheduler;
-@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Configuration
@EnableConfigurationProperties(SQSEventQueueProperties.class)
@ConditionalOnProperty(name = "conductor.event-queues.sqs.enabled", havingValue = "true")
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/storage/S3PayloadStorage.java b/contribs/src/main/java/com/netflix/conductor/contribs/storage/S3PayloadStorage.java
index eb9d7dd97d..e79bb6ebca 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/storage/S3PayloadStorage.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/storage/S3PayloadStorage.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -38,7 +38,9 @@
/**
* An implementation of {@link ExternalPayloadStorage} using AWS S3 for storing large JSON payload
- * data. The S3 client assumes that access to S3 is configured on the instance.
+ * data.
+ *
+ *
NOTE: The S3 client assumes that access to S3 is configured on the instance.
*
* @see DefaultAWSCredentialsProviderChain
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/storage/config/S3Configuration.java b/contribs/src/main/java/com/netflix/conductor/contribs/storage/config/S3Configuration.java
index bb9ca31132..a52ec750cb 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/storage/config/S3Configuration.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/storage/config/S3Configuration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProvider.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProvider.java
index 35a9a15e39..670f260638 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProvider.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProvider.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java
index 2fce5ac87f..f0f42eaf4d 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/HttpTask.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -21,22 +21,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
+import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
-import com.netflix.conductor.common.metadata.tasks.Task;
-import com.netflix.conductor.common.metadata.tasks.Task.Status;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.Utils;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
@@ -79,12 +74,12 @@ public HttpTask(
}
@Override
- public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
+ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
Object request = task.getInputData().get(requestParameter);
task.setWorkerId(Utils.getServerId());
if (request == null) {
task.setReasonForIncompletion(MISSING_REQUEST);
- task.setStatus(Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
return;
}
@@ -93,14 +88,14 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
String reason =
"Missing HTTP URI. See documentation for HttpTask for required input parameters";
task.setReasonForIncompletion(reason);
- task.setStatus(Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
return;
}
if (input.getMethod() == null) {
String reason = "No HTTP method specified";
task.setReasonForIncompletion(reason);
- task.setStatus(Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
return;
}
@@ -113,9 +108,9 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
task.getTaskId());
if (response.statusCode > 199 && response.statusCode < 300) {
if (isAsyncComplete(task)) {
- task.setStatus(Status.IN_PROGRESS);
+ task.setStatus(TaskModel.Status.IN_PROGRESS);
} else {
- task.setStatus(Status.COMPLETED);
+ task.setStatus(TaskModel.Status.COMPLETED);
}
} else {
if (response.body != null) {
@@ -123,7 +118,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
} else {
task.setReasonForIncompletion("No response from the remote service");
}
- task.setStatus(Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
}
//noinspection ConstantConditions
if (response != null) {
@@ -139,7 +134,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
input.getVipAddress(),
task.getWorkflowInstanceId(),
e);
- task.setStatus(Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
task.setReasonForIncompletion(
"Failed to invoke " + getTaskType() + " task due to: " + e);
task.getOutputData().put("response", e.toString());
@@ -206,13 +201,13 @@ private Object extractBody(String responseBody) {
}
@Override
- public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor) {
+ public boolean execute(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
return false;
}
@Override
- public void cancel(Workflow workflow, Task task, WorkflowExecutor executor) {
- task.setStatus(Status.CANCELED);
+ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
+ task.setStatus(TaskModel.Status.CANCELED);
}
@Override
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/RestTemplateProvider.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/RestTemplateProvider.java
index 665be8caab..95c648b079 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/RestTemplateProvider.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/http/RestTemplateProvider.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/json/JsonJqTransform.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/json/JsonJqTransform.java
index 9ec9f2f43d..26520edaf4 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/json/JsonJqTransform.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/json/JsonJqTransform.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -22,10 +22,10 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.netflix.conductor.common.metadata.tasks.Task;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -58,7 +58,7 @@ public JsonJqTransform(ObjectMapper objectMapper) {
}
@Override
- public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
+ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
final Map taskInput = task.getInputData();
final Map taskOutput = task.getOutputData();
@@ -67,7 +67,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
if (queryExpression == null) {
task.setReasonForIncompletion(
"Missing '" + QUERY_EXPRESSION_PARAMETER + "' in input parameters");
- task.setStatus(Task.Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
return;
}
@@ -79,7 +79,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
final List result = query.apply(childScope, input);
- task.setStatus(Task.Status.COMPLETED);
+ task.setStatus(TaskModel.Status.COMPLETED);
if (result == null) {
taskOutput.put(OUTPUT_RESULT, null);
taskOutput.put(OUTPUT_RESULT_LIST, null);
@@ -96,7 +96,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
task.getTaskId(),
workflow.getWorkflowId(),
e);
- task.setStatus(Task.Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
final String message = extractFirstValidMessage(e);
task.setReasonForIncompletion(message);
taskOutput.put(OUTPUT_ERROR, message);
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaProducerManager.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaProducerManager.java
index db442a8452..4095af478e 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaProducerManager.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaProducerManager.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaPublishTask.java b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaPublishTask.java
index 21f779ca6a..8ec91a0396 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaPublishTask.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/tasks/kafka/KafkaPublishTask.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -34,11 +34,11 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.netflix.conductor.common.metadata.tasks.Task;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.Utils;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
@@ -76,7 +76,7 @@ public KafkaPublishTask(KafkaProducerManager clientManager, ObjectMapper objectM
}
@Override
- public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
+ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
long taskStartMillis = Instant.now().toEpochMilli();
task.setWorkerId(Utils.getServerId());
@@ -109,9 +109,9 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
try {
recordMetaDataFuture.get();
if (isAsyncComplete(task)) {
- task.setStatus(Task.Status.IN_PROGRESS);
+ task.setStatus(TaskModel.Status.IN_PROGRESS);
} else {
- task.setStatus(Task.Status.COMPLETED);
+ task.setStatus(TaskModel.Status.COMPLETED);
}
long timeTakenToCompleteTask = Instant.now().toEpochMilli() - taskStartMillis;
LOGGER.debug("Published message {}, Time taken {}", input, timeTakenToCompleteTask);
@@ -133,9 +133,9 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
}
}
- private void markTaskAsFailed(Task task, String reasonForIncompletion) {
+ private void markTaskAsFailed(TaskModel task, String reasonForIncompletion) {
task.setReasonForIncompletion(reasonForIncompletion);
- task.setStatus(Task.Status.FAILED);
+ task.setStatus(TaskModel.Status.FAILED);
}
/**
@@ -195,13 +195,13 @@ Object getKey(Input input) {
}
@Override
- public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor) {
+ public boolean execute(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
return false;
}
@Override
- public void cancel(Workflow workflow, Task task, WorkflowExecutor executor) {
- task.setStatus(Task.Status.CANCELED);
+ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
+ task.setStatus(TaskModel.Status.CANCELED);
}
@Override
diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListenerTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListenerTest.java
index 3d58db5be5..9a89e86deb 100644
--- a/contribs/src/test/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListenerTest.java
+++ b/contribs/src/test/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListenerTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -19,24 +19,22 @@
import org.mockito.Mockito;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
-import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.contribs.listener.archive.ArchivingWorkflowStatusListener;
-import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
+import com.netflix.conductor.core.dal.ExecutionDAOFacade;
+import com.netflix.conductor.model.WorkflowModel;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.*;
/** @author pavel.halabala */
public class ArchivingWorkflowStatusListenerTest {
- Workflow workflow;
+ WorkflowModel workflow;
ExecutionDAOFacade executionDAOFacade;
ArchivingWorkflowStatusListener listener;
@Before
public void before() {
- workflow = new Workflow();
+ workflow = new WorkflowModel();
WorkflowDef def = new WorkflowDef();
def.setName("name1");
def.setVersion(1);
diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java
index 1b6c619db7..f661e62b33 100644
--- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java
+++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/DefaultEventQueueProcessorTest.java
@@ -30,6 +30,7 @@
import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
+import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor;
import com.netflix.conductor.core.events.queue.Message;
@@ -63,7 +64,7 @@ public class DefaultEventQueueProcessorTest {
@Autowired private ObjectMapper objectMapper;
private static final List messages = new LinkedList<>();
- private static final List updatedTasks = new LinkedList<>();
+ private static final List updatedTasks = new LinkedList<>();
@Before
public void init() {
@@ -129,11 +130,11 @@ public static void setup() {
doAnswer(
(Answer)
invocation -> {
- updatedTasks.add(invocation.getArgument(0, Task.class));
+ updatedTasks.add(invocation.getArgument(0, TaskResult.class));
return null;
})
.when(executionService)
- .updateTask(any(Task.class));
+ .updateTask(any(TaskResult.class));
}
@Test
diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProviderTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProviderTest.java
index 480e97588b..31e5ef2d45 100644
--- a/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProviderTest.java
+++ b/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/DefaultRestTemplateProviderTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/HttpTaskTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/HttpTaskTest.java
index f87eb6022b..065b492353 100644
--- a/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/HttpTaskTest.java
+++ b/contribs/src/test/java/com/netflix/conductor/contribs/tasks/http/HttpTaskTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Netflix, Inc.
+ * Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -12,343 +12,356 @@
*/
package com.netflix.conductor.contribs.tasks.http;
-import org.junit.Ignore;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.*;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.model.MediaType;
+import org.testcontainers.containers.MockServerContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import com.netflix.conductor.common.metadata.tasks.TaskType;
+import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
+import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
+import com.netflix.conductor.core.execution.DeciderService;
+import com.netflix.conductor.core.execution.WorkflowExecutor;
+import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
+import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
+import com.netflix.conductor.core.utils.ParametersUtils;
+import com.netflix.conductor.dao.MetadataDAO;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
@SuppressWarnings("unchecked")
-@Ignore // Test causes "OutOfMemoryError" error during build
+// @Ignore // Test causes "OutOfMemoryError" error during build
public class HttpTaskTest {
- // private static final String ERROR_RESPONSE = "Something went wrong!";
- // private static final String TEXT_RESPONSE = "Text Response";
- // private static final double NUM_RESPONSE = 42.42d;
- //
- // private HttpTask httpTask;
- // private WorkflowExecutor workflowExecutor;
- // private final Workflow workflow = new Workflow();
- //
- // private static final ObjectMapper objectMapper = new ObjectMapper();
- // private static String JSON_RESPONSE;
- //
- // @ClassRule
- // public static MockServerContainer mockServer = new MockServerContainer(
- // DockerImageName.parse("mockserver/mockserver"));
- //
- // @BeforeClass
- // public static void init() throws Exception {
- // Map map = new HashMap<>();
- // map.put("key", "value1");
- // map.put("num", 42);
- // map.put("SomeKey", null);
- // JSON_RESPONSE = objectMapper.writeValueAsString(map);
- //
- // final TypeReference