diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java index 792307e6ae..80d67f2281 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java @@ -97,8 +97,8 @@ private void initConnection() throws Exception { new AirflowConnectionGetter(airflowConfig.getConnectionId())); if (!response.isSuccess()) { AirflowConnection newConn = new AirflowConnection(airflowConfig.getConnectionId(), "HTTP", "", - airflowConfig.getHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI, - airflowConfig.getPort(), airflowConfig.getInlongPassword(), ""); + airflowConfig.getInlongManagerHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI, + airflowConfig.getInlongManagerPort(), airflowConfig.getInlongPassword(), ""); response = serverClient.sendRequest(new AirflowConnectionCreator(newConn)); LOGGER.info("AirflowConnection registration response: {}", response.toString()); if (!response.isSuccess()) { diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java index 489712abe9..1a9c966a6d 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.schedule.airflow.config; import org.apache.inlong.manager.client.api.ClientConfiguration; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.schedule.airflow.AirflowServerClient; import org.apache.inlong.manager.schedule.airflow.interceptor.AirflowAuthInterceptor; import org.apache.inlong.manager.schedule.airflow.interceptor.LoggingInterceptor; @@ -27,10 +28,15 @@ import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import okhttp3.OkHttpClient; +import org.eclipse.jetty.util.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import javax.annotation.PostConstruct; + @Data @Configuration @NoArgsConstructor @@ -38,11 +44,12 @@ @EqualsAndHashCode(callSuper = true) public class AirflowConfig extends ClientConfiguration { - @Value("${schedule.engine.inlong.manager.host:127.0.0.1}") - private String host; + private static final Logger LOGGER = LoggerFactory.getLogger(AirflowConfig.class); + @Value("${schedule.engine.inlong.manager.url:127.0.0.1:8083}") + private String inlongManagerUrl; - @Value("${server.port:8083}") - private int port; + private String inlongManagerHost; + private int inlongManagerPort; @Value("${default.admin.user:admin}") private String inlongUsername; @@ -68,6 +75,22 @@ public class AirflowConfig extends ClientConfiguration { @Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}") private String baseUrl; + @PostConstruct + public void init() { + try { + if (StringUtil.isNotBlank(inlongManagerUrl)) { + String[] urlInfo = inlongManagerUrl.split(InlongConstants.COLON); + if (urlInfo.length == 2) { + this.inlongManagerHost = urlInfo[0]; + this.inlongManagerPort = Integer.parseInt(urlInfo[1]); + } + } + LOGGER.info("Init AirflowConfig success for manager url ={}", this.inlongManagerUrl); + } catch (Exception e) { + LOGGER.error("Init AirflowConfig failed for manager url={}: ", this.inlongManagerUrl, e); + } + } + @Bean public OkHttpClient okHttpClient() { return new OkHttpClient.Builder() @@ -79,6 +102,7 @@ public OkHttpClient okHttpClient() { .retryOnConnectionFailure(true) .build(); } + @Bean public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig) { return new AirflowServerClient(okHttpClient, airflowConfig); diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java index 5123068eab..2e63859880 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.schedule.dolphinscheduler; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.schedule.ScheduleEngine; import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; @@ -56,11 +57,8 @@ public class DolphinScheduleEngine implements ScheduleEngine { private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); - @Value("${schedule.engine.inlong.manager.host:127.0.0.1}") - private String host; - - @Value("${server.port:8083}") - private int port; + @Value("${schedule.engine.inlong.manager.url:127.0.0.1:8083}") + private String inlongManagerUrl; @Value("${default.admin.user:admin}") private String username; @@ -86,10 +84,10 @@ public void init() { this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; } - public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl, + public DolphinScheduleEngine(String inlongManagerHost, int inlongManagerPort, String username, String password, + String dolphinUrl, String token) { - this.host = host; - this.port = port; + this.inlongManagerUrl = inlongManagerHost + InlongConstants.COLON + inlongManagerPort; this.username = username; this.password = password; this.dolphinUrl = dolphinUrl; @@ -161,7 +159,7 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) { long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo); processDefCode = dolphinScheduleOperator.createProcessDef(processDefUrl, token, processName, processDesc, taskCode, - host, port, + inlongManagerUrl, username, password, offset, scheduleInfo.getInlongGroupId()); LOGGER.info("Create process definition success, process definition code: {}", processDefCode); diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java index e317478c64..8a7d9cbe2b 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java @@ -92,11 +92,11 @@ public long genTaskCode(String url, String token) { /** * Creates a process definition in DolphinScheduler. */ - public long createProcessDef(String url, String token, String name, String desc, long taskCode, String host, - int port, String username, String password, long offset, String groupId) { + public long createProcessDef(String url, String token, String name, String desc, long taskCode, + String inlongManagerUrl, String username, String password, long offset, String groupId) { try { - return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, host, - port, username, password, offset, groupId); + return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, inlongManagerUrl, username, + password, offset, groupId); } catch (Exception e) { LOGGER.error("Unexpected wrong in creating process definition: ", e); throw new DolphinScheduleException(UNEXPECTED_ERROR, diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java index 5fd6dd3629..5eddb72378 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java @@ -296,8 +296,8 @@ public static long genTaskCode(String url, String token) { * @return The process definition code (ID) if creation is successful, or 0 if an error occurs. */ public static long createProcessDef(String url, String token, String name, String desc, - long taskCode, String host, - int port, String username, String password, long offset, String groupId) throws Exception { + long taskCode, String inlongManagerUrl, String username, String password, long offset, String groupId) + throws Exception { try { Map header = buildHeader(token); @@ -306,7 +306,7 @@ public static long createProcessDef(String url, String token, String name, Strin String taskRelationJson = MAPPER.writeValueAsString(Collections.singletonList(taskRelation)); DSTaskParams taskParams = new DSTaskParams(); - taskParams.setRawScript(buildScript(host, port, username, password, offset, groupId)); + taskParams.setRawScript(buildScript(inlongManagerUrl, username, password, offset, groupId)); DSTaskDefinition taskDefinition = new DSTaskDefinition(); taskDefinition.setCode(taskCode); @@ -774,10 +774,10 @@ private static JsonObject executeHttpRequest(String url, String method, Map