Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11544][Manager] Optimize the configuration of the Manager schedule module #11545

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ private void initConnection() throws Exception {
new AirflowConnectionGetter(airflowConfig.getConnectionId()));
if (!response.isSuccess()) {
AirflowConnection newConn = new AirflowConnection(airflowConfig.getConnectionId(), "HTTP", "",
airflowConfig.getHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
airflowConfig.getPort(), airflowConfig.getInlongPassword(), "");
airflowConfig.getInlongManagerHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
airflowConfig.getInlongManagerPort(), airflowConfig.getInlongPassword(), "");
response = serverClient.sendRequest(new AirflowConnectionCreator(newConn));
LOGGER.info("AirflowConnection registration response: {}", response.toString());
if (!response.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,30 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import okhttp3.OkHttpClient;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

import java.net.URL;

@Data
@Configuration
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class AirflowConfig extends ClientConfiguration {

@Value("${schedule.engine.inlong.manager.host:127.0.0.1}")
private String host;
private static final Logger LOGGER = LoggerFactory.getLogger(AirflowConfig.class);
@Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083}")
private String inlongManagerUrl;

@Value("${server.port:8083}")
private int port;
private String inlongManagerHost;
private int inlongManagerPort;

@Value("${default.admin.user:admin}")
private String inlongUsername;
Expand All @@ -68,6 +76,23 @@ public class AirflowConfig extends ClientConfiguration {
@Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}")
private String baseUrl;

@PostConstruct
public void init() {
try {
if (StringUtil.isNotBlank(inlongManagerUrl)) {
URL url = new URL(inlongManagerUrl);
this.inlongManagerHost = url.getHost();
this.inlongManagerPort = url.getPort();
if (this.inlongManagerPort == -1) {
this.inlongManagerPort = 8083;
}
}
LOGGER.info("Init AirflowConfig success for manager url ={}", this.inlongManagerUrl);
} catch (Exception e) {
LOGGER.error("Init AirflowConfig failed for manager url={}: ", this.inlongManagerUrl, e);
}
}

@Bean
public OkHttpClient okHttpClient() {
return new OkHttpClient.Builder()
Expand All @@ -79,6 +104,7 @@ public OkHttpClient okHttpClient() {
.retryOnConnectionFailure(true)
.build();
}

@Bean
public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig) {
return new AirflowServerClient(okHttpClient, airflowConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ public class DolphinScheduleEngine implements ScheduleEngine {

private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class);

@Value("${schedule.engine.inlong.manager.host:127.0.0.1}")
private String host;

@Value("${server.port:8083}")
private int port;
@Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083}")
private String inlongManagerUrl;

@Value("${default.admin.user:admin}")
private String username;
Expand All @@ -86,10 +83,10 @@ public void init() {
this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
}

public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl,
public DolphinScheduleEngine(String inlongManagerUrl, String username, String password,
String dolphinUrl,
String token) {
Zkplo marked this conversation as resolved.
Show resolved Hide resolved
this.host = host;
this.port = port;
this.inlongManagerUrl = inlongManagerUrl;
this.username = username;
this.password = password;
this.dolphinUrl = dolphinUrl;
Expand Down Expand Up @@ -161,8 +158,7 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) {
long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo);
processDefCode =
dolphinScheduleOperator.createProcessDef(processDefUrl, token, processName, processDesc, taskCode,
host, port,
username, password, offset, scheduleInfo.getInlongGroupId());
inlongManagerUrl, username, password, offset, scheduleInfo.getInlongGroupId());
LOGGER.info("Create process definition success, process definition code: {}", processDefCode);

if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public long genTaskCode(String url, String token) {
/**
* Creates a process definition in DolphinScheduler.
*/
public long createProcessDef(String url, String token, String name, String desc, long taskCode, String host,
int port, String username, String password, long offset, String groupId) {
public long createProcessDef(String url, String token, String name, String desc, long taskCode,
String inlongManagerUrl, String username, String password, long offset, String groupId) {
try {
return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, host,
port, username, password, offset, groupId);
return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, inlongManagerUrl, username,
password, offset, groupId);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in creating process definition: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,22 +282,21 @@ public static long genTaskCode(String url, String token) {
/**
* Creates a process definition in DolphinScheduler.
*
* @param url The base URL of the DolphinScheduler API.
* @param token The authentication token to be used in the request header.
* @param name The name of the process definition.
* @param desc The description of the process definition.
* @param taskCode The task code to be associated with this process definition.
* @param host The host where the process will run.
* @param port The port where the process will run.
* @param username The username for authentication.
* @param password The password for authentication.
* @param offset The offset for the scheduling.
* @param groupId The group ID of the process.
* @param url The base URL of the DolphinScheduler API.
* @param token The authentication token to be used in the request header.
* @param name The name of the process definition.
* @param desc The description of the process definition.
* @param taskCode The task code to be associated with this process definition.
* @param inlongManagerUrl The host where the process will run.
* @param username The username for authentication.
* @param password The password for authentication.
* @param offset The offset for the scheduling.
* @param groupId The group ID of the process.
* @return The process definition code (ID) if creation is successful, or 0 if an error occurs.
*/
public static long createProcessDef(String url, String token, String name, String desc,
long taskCode, String host,
int port, String username, String password, long offset, String groupId) throws Exception {
long taskCode, String inlongManagerUrl, String username, String password, long offset, String groupId)
throws Exception {
try {
Map<String, String> header = buildHeader(token);

Expand All @@ -306,7 +305,7 @@ public static long createProcessDef(String url, String token, String name, Strin
String taskRelationJson = MAPPER.writeValueAsString(Collections.singletonList(taskRelation));

DSTaskParams taskParams = new DSTaskParams();
taskParams.setRawScript(buildScript(host, port, username, password, offset, groupId));
taskParams.setRawScript(buildScript(inlongManagerUrl, username, password, offset, groupId));

DSTaskDefinition taskDefinition = new DSTaskDefinition();
taskDefinition.setCode(taskCode);
Expand Down Expand Up @@ -774,10 +773,10 @@ private static JsonObject executeHttpRequest(String url, String method, Map<Stri
* When process definition schedule run, the shell node run,
* Call back in inlong, sending a request with parameters required
*/
private static String buildScript(String host, int port, String username, String password, long offset,
private static String buildScript(String inlongManagerUrl, String username, String password, long offset,
String groupId) {
LOGGER.info("build script for host: {}, port: {}, username: {}, password: {}, offset: {}, groupId: {}", host,
port, username, password, offset, groupId);
LOGGER.info("build script for Inlong Manager Url: {}, username: {}, password: {}, offset: {}, groupId: {}",
inlongManagerUrl, username, password, offset, groupId);
return "#!/bin/bash\n\n" +

// Get current timestamp
Expand All @@ -789,7 +788,7 @@ private static String buildScript(String host, int port, String username, String

// Set URL
"# Set URL and HTTP method\n" +
"url=\"http://" + host + ":" + port + SHELL_REQUEST_API +
"url=\"" + inlongManagerUrl + SHELL_REQUEST_API +
"?username=" + username + "&password=" + password + "\"\n" +
"echo \"get url: ${url}\"\n" +

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg

# Please confirm it is the actual address of manager
schedule.engine.inlong.manager.host=
# Inlong Manager URL accessible by the scheduler
schedule.engine.inlong.manager.url=http://127.0.0.1:8083

# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg

# Please confirm it is the actual address of manager
schedule.engine.inlong.manager.host=
# Inlong Manager URL accessible by the scheduler
schedule.engine.inlong.manager.url=http://127.0.0.1:8083

# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg

# Please confirm it is the actual address of manager
schedule.engine.inlong.manager.host=
# Inlong Manager URL accessible by the scheduler
schedule.engine.inlong.manager.url=http://127.0.0.1:8083

# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=
Expand Down
Loading