diff --git a/server/build.gradle b/server/build.gradle index b69127d..2c9caa8 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -26,6 +26,8 @@ dependencies { implementation project(":orkes-conductor-archive") implementation project(":orkes-conductor-persistence") + implementation 'io.orkes.conductor:orkes-conductor-common-protos:0.9.2' + //aws implementation "com.amazonaws:aws-java-sdk-core:${versions.revAwsSdk}" diff --git a/server/src/main/java/com/netflix/conductor/core/execution/OrkesWorkflowExecutor.java b/server/src/main/java/com/netflix/conductor/core/execution/OrkesWorkflowExecutor.java index 8606da1..1daf196 100644 --- a/server/src/main/java/com/netflix/conductor/core/execution/OrkesWorkflowExecutor.java +++ b/server/src/main/java/com/netflix/conductor/core/execution/OrkesWorkflowExecutor.java @@ -66,11 +66,8 @@ public class OrkesWorkflowExecutor extends WorkflowExecutor { private final ExecutionDAOFacade orkesExecutionDAOFacade; private final SystemTaskRegistry systemTaskRegistry; private final ExecutorService taskUpdateExecutor; - private final RedisExecutionDAO executionDAO; - private final MetricsCollector metricsCollector; - public OrkesWorkflowExecutor( DeciderService deciderService, MetadataDAO metadataDAO, @@ -83,8 +80,7 @@ public OrkesWorkflowExecutor( @Lazy SystemTaskRegistry systemTaskRegistry, ParametersUtils parametersUtils, IDGenerator idGenerator, - RedisExecutionDAO executionDAO, - MetricsCollector metricsCollector) { + RedisExecutionDAO executionDAO) { super( deciderService, metadataDAO, @@ -102,7 +98,6 @@ public OrkesWorkflowExecutor( this.orkesExecutionDAOFacade = executionDAOFacade; this.systemTaskRegistry = systemTaskRegistry; this.executionDAO = executionDAO; - this.metricsCollector = metricsCollector; int threadPoolSize = Runtime.getRuntime().availableProcessors() * 10; this.taskUpdateExecutor = @@ -126,6 +121,8 @@ public boolean offer(Runnable runnable) { log.info("OrkesWorkflowExecutor initialized"); } + + @Override public void retry(String workflowId, boolean resumeSubworkflowTasks) { WorkflowModel workflowModel = orkesExecutionDAOFacade.getWorkflowModel(workflowId, true); diff --git a/server/src/main/java/io/orkes/conductor/execution/tasks/HttpSync.java b/server/src/main/java/io/orkes/conductor/execution/tasks/HttpSync.java index 5455432..8d037bb 100644 --- a/server/src/main/java/io/orkes/conductor/execution/tasks/HttpSync.java +++ b/server/src/main/java/io/orkes/conductor/execution/tasks/HttpSync.java @@ -35,6 +35,7 @@ public class HttpSync extends WorkflowSystemTask { public HttpSync(RestTemplateProvider restTemplateProvider, ObjectMapper objectMapper) { super(TASK_TYPE_HTTP); httpTask = new HttpTask(restTemplateProvider, objectMapper); + log.info("Using {}", restTemplateProvider); } @SuppressWarnings("unchecked") diff --git a/server/src/main/java/io/orkes/conductor/execution/tasks/OrkesRestTemplateProvider.java b/server/src/main/java/io/orkes/conductor/execution/tasks/OrkesRestTemplateProvider.java new file mode 100644 index 0000000..a22e701 --- /dev/null +++ b/server/src/main/java/io/orkes/conductor/execution/tasks/OrkesRestTemplateProvider.java @@ -0,0 +1,94 @@ +/* + * Copyright 2020 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.execution.tasks; + +import com.netflix.conductor.tasks.http.HttpTask; +import com.netflix.conductor.tasks.http.providers.RestTemplateProvider; +import lombok.AllArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.context.annotation.Primary; +import org.springframework.http.client.ClientHttpRequestInterceptor; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; + +/** + * Provider for a customized RestTemplateBuilder. This class provides a default {@link RestTemplateBuilder} which can be + * configured or extended as needed. + */ +@Component +@Primary +public class OrkesRestTemplateProvider implements RestTemplateProvider { + + @AllArgsConstructor + private static class RestTemplateHolder { + RestTemplate restTemplate; + HttpComponentsClientHttpRequestFactory requestFactory; + int readTimeout; + int connectTimeout; + } + + private final ThreadLocal threadLocalRestTemplate; + private final int defaultReadTimeout; + private final int defaultConnectTimeout; + + @Autowired + public OrkesRestTemplateProvider(@Value("${conductor.tasks.http.readTimeout:250ms}") Duration readTimeout, + @Value("${conductor.tasks.http.connectTimeout:250ms}") Duration connectTimeout, + Optional interceptor) { + this.defaultReadTimeout = (int) readTimeout.toMillis(); + this.defaultConnectTimeout = (int) connectTimeout.toMillis(); + this.threadLocalRestTemplate = ThreadLocal.withInitial(() -> + { + RestTemplate restTemplate = new RestTemplate(); + HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(); + requestFactory.setReadTimeout(defaultReadTimeout); + requestFactory.setConnectTimeout(defaultConnectTimeout); + restTemplate.setRequestFactory(requestFactory); + interceptor.ifPresent(it -> addInterceptor(restTemplate, it)); + return new RestTemplateHolder(restTemplate, requestFactory, defaultReadTimeout, defaultConnectTimeout); + }); + + } + + @Override + public RestTemplate getRestTemplate(HttpTask.Input input) { + RestTemplateHolder holder = threadLocalRestTemplate.get(); + RestTemplate restTemplate = holder.restTemplate; + HttpComponentsClientHttpRequestFactory requestFactory = holder.requestFactory; + + int newReadTimeout = Optional.ofNullable(input.getReadTimeOut()).orElse(defaultReadTimeout); + int newConnectTimeout = Optional.ofNullable(input.getConnectionTimeOut()).orElse(defaultConnectTimeout); + if (newReadTimeout != holder.readTimeout || newConnectTimeout != holder.connectTimeout) { + holder.readTimeout = newReadTimeout; + holder.connectTimeout = newConnectTimeout; + requestFactory.setReadTimeout(newReadTimeout); + requestFactory.setConnectTimeout(newConnectTimeout); + } + + return restTemplate; + } + + private void addInterceptor(RestTemplate restTemplate, ClientHttpRequestInterceptor interceptor) { + List interceptors = restTemplate.getInterceptors(); + if (!interceptors.contains(interceptor)) { + interceptors.add(interceptor); + } + } +} diff --git a/server/src/main/java/io/orkes/conductor/execution/tasks/SubWorkflowSync.java b/server/src/main/java/io/orkes/conductor/execution/tasks/SubWorkflowSync.java index f9406ea..5efadd1 100644 --- a/server/src/main/java/io/orkes/conductor/execution/tasks/SubWorkflowSync.java +++ b/server/src/main/java/io/orkes/conductor/execution/tasks/SubWorkflowSync.java @@ -29,8 +29,6 @@ @Slf4j public class SubWorkflowSync extends WorkflowSystemTask { - private static final String SUB_WORKFLOW_ID = "subWorkflowId"; - private final SubWorkflow subWorkflow; private final ObjectMapper objectMapper; diff --git a/server/src/main/java/io/orkes/conductor/rest/WorkflowResourceSync.java b/server/src/main/java/io/orkes/conductor/rest/WorkflowResourceSync.java new file mode 100644 index 0000000..f9e39ad --- /dev/null +++ b/server/src/main/java/io/orkes/conductor/rest/WorkflowResourceSync.java @@ -0,0 +1,91 @@ +package io.orkes.conductor.rest; + +import com.google.common.util.concurrent.Uninterruptibles; +import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.service.WorkflowService; +import io.orkes.conductor.common.model.WorkflowRun; +import io.swagger.v3.oas.annotations.Operation; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.concurrent.*; + +import static com.netflix.conductor.rest.config.RequestMappingConstants.WORKFLOW; +import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; + +@RestController +@RequestMapping(WORKFLOW) +@Slf4j +@RequiredArgsConstructor +public class WorkflowResourceSync { + + public static final String REQUEST_ID_KEY = "_X-Request-Id"; + + private final WorkflowService workflowService; + + private final ScheduledExecutorService executionMonitor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2); + + @PostConstruct + public void startMonitor() { + log.info("Starting execution monitors"); + } + + @PostMapping(value = "execute/{name}/{version}", produces = APPLICATION_JSON_VALUE) + @Operation(summary = "Execute a workflow synchronously", tags = "workflow-resource") + @SneakyThrows + public WorkflowRun executeWorkflow( + @PathVariable("name") String name, + @PathVariable(value = "version", required = false) Integer version, + @RequestParam(value = "requestId", required = true) String requestId, + @RequestParam(value = "waitUntilTaskRef", required = false) String waitUntilTaskRef, + @RequestBody StartWorkflowRequest request) { + + request.setName(name); + request.setVersion(version); + String workflowId = workflowService.startWorkflow(request); + request.getInput().put(REQUEST_ID_KEY, requestId); + Workflow workflow = workflowService.getExecutionStatus(workflowId, true); + if(workflow.getStatus().isTerminal() || workflow.getTasks().stream().anyMatch(t -> t.getReferenceTaskName().equalsIgnoreCase(waitUntilTaskRef))) { + return toWorkflowRun(workflow); + } + int maxTimeInMilis = 5_000; //5 sec + int sleepTime = 100; //millis + int loopCount = maxTimeInMilis / sleepTime; + for (int i = 0; i < loopCount; i++) { + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + workflow = workflowService.getExecutionStatus(workflowId, true); + if(workflow.getStatus().isTerminal() || workflow.getTasks().stream().anyMatch(t -> t.getReferenceTaskName().equalsIgnoreCase(waitUntilTaskRef))) { + return toWorkflowRun(workflow); + } + } + workflow = workflowService.getExecutionStatus(workflowId, true); + return toWorkflowRun(workflow); + } + + public static WorkflowRun toWorkflowRun(Workflow workflow) { + WorkflowRun run = new WorkflowRun(); + + run.setWorkflowId(workflow.getWorkflowId()); + run.setRequestId((String) workflow.getInput().get(REQUEST_ID_KEY)); + run.setCorrelationId(workflow.getCorrelationId()); + run.setInput(workflow.getInput()); + run.setCreatedBy(workflow.getCreatedBy()); + run.setCreateTime(workflow.getCreateTime()); + run.setOutput(workflow.getOutput()); + run.setTasks(new ArrayList<>()); + workflow.getTasks().forEach(task -> run.getTasks().add(task)); + run.setPriority(workflow.getPriority()); + if(workflow.getUpdateTime() != null) { + run.setUpdateTime(workflow.getUpdateTime()); + } + run.setStatus(Workflow.WorkflowStatus.valueOf(workflow.getStatus().name())); + run.setVariables(workflow.getVariables()); + + return run; + } +}