Skip to content

Commit

Permalink
[INLONG-11544][Manager] Optimize the configuration of the Manager sch…
Browse files Browse the repository at this point in the history
…edule module
  • Loading branch information
ZKpLo committed Nov 26, 2024
1 parent ace3362 commit 19b1a36
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ private void initConnection() throws Exception {
new AirflowConnectionGetter(airflowConfig.getConnectionId()));
if (!response.isSuccess()) {
AirflowConnection newConn = new AirflowConnection(airflowConfig.getConnectionId(), "HTTP", "",
airflowConfig.getHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
airflowConfig.getPort(), airflowConfig.getInlongPassword(), "");
airflowConfig.getInlongManagerHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
airflowConfig.getInlongManagerPort(), airflowConfig.getInlongPassword(), "");
response = serverClient.sendRequest(new AirflowConnectionCreator(newConn));
LOGGER.info("AirflowConnection registration response: {}", response.toString());
if (!response.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.manager.schedule.airflow.config;

import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.schedule.airflow.AirflowServerClient;
import org.apache.inlong.manager.schedule.airflow.interceptor.AirflowAuthInterceptor;
import org.apache.inlong.manager.schedule.airflow.interceptor.LoggingInterceptor;
Expand All @@ -27,22 +28,28 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import okhttp3.OkHttpClient;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Data
@Configuration
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class AirflowConfig extends ClientConfiguration {

@Value("${schedule.engine.inlong.manager.host:127.0.0.1}")
private String host;
private static final Logger LOGGER = LoggerFactory.getLogger(AirflowConfig.class);
@Value("${schedule.engine.inlong.manager.url:127.0.0.1:8083}")
private String inlongManagerUrl;

@Value("${server.port:8083}")
private int port;
private String inlongManagerHost;
private int inlongManagerPort;

@Value("${default.admin.user:admin}")
private String inlongUsername;
Expand All @@ -68,6 +75,22 @@ public class AirflowConfig extends ClientConfiguration {
@Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}")
private String baseUrl;

@PostConstruct
public void init() {
try {
if (StringUtil.isNotBlank(inlongManagerUrl)) {
String[] urlInfo = inlongManagerUrl.split(InlongConstants.COLON);
if (urlInfo.length == 2) {
this.inlongManagerHost = urlInfo[0];
this.inlongManagerPort = Integer.parseInt(urlInfo[1]);
}
}
LOGGER.info("Init AirflowConfig success for manager url ={}", this.inlongManagerUrl);
} catch (Exception e) {
LOGGER.error("Init AirflowConfig failed for manager url={}: ", this.inlongManagerUrl, e);
}
}

@Bean
public OkHttpClient okHttpClient() {
return new OkHttpClient.Builder()
Expand All @@ -79,6 +102,7 @@ public OkHttpClient okHttpClient() {
.retryOnConnectionFailure(true)
.build();
}

@Bean
public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig) {
return new AirflowServerClient(okHttpClient, airflowConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.schedule.dolphinscheduler;

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.schedule.ScheduleEngine;
import org.apache.inlong.manager.schedule.exception.DolphinScheduleException;
Expand Down Expand Up @@ -56,11 +57,8 @@ public class DolphinScheduleEngine implements ScheduleEngine {

private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class);

@Value("${schedule.engine.inlong.manager.host:127.0.0.1}")
private String host;

@Value("${server.port:8083}")
private int port;
@Value("${schedule.engine.inlong.manager.url:127.0.0.1:8083}")
private String inlongManagerUrl;

@Value("${default.admin.user:admin}")
private String username;
Expand All @@ -86,10 +84,10 @@ public void init() {
this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
}

public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl,
public DolphinScheduleEngine(String inlongManagerHost, int inlongManagerPort, String username, String password,
String dolphinUrl,
String token) {
this.host = host;
this.port = port;
this.inlongManagerUrl = inlongManagerHost + InlongConstants.COLON + inlongManagerPort;
this.username = username;
this.password = password;
this.dolphinUrl = dolphinUrl;
Expand Down Expand Up @@ -161,7 +159,7 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) {
long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo);
processDefCode =
dolphinScheduleOperator.createProcessDef(processDefUrl, token, processName, processDesc, taskCode,
host, port,
inlongManagerUrl,
username, password, offset, scheduleInfo.getInlongGroupId());
LOGGER.info("Create process definition success, process definition code: {}", processDefCode);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public long genTaskCode(String url, String token) {
/**
* Creates a process definition in DolphinScheduler.
*/
public long createProcessDef(String url, String token, String name, String desc, long taskCode, String host,
int port, String username, String password, long offset, String groupId) {
public long createProcessDef(String url, String token, String name, String desc, long taskCode,
String inlongManagerUrl, String username, String password, long offset, String groupId) {
try {
return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, host,
port, username, password, offset, groupId);
return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, inlongManagerUrl, username,
password, offset, groupId);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in creating process definition: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ public static long genTaskCode(String url, String token) {
* @return The process definition code (ID) if creation is successful, or 0 if an error occurs.
*/
public static long createProcessDef(String url, String token, String name, String desc,
long taskCode, String host,
int port, String username, String password, long offset, String groupId) throws Exception {
long taskCode, String inlongManagerUrl, String username, String password, long offset, String groupId)
throws Exception {
try {
Map<String, String> header = buildHeader(token);

Expand All @@ -306,7 +306,7 @@ public static long createProcessDef(String url, String token, String name, Strin
String taskRelationJson = MAPPER.writeValueAsString(Collections.singletonList(taskRelation));

DSTaskParams taskParams = new DSTaskParams();
taskParams.setRawScript(buildScript(host, port, username, password, offset, groupId));
taskParams.setRawScript(buildScript(inlongManagerUrl, username, password, offset, groupId));

DSTaskDefinition taskDefinition = new DSTaskDefinition();
taskDefinition.setCode(taskCode);
Expand Down Expand Up @@ -774,10 +774,10 @@ private static JsonObject executeHttpRequest(String url, String method, Map<Stri
* When process definition schedule run, the shell node run,
* Call back in inlong, sending a request with parameters required
*/
private static String buildScript(String host, int port, String username, String password, long offset,
private static String buildScript(String inlongManagerUrl, String username, String password, long offset,
String groupId) {
LOGGER.info("build script for host: {}, port: {}, username: {}, password: {}, offset: {}, groupId: {}", host,
port, username, password, offset, groupId);
LOGGER.info("build script for Inlong Manager Url: {}, username: {}, password: {}, offset: {}, groupId: {}",
inlongManagerUrl, username, password, offset, groupId);
return "#!/bin/bash\n\n" +

// Get current timestamp
Expand All @@ -789,7 +789,7 @@ private static String buildScript(String host, int port, String username, String

// Set URL
"# Set URL and HTTP method\n" +
"url=\"http://" + host + ":" + port + SHELL_REQUEST_API +
"url=\"http://" + inlongManagerUrl + SHELL_REQUEST_API +
"?username=" + username + "&password=" + password + "\"\n" +
"echo \"get url: ${url}\"\n" +

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg

# Please confirm it is the actual address of manager
schedule.engine.inlong.manager.host=
# Inlong Manager URL accessible by the scheduler
schedule.engine.inlong.manager.url=127.0.0.1:8083

# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg

# Please confirm it is the actual address of manager
schedule.engine.inlong.manager.host=
# Inlong Manager URL accessible by the scheduler
schedule.engine.inlong.manager.url=127.0.0.1:8083

# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg

# Please confirm it is the actual address of manager
schedule.engine.inlong.manager.host=
# Inlong Manager URL accessible by the scheduler
schedule.engine.inlong.manager.url=127.0.0.1:8083

# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=
Expand Down

0 comments on commit 19b1a36

Please sign in to comment.