Skip to content

Commit

Permalink
feat: improve exception handling, test with dependency injection
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO committed Nov 17, 2024
1 parent a9e606a commit c713678
Show file tree
Hide file tree
Showing 8 changed files with 721 additions and 378 deletions.
24 changes: 24 additions & 0 deletions inlong-manager/manager-schedule/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,29 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<version>2.6.15</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<exclusions>
<exclusion>
<groupId>com.vaadin.external.google</groupId>
<artifactId>android-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</exclusion>
<exclusion>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -71,11 +74,18 @@ public class DolphinScheduleEngine implements ScheduleEngine {
@Value("${inlong.schedule.dolphinscheduler.token:default_token_value}")
private String token;

@Resource
private DolphinScheduleOperator dolphinScheduleOperator;

private long projectCode;
private final String projectBaseUrl;
private final DolphinScheduleUtils dsUtils;
private String projectBaseUrl;
private final Map<Long, String> scheduledProcessMap;

@PostConstruct
public void init() {
this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
}

public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl,
String token) {
this.host = host;
Expand All @@ -84,27 +94,11 @@ public DolphinScheduleEngine(String host, int port, String username, String pass
this.password = password;
this.dolphinUrl = dolphinUrl;
this.token = token;
this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
try {
LOGGER.info("Dolphin Scheduler engine http client initialized");
this.dsUtils = new DolphinScheduleUtils();
this.scheduledProcessMap = new ConcurrentHashMap<>();
} catch (Exception e) {
LOGGER.error("Failed to init dolphin scheduler: ", e);
throw new DolphinScheduleException(String.format("Failed to init dolphin scheduler: %s", e.getMessage()));
}
this.scheduledProcessMap = new ConcurrentHashMap<>();
}

public DolphinScheduleEngine() {
this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
try {
LOGGER.info("Dolphin Scheduler engine http client initialized");
this.dsUtils = new DolphinScheduleUtils();
this.scheduledProcessMap = new ConcurrentHashMap<>();
} catch (Exception e) {
LOGGER.error("Failed to init dolphin scheduler: ", e);
throw new DolphinScheduleException(String.format("Failed to init dolphin scheduler: %s", e.getMessage()));
}
this.scheduledProcessMap = new ConcurrentHashMap<>();
}

/**
Expand All @@ -114,19 +108,20 @@ public DolphinScheduleEngine() {
@Override
public void start() {
LOGGER.info("Starting dolphin scheduler engine, Checking project exists...");
long code = dsUtils.checkAndGetUniqueId(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME);
long code = dolphinScheduleOperator.checkAndGetUniqueId(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME);
if (code != 0) {
LOGGER.info("Project exists, project code: {}", code);
this.projectCode = code;

LOGGER.info("Starting synchronize existing process definition");
String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL + DS_PROCESS_QUERY_URL;
scheduledProcessMap.putAll(dsUtils.queryAllProcessDef(queryProcessDefUrl, token));
scheduledProcessMap.putAll(dolphinScheduleOperator.queryAllProcessDef(queryProcessDefUrl, token));

} else {
LOGGER.info("There is no inlong offline project exists, default project will be created");
this.projectCode =
dsUtils.creatNewProject(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME, DS_DEFAULT_PROJECT_DESC);
dolphinScheduleOperator.creatProject(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME,
DS_DEFAULT_PROJECT_DESC);
}
}

Expand All @@ -145,36 +140,38 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) {
LOGGER.info("Dolphin Scheduler handle register begin for {}, Checking process definition id uniqueness...",
scheduleInfo.getInlongGroupId());
try {
long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName);
long processDefCode = dolphinScheduleOperator.checkAndGetUniqueId(processDefUrl, token, processName);

boolean online = false;
if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) {

// process definition already exists, delete and rebuild
LOGGER.info("Process definition exists, process definition id: {}, deleting...", processDefCode);
if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) {
dsUtils.deleteProcessDef(processDefUrl, token, processDefCode);
if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) {
dolphinScheduleOperator.deleteProcessDef(processDefUrl, token, processDefCode);
scheduledProcessMap.remove(processDefCode);
}
}
String taskCodeUrl = projectBaseUrl + "/" + projectCode + DS_TASK_CODE_URL;

long taskCode = dsUtils.genTaskCode(taskCodeUrl, token);
long taskCode = dolphinScheduleOperator.genTaskCode(taskCodeUrl, token);
LOGGER.info("Generate task code for process definition success, task code: {}", taskCode);

long offset = dsUtils.calculateOffset(scheduleInfo);
long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo);
processDefCode =
dsUtils.createProcessDef(processDefUrl, token, processName, processDesc, taskCode, host, port,
dolphinScheduleOperator.createProcessDef(processDefUrl, token, processName, processDesc, taskCode,
host, port,
username, password, offset, scheduleInfo.getInlongGroupId());
LOGGER.info("Create process definition success, process definition code: {}", processDefCode);

if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) {
if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) {
LOGGER.info("Release process definition success, release status: {}", DS_ONLINE_STATE);

int scheduleId = dsUtils.createScheduleForProcessDef(scheduleUrl, processDefCode, token, scheduleInfo);
int scheduleId = dolphinScheduleOperator.createScheduleForProcessDef(scheduleUrl, processDefCode, token,
scheduleInfo);
LOGGER.info("Create schedule for process definition success, schedule info: {}", scheduleInfo);

online = dsUtils.onlineScheduleForProcessDef(scheduleUrl, scheduleId, token);
online = dolphinScheduleOperator.onlineScheduleForProcessDef(scheduleUrl, scheduleId, token);
LOGGER.info("Online schedule for process definition, status: {}", online);
}

Expand All @@ -200,13 +197,13 @@ public boolean handleUnregister(String groupId) {
LOGGER.info("Dolphin Scheduler handle Unregister begin for {}, Checking process definition id uniqueness...",
groupId);
try {
long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName);
long processDefCode = dolphinScheduleOperator.checkAndGetUniqueId(processDefUrl, token, processName);
if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) {

LOGGER.info("Deleting process definition, process definition id: {}", processDefCode);
if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) {
if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) {

dsUtils.deleteProcessDef(processDefUrl, token, processDefCode);
dolphinScheduleOperator.deleteProcessDef(processDefUrl, token, processDefCode);
scheduledProcessMap.remove(processDefCode);
LOGGER.info("Process definition deleted");
}
Expand Down Expand Up @@ -249,17 +246,17 @@ public void stop() {
try {

String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL + DS_PROCESS_QUERY_URL;
Map<Long, String> allProcessDef = dsUtils.queryAllProcessDef(queryProcessDefUrl, token);
Map<Long, String> allProcessDef = dolphinScheduleOperator.queryAllProcessDef(queryProcessDefUrl, token);

for (Long processDefCode : allProcessDef.keySet()) {

LOGGER.info("delete process definition id: {}", processDefCode);
dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE);
dsUtils.deleteProcessDef(processDefUrl, token, processDefCode);
dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE);
dolphinScheduleOperator.deleteProcessDef(processDefUrl, token, processDefCode);
scheduledProcessMap.remove(processDefCode);
}

dsUtils.deleteProject(projectBaseUrl, token, projectCode);
dolphinScheduleOperator.deleteProject(projectBaseUrl, token, projectCode);
LOGGER.info("Dolphin scheduler engine stopped");

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.schedule.dolphinscheduler;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.schedule.exception.DolphinScheduleException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.Map;

import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNEXPECTED_ERROR;

/**
* DolphinScheduler operator, This class includes methods for creating, updating, and deleting projects,
* tasks, and process definitions in DolphinScheduler.
*/
@Service
public class DolphinScheduleOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleOperator.class);

/**
* Checks the uniqueness of a DolphinScheduler project ID based on the given search value.
*/
public long checkAndGetUniqueId(String url, String token, String searchVal) {
try {
return DolphinScheduleUtils.checkAndGetUniqueId(url, token, searchVal);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in check id uniqueness: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
String.format("Unexpected wrong in check id uniqueness: %s", e.getMessage()));
}
}

/**
* Creates a new project in DolphinScheduler.
*/
public long creatProject(String url, String token, String projectName, String description) {
try {
return DolphinScheduleUtils.creatProject(url, token, projectName, description);
} catch (Exception e) {
LOGGER.error("Unexpected error while creating new project: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
String.format("Unexpected error while creating new project: %s", e.getMessage()));
}
}

/**
* Query all process definition in DolphinScheduler project.
*/
public Map<Long, String> queryAllProcessDef(String url, String token) {
try {
return DolphinScheduleUtils.queryAllProcessDef(url, token);
} catch (Exception e) {
LOGGER.error("Unexpected error while querying process definition: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
String.format("Unexpected error while querying process definition: %s", e.getMessage()));
}
}

/**
* Generates a new task code in DolphinScheduler.
*/
public long genTaskCode(String url, String token) {
try {
return DolphinScheduleUtils.genTaskCode(url, token);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in generating task code: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
String.format("Unexpected wrong in generating task code: %s", e.getMessage()));
}
}

/**
* Creates a process definition in DolphinScheduler.
*/
public long createProcessDef(String url, String token, String name, String desc, long taskCode, String host,
int port, String username, String password, long offset, String groupId) {
try {
return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, host,
port, username, password, offset, groupId);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in creating process definition: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
String.format("Unexpected wrong in creating process definition: %s", e.getMessage()));
}
}

/**
* Releases a process definition in DolphinScheduler.
*/
public boolean releaseProcessDef(String processDefUrl, long processDefCode, String token, String status) {
try {
return DolphinScheduleUtils.releaseProcessDef(processDefUrl, processDefCode, token, status);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in release process definition: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
String.format("Unexpected wrong in release process definition: %s", e.getMessage()));
}
}

/**
* Create a schedule for process definition in DolphinScheduler.
*/
public int createScheduleForProcessDef(String url, long processDefCode, String token, ScheduleInfo scheduleInfo) {
try {
return DolphinScheduleUtils.createScheduleForProcessDef(url, processDefCode, token,
scheduleInfo);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in creating schedule for process definition: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
String.format("Unexpected wrong in creating schedule for process definition: %s", e.getMessage()));
}
}

/**
* Online the schedule for process definition in DolphinScheduler.
*/
public boolean onlineScheduleForProcessDef(String scheduleUrl, int scheduleId, String token) {
try {
return DolphinScheduleUtils.onlineScheduleForProcessDef(scheduleUrl, scheduleId, token);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in online process definition: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
String.format("Unexpected wrong in online process definition: %s", e.getMessage()));
}
}

/**
* Delete the process definition in DolphinScheduler.
*/
public void deleteProcessDef(String processDefUrl, String token, long processDefCode) {
try {
DolphinScheduleUtils.delete(processDefUrl, token, processDefCode);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in deleting process definition: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
String.format("Unexpected wrong in deleting process definition: %s", e.getMessage()));
}
}

/**
* Delete the project in DolphinScheduler.
*/
public void deleteProject(String projectBaseUrl, String token, long projectCode) {
try {
DolphinScheduleUtils.delete(projectBaseUrl, token, projectCode);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in deleting project definition: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
String.format("Unexpected wrong in deleting project definition: %s", e.getMessage()));
}
}

}
Loading

0 comments on commit c713678

Please sign in to comment.