Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
00b8dc6
Add dispatch timeout checking logic
Dec 15, 2025
bb089f3
add dispatch-timeout-checker switch
Dec 17, 2025
fbd7edf
add dispatch-timeout-checker switch
Dec 17, 2025
ab84035
update ExceptionUtils
Dec 17, 2025
f193af6
add test for dispatch timeout checker
Dec 17, 2025
ab750bb
Merge branch 'apache:dev' into Improvement-17795
njnu-seafish Dec 19, 2025
584ad2d
add TaskFatalLifecycleEvent and handler
Dec 19, 2025
877db06
add TaskFatalLifecycleEvent and handler
Dec 19, 2025
cb44db7
Add dispatch timeout checking logic
Dec 15, 2025
1aa12df
add dispatch-timeout-checker switch
Dec 17, 2025
0c29a82
add dispatch-timeout-checker switch
Dec 17, 2025
6ad77d1
update ExceptionUtils
Dec 17, 2025
b760003
add test for dispatch timeout checker
Dec 17, 2025
39b40c1
add TaskFatalLifecycleEvent and handler
Dec 19, 2025
d7bf4fa
add TaskFatalLifecycleEvent and handler
Dec 19, 2025
0c8c93e
Merge branch 'apache:dev' into Improvement-17795
njnu-seafish Dec 22, 2025
5e1dd52
Merge branch 'Improvement-17795' of github.com:njnu-seafish/dolphinsc…
Dec 22, 2025
5b2722d
update TaskDispatchPolicy
Dec 22, 2025
33c16e8
add it test case
Dec 23, 2025
3c25528
Merge branch 'dev' into Improvement-17795
njnu-seafish Dec 23, 2025
99a5d7c
update test
Dec 23, 2025
9966047
Merge branch 'Improvement-17795' of github.com:njnu-seafish/dolphinsc…
Dec 23, 2025
42a5aa2
Merge branch 'apache:dev' into Improvement-17795
njnu-seafish Jan 8, 2026
879ba5e
update test
Jan 8, 2026
77a5dfa
Merge branch 'dev' into Improvement-17795
njnu-seafish Jan 8, 2026
9a6d664
Merge branch 'dev' into Improvement-17795
ruanwenjun Jan 12, 2026
cca935f
Merge branch 'apache:dev' into Improvement-17795
njnu-seafish Jan 12, 2026
e49549e
update test
Jan 12, 2026
b5a703b
update test
Jan 12, 2026
9f97478
Remove this test case and track the fix in a separate issue
Jan 13, 2026
ddb7eb7
add it test case by using SHELL task
Jan 13, 2026
93e398c
update test comment
Jan 13, 2026
6864a2e
Merge branch 'dev' into Improvement-17795
njnu-seafish Jan 13, 2026
f7c7fe1
update task instance state
Jan 13, 2026
7437249
remove masterContainer.assertAllResourceReleased if workflow is running
Jan 14, 2026
99d2640
update test
Jan 14, 2026
a6302d3
Merge branch 'dev' into Improvement-17795
njnu-seafish Jan 14, 2026
7a01fab
Resolve merge conflicts from dev branch
Jan 14, 2026
8c4700a
update test
Jan 15, 2026
d6f8ccd
update test
Jan 15, 2026
5a47e02
Merge branch 'apache:dev' into Improvement-17795
njnu-seafish Jan 22, 2026
743dcbc
remove redundant tests and comments
Jan 22, 2026
62cfb30
remove redundant comments
Jan 22, 2026
edf818c
Merge branch 'apache:dev' into Improvement-17795
njnu-seafish Jan 23, 2026
f772a55
update comment and test
Jan 23, 2026
5f19c7b
update test
Jan 26, 2026
e12af5d
Merge branch 'dev' into Improvement-17795
njnu-seafish Jan 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class MasterConfig implements Validator {
*/
private String masterRegistryPath;

private TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();

@Override
public boolean supports(Class<?> clazz) {
return MasterConfig.class.isAssignableFrom(clazz);
Expand All @@ -97,6 +99,18 @@ public void validate(Object target, Errors errors) {
if (masterConfig.getWorkerGroupRefreshInterval().getSeconds() < 10) {
errors.rejectValue("worker-group-refresh-interval", null, "should >= 10s");
}

// The dispatch-timeout settings are only validated when the feature is switched on;
// a disabled policy is allowed to leave the duration unset.
TaskDispatchPolicy dispatchPolicy = masterConfig.getTaskDispatchPolicy();
if (dispatchPolicy.isDispatchTimeoutEnabled()) {
    // Name the rejected field after the YAML key it is bound from
    // (task-dispatch-policy.max-task-dispatch-duration), in line with the other checks in this method.
    final String maxDispatchDurationField = "task-dispatch-policy.max-task-dispatch-duration";
    if (dispatchPolicy.getMaxTaskDispatchDuration() == null) {
        errors.rejectValue(maxDispatchDurationField, null,
                "must be specified when dispatch timeout checker is enabled");
    } else if (dispatchPolicy.getMaxTaskDispatchDuration().toMillis() <= 0) {
        // Durations that round down to 0 ms are rejected as well.
        errors.rejectValue(maxDispatchDurationField, null,
                "must be a positive duration (e.g., '10m', '30m', '1h')");
    }
}

if (StringUtils.isEmpty(masterConfig.getMasterAddress())) {
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
Expand All @@ -122,6 +136,7 @@ private void printConfig() {
"\n command-fetch-strategy: " + commandFetchStrategy +
"\n worker-load-balancer-configuration-properties: "
+ workerLoadBalancerConfigurationProperties +
"\n taskDispatchPolicy: " + taskDispatchPolicy +
"\n****************************Master Configuration**************************************";
log.info(config);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.config;

import java.time.Duration;

import lombok.Data;

/**
 * Settings that control how long the master keeps trying to dispatch a task.
 * <p>
 * With {@link #dispatchTimeoutEnabled} switched on, a task that has stayed in the dispatch queue
 * for longer than {@link #maxTaskDispatchDuration} is marked as failed rather than being queued
 * indefinitely.
 */
@Data
public class TaskDispatchPolicy {

    /**
     * Switch for the dispatch timeout check. Off ({@code false}) unless configured otherwise.
     */
    private boolean dispatchTimeoutEnabled;

    /**
     * Upper bound on the time a task may wait in the dispatch queue before it is assigned to a worker;
     * a task that exceeds it is marked as failed.
     * Typical values: {@code "10m"}, {@code "30m"}, {@code "1h"}.
     * {@code MasterConfig} validation requires a non-null, positive value when the check is enabled.
     */
    private Duration maxTaskDispatchDuration;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
import org.apache.dolphinscheduler.server.master.cluster.loadbalancer.IWorkerLoadBalancer;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
import org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorLifecycleEventReporter;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchRequest;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchResponse;
Expand Down Expand Up @@ -55,18 +58,26 @@ public class PhysicalTaskExecutorClientDelegator implements ITaskExecutorClientD
@Autowired
private IWorkerLoadBalancer workerLoadBalancer;

@Autowired
private ClusterManager clusterManager;

@Override
public void dispatch(final ITaskExecutionRunnable taskExecutionRunnable) throws TaskDispatchException {
final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext();
final String taskName = taskExecutionContext.getTaskName();
final String workerGroup = taskExecutionContext.getWorkerGroup();

// Fail fast if the worker group does not exist.
if (!clusterManager.getWorkerClusters().containsWorkerGroup(workerGroup)) {
throw new WorkerGroupNotFoundException(workerGroup);
}

// select an available worker from the worker group; throws NoAvailableWorkerException if none is available.
final String physicalTaskExecutorAddress = workerLoadBalancer
.select(taskExecutionContext.getWorkerGroup())
.select(workerGroup)
.map(Host::of)
.map(Host::getAddress)
.orElseThrow(() -> new TaskDispatchException(
String.format("Cannot find the host to dispatch Task[id=%s, name=%s, workerGroup=%s]",
taskExecutionContext.getTaskInstanceId(), taskName,
taskExecutionContext.getWorkerGroup())));
.orElseThrow(() -> new NoAvailableWorkerException(workerGroup));

taskExecutionContext.setHost(physicalTaskExecutorAddress);
taskExecutionRunnable.getTaskInstance().setHost(physicalTaskExecutorAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;

import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;

import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -48,11 +52,22 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {

private final AtomicBoolean runningFlag = new AtomicBoolean(false);

public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) {
private final TaskDispatchPolicy taskDispatchPolicy;

private final long maxTaskDispatchMillis;

public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient,
TaskDispatchPolicy taskDispatchPolicy) {
super("WorkerGroupTaskDispatcher-" + workerGroupName);
this.taskExecutorClient = taskExecutorClient;
this.workerGroupEventBus = new TaskDispatchableEventBus<>();
this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
this.taskDispatchPolicy = taskDispatchPolicy;
if (taskDispatchPolicy.isDispatchTimeoutEnabled()) {
this.maxTaskDispatchMillis = taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis();
} else {
this.maxTaskDispatchMillis = 0L;
}
log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
}

Expand Down Expand Up @@ -84,26 +99,54 @@ public void run() {
}

private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
final int taskInstanceId = taskExecutionRunnable.getId();
final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext();
try {
if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) {
if (!waitingDispatchTaskIds.remove(taskInstanceId)) {
log.info(
"The task: {} doesn't exist in waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
taskExecutionRunnable.getId());
taskInstanceId);
return;
}
taskExecutorClient.dispatch(taskExecutionRunnable);
} catch (Exception e) {
} catch (Exception ex) {
if (taskDispatchPolicy.isDispatchTimeoutEnabled()) {
// If a dispatch timeout occurs, the task will not be put back into the queue.
long elapsed = System.currentTimeMillis() - taskExecutionContext.getFirstDispatchTime();
if (elapsed > maxTaskDispatchMillis) {
onDispatchTimeout(taskExecutionRunnable, ex, elapsed, maxTaskDispatchMillis);
return;
}
}

// If the dispatch failed, put the task back into the queue.
// The task will be dispatched again after a waiting time.
// The waiting time grows with the number of failed dispatches, but never exceeds 60 seconds.
long waitingTimeMills = Math.min(
long waitingTimeMillis = Math.min(
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
dispatchTask(taskExecutionRunnable, waitingTimeMills);
log.error("Dispatch Task: {} failed will retry after: {}/ms", taskExecutionRunnable.getId(),
waitingTimeMills, e);
dispatchTask(taskExecutionRunnable, waitingTimeMillis);
log.warn("Dispatch Task: {} failed will retry after: {}/ms", taskInstanceId,
waitingTimeMillis, ex);
}
}

/**
 * Handles a task whose dispatching has exceeded the allowed time.
 * Logs the timeout together with the last dispatch error, then publishes a
 * {@link TaskFailedLifecycleEvent} on the task's workflow event bus. The task is not re-queued here.
 *
 * @param taskExecutionRunnable the task that could not be dispatched in time
 * @param ex                    the exception raised by the most recent dispatch attempt
 * @param elapsed               milliseconds elapsed since the task's first dispatch time
 * @param timeout               the dispatch time limit, in milliseconds
 */
private void onDispatchTimeout(ITaskExecutionRunnable taskExecutionRunnable, Exception ex,
                               long elapsed, long timeout) {
    log.error("Task: {} dispatch timeout after {}ms (limit: {}ms)",
            taskExecutionRunnable.getName(), elapsed, timeout, ex);

    // Build the failure event first, stamped with the current time, then hand it to the workflow.
    final TaskFailedLifecycleEvent dispatchTimeoutFailure = TaskFailedLifecycleEvent.builder()
            .taskExecutionRunnable(taskExecutionRunnable)
            .endTime(new Date())
            .build();
    taskExecutionRunnable.getWorkflowEventBus().publish(dispatchTimeoutFailure);
}

/**
* Adds a task to the worker group queue.
* This method wraps the given task execution object into a priority and delay-based task entry and adds it to the worker group queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;

import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;

Expand All @@ -41,8 +42,11 @@ public class WorkerGroupDispatcherCoordinator implements AutoCloseable {

private final ConcurrentHashMap<String, WorkerGroupDispatcher> workerGroupDispatcherMap;

public WorkerGroupDispatcherCoordinator() {
private final MasterConfig masterConfig;

public WorkerGroupDispatcherCoordinator(final MasterConfig masterConfig) {
workerGroupDispatcherMap = new ConcurrentHashMap<>();
this.masterConfig = masterConfig;
}

public void start() {
Expand Down Expand Up @@ -99,7 +103,8 @@ public void close() throws Exception {

private WorkerGroupDispatcher getOrCreateWorkerGroupDispatcher(String workerGroup) {
return workerGroupDispatcherMap.computeIfAbsent(workerGroup, wg -> {
WorkerGroupDispatcher workerGroupDispatcher = new WorkerGroupDispatcher(wg, taskExecutorClient);
WorkerGroupDispatcher workerGroupDispatcher =
new WorkerGroupDispatcher(wg, taskExecutorClient, masterConfig.getTaskDispatchPolicy());
workerGroupDispatcher.start();
return workerGroupDispatcher;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.exception.dispatch;

/**
 * A {@link TaskDispatchException} signalling that no available worker could be found
 * in the requested worker group.
 */
public class NoAvailableWorkerException extends TaskDispatchException {

    /**
     * @param workerGroup name of the worker group that has no available worker
     */
    public NoAvailableWorkerException(String workerGroup) {
        super(describe(workerGroup));
    }

    /** Builds the exception message for the given worker group. */
    private static String describe(String workerGroup) {
        return "Cannot find available worker under worker group: " + workerGroup;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.server.master.utils;

import org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
import org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;

import org.springframework.dao.DataAccessResourceFailureException;

Expand All @@ -30,4 +32,12 @@ public static boolean isDatabaseConnectedFailedException(Throwable e) {
/**
 * Tells whether the given throwable is a {@link TaskExecutionContextCreateException}.
 *
 * @param e the throwable to inspect; may be {@code null}
 * @return {@code true} if {@code e} is an instance of that type, {@code false} otherwise (including {@code null})
 */
public static boolean isTaskExecutionContextCreateException(Throwable e) {
    return TaskExecutionContextCreateException.class.isInstance(e);
}

/**
 * Tells whether the given throwable is a {@link WorkerGroupNotFoundException}.
 *
 * @param e the throwable to inspect; may be {@code null}
 * @return {@code true} if {@code e} is an instance of that type, {@code false} otherwise (including {@code null})
 */
public static boolean isWorkerGroupNotFoundException(Throwable e) {
    return WorkerGroupNotFoundException.class.isInstance(e);
}

/**
 * Tells whether the given throwable is a {@link NoAvailableWorkerException}.
 *
 * @param e the throwable to inspect; may be {@code null}
 * @return {@code true} if {@code e} is an instance of that type, {@code false} otherwise (including {@code null})
 */
public static boolean isNoAvailableWorkerException(Throwable e) {
    return NoAvailableWorkerException.class.isInstance(e);
}
}
5 changes: 5 additions & 0 deletions dolphinscheduler-master/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ master:
# Master max concurrent workflow instances, when the master's workflow instance count exceeds this value, master server will be marked as busy.
max-concurrent-workflow-instances: 2147483647
worker-group-refresh-interval: 5m
# Task dispatch timeout check (currently disabled).
# When enabled, tasks not dispatched within this duration are marked as failed.
task-dispatch-policy:
dispatch-timeout-enabled: false
max-task-dispatch-duration: 1h
command-fetch-strategy:
type: ID_SLOT_BASED
config:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerConfigurationProperties;
import org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerType;

import java.time.Duration;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
Expand Down Expand Up @@ -72,4 +74,13 @@ public void getWorkerLoadBalancerConfigurationProperties() {
assertThat(dynamicWeightConfigProperties.getCpuUsageWeight()).isEqualTo(30);
assertThat(dynamicWeightConfigProperties.getTaskThreadPoolUsageWeight()).isEqualTo(30);
}

@Test
public void getTaskDispatchPolicy() {
    final TaskDispatchPolicy dispatchPolicy = masterConfig.getTaskDispatchPolicy();
    assertThat(dispatchPolicy).isNotNull();

    // Expected values mirror the configuration loaded for this test: timeout check off, 1h limit.
    final boolean timeoutCheckEnabled = dispatchPolicy.isDispatchTimeoutEnabled();
    final Duration maxDispatchDuration = dispatchPolicy.getMaxTaskDispatchDuration();
    assertThat(timeoutCheckEnabled).isFalse();
    assertThat(maxDispatchDuration).isEqualTo(Duration.ofHours(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,33 @@

import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;

@ExtendWith(MockitoExtension.class)
class WorkerGroupDispatcherCoordinatorTest {

@InjectMocks
private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;

@Mock
private ITaskExecutorClient taskExecutorClient;

@BeforeEach
void setUp() {
    // Build the coordinator by hand with a default MasterConfig, then set the mocked
    // client on its "taskExecutorClient" field via reflection.
    workerGroupDispatcherCoordinator = new WorkerGroupDispatcherCoordinator(new MasterConfig());
    ReflectionTestUtils.setField(workerGroupDispatcherCoordinator, "taskExecutorClient", taskExecutorClient);
}

@Test
void addTaskToWorkerGroup_NewWorkerGroup_ShouldAddTask() {
String workerGroup = "newGroup";
Expand Down
Loading