Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2233 from Netflix/task_limit
Browse files Browse the repository at this point in the history
Added check for max tasks limit
  • Loading branch information
aravindanr authored May 7, 2021
2 parents 1061bed + 34cdf83 commit 6c8c7b1
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 170 deletions.
3 changes: 2 additions & 1 deletion .github/release-drafter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ version-resolver:
minor:
labels:
- 'type: important'
- 'type: feature'

patch:
labels:
- 'type: bug'
- 'type: maintenance'
- 'type: docs'
- 'type: dependencies'
- 'type: feature'

exclude-labels:
- 'skip-changelog'
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.Test;
import org.mockserver.client.MockServerClient;
import org.mockserver.model.MediaType;
import org.springframework.core.env.Environment;
import org.testcontainers.containers.MockServerContainer;
import org.testcontainers.utility.DockerImageName;

Expand Down Expand Up @@ -70,7 +71,7 @@ public class HttpTaskTest {

@ClassRule
public static MockServerContainer mockServer = new MockServerContainer(
DockerImageName.parse("mockserver/mockserver"));
DockerImageName.parse("mockserver/mockserver"));

@BeforeClass
public static void init() throws Exception {
Expand All @@ -84,58 +85,58 @@ public static void init() throws Exception {
};
MockServerClient client = new MockServerClient(mockServer.getHost(), mockServer.getServerPort());
client.when(
request()
.withPath("/post")
.withMethod("POST"))
.respond(request -> {
Map<String, Object> reqBody = objectMapper.readValue(request.getBody().toString(), mapOfObj);
Set<String> keys = reqBody.keySet();
Map<String, Object> respBody = new HashMap<>();
keys.forEach(k -> respBody.put(k, k));
return response()
.withContentType(MediaType.APPLICATION_JSON)
.withBody(objectMapper.writeValueAsString(respBody));
});
request()
.withPath("/post")
.withMethod("POST"))
.respond(request -> {
Map<String, Object> reqBody = objectMapper.readValue(request.getBody().toString(), mapOfObj);
Set<String> keys = reqBody.keySet();
Map<String, Object> respBody = new HashMap<>();
keys.forEach(k -> respBody.put(k, k));
return response()
.withContentType(MediaType.APPLICATION_JSON)
.withBody(objectMapper.writeValueAsString(respBody));
});
client.when(
request()
.withPath("/post2")
.withMethod("POST"))
.respond(response()
.withStatusCode(204));
request()
.withPath("/post2")
.withMethod("POST"))
.respond(response()
.withStatusCode(204));
client.when(
request()
.withPath("/failure")
.withMethod("GET"))
.respond(response()
.withStatusCode(500)
.withContentType(MediaType.TEXT_PLAIN)
.withBody(ERROR_RESPONSE));
request()
.withPath("/failure")
.withMethod("GET"))
.respond(response()
.withStatusCode(500)
.withContentType(MediaType.TEXT_PLAIN)
.withBody(ERROR_RESPONSE));
client.when(
request()
.withPath("/text")
.withMethod("GET"))
.respond(response()
.withBody(TEXT_RESPONSE));
request()
.withPath("/text")
.withMethod("GET"))
.respond(response()
.withBody(TEXT_RESPONSE));
client.when(
request()
.withPath("/numeric")
.withMethod("GET"))
.respond(response()
.withBody(String.valueOf(NUM_RESPONSE)));
request()
.withPath("/numeric")
.withMethod("GET"))
.respond(response()
.withBody(String.valueOf(NUM_RESPONSE)));
client.when(
request()
.withPath("/json")
.withMethod("GET"))
.respond(response()
.withContentType(MediaType.APPLICATION_JSON)
.withBody(JSON_RESPONSE));
request()
.withPath("/json")
.withMethod("GET"))
.respond(response()
.withContentType(MediaType.APPLICATION_JSON)
.withBody(JSON_RESPONSE));
}

@Before
public void setup() {
workflowExecutor = mock(WorkflowExecutor.class);
DefaultRestTemplateProvider defaultRestTemplateProvider =
new DefaultRestTemplateProvider(Duration.ofMillis(150), Duration.ofMillis(100));
new DefaultRestTemplateProvider(Duration.ofMillis(150), Duration.ofMillis(100));
httpTask = new HttpTask(defaultRestTemplateProvider, objectMapper);
}

Expand Down Expand Up @@ -370,10 +371,11 @@ public void testOptional() {
ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
ParametersUtils parametersUtils = mock(ParametersUtils.class);
SystemTaskRegistry systemTaskRegistry = mock(SystemTaskRegistry.class);
Environment environment = mock(Environment.class);

new DeciderService(parametersUtils, metadataDAO, externalPayloadStorageUtils, systemTaskRegistry,
Collections.emptyMap(),
Duration.ofMinutes(60))
.decide(workflow);
Collections.emptyMap(),
Duration.ofMinutes(60),
environment).decide(workflow);
}
}
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
/*
*
* * Copyright 2021 Netflix, Inc.
* * <p>
* * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* * <p>
* * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations under the License.
*
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.config;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
Expand All @@ -31,12 +23,15 @@
import rx.Scheduler;
import rx.schedulers.Schedulers;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

@Configuration(proxyBeanMethods = false)
@EnableScheduling
@EnableAsync
public class SchedulerConfiguration implements SchedulingConfigurer {

private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerConfiguration.class);
public static final String SWEEPER_EXECUTOR_NAME = "WorkflowSweeperExecutor";

/**
Expand All @@ -58,8 +53,7 @@ public Scheduler scheduler(ConductorProperties properties) {
@Bean(SWEEPER_EXECUTOR_NAME)
public Executor sweeperExecutor(ConductorProperties properties) {
if (properties.getSweeperThreadCount() <= 0) {
throw new IllegalStateException("Cannot set workflow sweeper thread count to <=0. To disable workflow "
+ "sweeper, set conductor.workflow-reconciler.enabled=false.");
throw new IllegalStateException("conductor.app.sweeper-thread-count must be greater than 0.");
}
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("sweeper-thread-%d")
Expand All @@ -70,7 +64,7 @@ public Executor sweeperExecutor(ConductorProperties properties) {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(2); // equal to the number of scheduled jobs
threadPoolTaskScheduler.setPoolSize(3); // equal to the number of scheduled jobs
threadPoolTaskScheduler.setThreadNamePrefix("scheduled-task-pool-");
threadPoolTaskScheduler.initialize();
taskRegistrar.setTaskScheduler(threadPoolTaskScheduler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;

import java.time.Duration;
Expand Down Expand Up @@ -69,11 +70,15 @@ public class DeciderService {

private static final Logger LOGGER = LoggerFactory.getLogger(DeciderService.class);

@VisibleForTesting
static final String MAX_TASK_LIMIT = "conductor.app.max-task-limit";

private final ParametersUtils parametersUtils;
private final ExternalPayloadStorageUtils externalPayloadStorageUtils;
private final MetadataDAO metadataDAO;
private final SystemTaskRegistry systemTaskRegistry;
private final long taskPendingTimeThresholdMins;
private final Environment environment;

private final Map<TaskType, TaskMapper> taskMappers;

Expand All @@ -88,13 +93,15 @@ public DeciderService(ParametersUtils parametersUtils, MetadataDAO metadataDAO,
ExternalPayloadStorageUtils externalPayloadStorageUtils,
SystemTaskRegistry systemTaskRegistry,
@Qualifier("taskProcessorsMap") Map<TaskType, TaskMapper> taskMappers,
@Value("${conductor.app.taskPendingTimeThreshold:60m}") Duration taskPendingTimeThreshold) {
@Value("${conductor.app.taskPendingTimeThreshold:60m}") Duration taskPendingTimeThreshold,
Environment environment) {
this.metadataDAO = metadataDAO;
this.parametersUtils = parametersUtils;
this.taskMappers = taskMappers;
this.externalPayloadStorageUtils = externalPayloadStorageUtils;
this.taskPendingTimeThresholdMins = taskPendingTimeThreshold.toMinutes();
this.systemTaskRegistry = systemTaskRegistry;
this.environment = environment;
}

public DeciderOutcome decide(Workflow workflow) throws TerminateWorkflowException {
Expand Down Expand Up @@ -235,6 +242,20 @@ && checkForWorkflowCompletion(workflow))) {
outcome.isComplete = true;
}

// terminate workflows that exceed the threshold
if (environment.containsProperty(MAX_TASK_LIMIT)) {
//noinspection ConstantConditions
int maxTasksThreshold = environment.getProperty(MAX_TASK_LIMIT, int.class);
int currentTasks = workflow.getTasks().size();
if (currentTasks + tasksToBeScheduled.size() > maxTasksThreshold) {
String terminationReason = String.format("Sum of the tasks in the workflow %s " +
"and tasks to be scheduled %s exceed threshold %s", currentTasks,
tasksToBeScheduled.size(), maxTasksThreshold);
LOGGER.warn("{}, terminating {}", terminationReason, workflow.toShortString());
throw new TerminateWorkflowException(terminationReason, WorkflowStatus.TERMINATED);
}
}

return outcome;
}

Expand Down
Loading

0 comments on commit 6c8c7b1

Please sign in to comment.