Skip to content

Commit

Permalink
fix(schedule): remove slow sql #4130
Browse files Browse the repository at this point in the history
  • Loading branch information
guowl3 authored Jan 10, 2025
1 parent 085058b commit beabc1f
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -77,17 +76,6 @@ public void updateExecutor() {
Assert.equals(executor, byId.get().getExecutor());
}

@Test
public void listByScheduleIdAndStatus() {
taskRepository.deleteAll();
createScheduleTask();
List<TaskStatus> statuses = new LinkedList<>();
statuses.add(TaskStatus.RUNNING);
statuses.add(TaskStatus.PREPARING);
List<ScheduleTaskEntity> byJobNameAndStatus = taskRepository.findByJobNameAndStatusIn("1", statuses);
Assert.equals(byJobNameAndStatus.size(), 1);
}

@Test
public void findByIdIn() {
ScheduleTaskEntity scheduleTask = createScheduleTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public static List<TaskStatus> getProcessingStatus() {
return Arrays.asList(PREPARING, RUNNING);
}

public boolean isProcessing() {
return getProcessingStatus().contains(this);
}

public boolean isTerminated() {
return TaskStatus.CANCELED == this || TaskStatus.FAILED == this || TaskStatus.DONE == this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public interface ScheduleTaskRepository extends JpaRepository<ScheduleTaskEntity

Optional<ScheduleTaskEntity> findByIdAndJobName(Long id, String scheduleId);

List<ScheduleTaskEntity> findByJobNameAndStatusIn(String jobName, List<TaskStatus> statuses);

@Query(value = "select * from schedule_task where job_name=:jobName and job_group = :jobGroup order by id desc limit 1",
nativeQuery = true)
Optional<ScheduleTaskEntity> getLatestScheduleTaskByJobNameAndJobGroup(@Param("jobName") String jobName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,9 +718,8 @@ public void refreshScheduleStatus(Long scheduleId) {
if (status == ScheduleStatus.PAUSE) {
return;
}
int runningTask = scheduleTaskService.listTaskByJobNameAndStatus(scheduleId.toString(),
TaskStatus.getProcessingStatus()).size();
if (runningTask > 0) {
Optional<ScheduleTask> latestTask = getLatestTask(scheduleId);
if (latestTask.isPresent() && latestTask.get().getStatus().isProcessing()) {
status = ScheduleStatus.ENABLED;
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,6 @@ public List<ScheduleTask> listByJobNames(Set<String> jobNames) {
.collect(Collectors.toList());
}

public List<ScheduleTaskEntity> listTaskByJobNameAndStatus(String jobName, List<TaskStatus> statuses) {
return scheduleTaskRepository.findByJobNameAndStatusIn(jobName, statuses);
}

public ScheduleTask nullSafeGetByJobId(Long jobId) {
return findByJobId(jobId)
.orElseThrow(() -> new NotFoundException(ResourceType.ODC_SCHEDULE_TASK, "jobId", jobId));
Expand Down Expand Up @@ -323,24 +319,6 @@ public ScheduleTask nullSafeGetByIdAndScheduleId(Long id, Long scheduleId) {
.orElseThrow(() -> new NotFoundException(ResourceType.ODC_SCHEDULE_TASK, "id", id)));
}


public void correctScheduleTaskStatus(Long scheduleId) {
List<ScheduleTaskEntity> toBeCorrectedList = listTaskByJobNameAndStatus(
scheduleId.toString(), TaskStatus.getProcessingStatus());
// For the scenario where the task framework is switched from closed to open, it is necessary to
// correct
// the status of tasks that were not completed while in the closed state.
if (taskFrameworkEnabledProperties.isEnabled()) {
toBeCorrectedList =
toBeCorrectedList.stream().filter(o -> o.getJobId() == null).collect(Collectors.toList());
}
toBeCorrectedList.forEach(task -> {
updateStatusById(task.getId(), TaskStatus.CANCELED);
log.info("Task status correction successful,scheduleTaskId={}", task.getId());
});
}


@SkipAuthorize("odc internal usage")
public void triggerDataArchiveDelete(Long scheduleTaskId) {

Expand Down

0 comments on commit beabc1f

Please sign in to comment.