Commit b523264 (parent: b47cda2)

[FLINK-26425][yarn] Support rolling log aggregation
4 files changed (+141, -2 lines)

docs/layouts/shortcodes/generated/yarn_config_configuration.html

Lines changed: 12 additions & 0 deletions
@@ -152,6 +152,18 @@
         <td>String</td>
         <td>The provided usrlib directory in remote. It should be pre-uploaded and world-readable. Flink will use it to exclude the local usrlib directory(i.e. usrlib/ under the parent directory of FLINK_LIB_DIR). Unlike yarn.provided.lib.dirs, YARN will not cache it on the nodes as it is for each application. An example could be hdfs://$namenode_address/path/of/flink/usrlib</td>
     </tr>
+    <tr>
+        <td><h5>yarn.rolled-logs.exclude-pattern</h5></td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>Java regular expression to exclude certain log files from rolling log aggregation. Log files matching the defined exclude pattern will be ignored during aggregation. If a log file matches both the include and exclude patterns, the exclude pattern takes precedence and the file will be excluded from aggregation.</td>
+    </tr>
+    <tr>
+        <td><h5>yarn.rolled-logs.include-pattern</h5></td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>Java regular expression to match log file names for inclusion in rolling log aggregation. This regex is used by YARN’s log aggregation mechanism to identify which log files to collect. To enable rolling aggregation in YARN, set the `yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds` property in `yarn-site.xml`. Ensure that Flink’s Log4J configuration uses FileAppender or a compatible appender that can handle file deletions during runtime. The regex pattern (e.g., `jobmanager*`) must align with the log file names defined in the Log4J configuration (e.g., `jobmanager.log`) to ensure all relevant files will be aggregated.</td>
+    </tr>
     <tr>
         <td><h5>yarn.security.appmaster.delegation.token.services</h5></td>
         <td style="word-wrap: break-word;">"hadoopfs"</td>
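
The two new table entries above describe the matching rule YARN applies: a log file is aggregated if its name matches the include pattern, unless it also matches the exclude pattern, in which case the exclude pattern wins. The following minimal sketch checks that rule with plain java.util.regex; the RolledLogPatternCheck class and the pattern values are illustrative only (neither option ships with a default value):

import java.util.regex.Pattern;

public class RolledLogPatternCheck {
    public static void main(String[] args) {
        // Example patterns; neither option has a default value in Flink.
        Pattern include = Pattern.compile("(jobmanager|taskmanager).*");
        Pattern exclude = Pattern.compile("(jobmanager|taskmanager)\\.(out|err)");

        for (String logFile : new String[] {"jobmanager.log", "taskmanager.log", "jobmanager.out"}) {
            boolean included = include.matcher(logFile).matches();
            boolean excluded = exclude.matcher(logFile).matches();
            // The exclude pattern takes precedence when a file matches both patterns.
            System.out.println(logFile + " -> aggregated: " + (included && !excluded));
        }
        // jobmanager.log  -> aggregated: true
        // taskmanager.log -> aggregated: true
        // jobmanager.out  -> aggregated: false
    }
}

The rule itself is enforced by YARN's NodeManager during rolling aggregation; the snippet only illustrates which names the example regexes accept.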

flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java

Lines changed: 32 additions & 2 deletions
@@ -87,6 +87,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -180,8 +181,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
     private final String yarnQueue;
 
-    private Path flinkJarPath;
-
     private final Configuration flinkConfiguration;
 
     private final String customName;
@@ -190,6 +189,12 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
     private final String applicationType;
 
+    private final String rolledLogIncludePattern;
+
+    private final String rolledLogExcludePattern;
+
+    private Path flinkJarPath;
+
     private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
     public YarnClusterDescriptor(
@@ -221,6 +226,10 @@ public YarnClusterDescriptor(
         this.customName = flinkConfiguration.get(YarnConfigOptions.APPLICATION_NAME);
         this.applicationType = flinkConfiguration.get(YarnConfigOptions.APPLICATION_TYPE);
         this.nodeLabel = flinkConfiguration.get(YarnConfigOptions.NODE_LABEL);
+        this.rolledLogIncludePattern =
+                flinkConfiguration.get(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN);
+        this.rolledLogExcludePattern =
+                flinkConfiguration.get(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN);
     }
 
     /** Adapt flink env setting. */
@@ -1237,6 +1246,8 @@ private ApplicationReport startAppMaster(
 
         setApplicationTags(appContext);
 
+        setRolledLogConfigs(appContext);
+
         // add a hook to clean up in case deployment fails
         Thread deploymentFailureHook =
                 new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
@@ -1533,6 +1544,25 @@ private void setApplicationNodeLabel(final ApplicationSubmissionContext appContext) {
         }
     }
 
+    @VisibleForTesting
+    void setRolledLogConfigs(final ApplicationSubmissionContext appContext) {
+        LogAggregationContext ctx = null;
+
+        if (!StringUtils.isNullOrWhitespaceOnly(rolledLogIncludePattern)) {
+            ctx = Records.newRecord(LogAggregationContext.class);
+            ctx.setRolledLogsIncludePattern(rolledLogIncludePattern);
+        }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(rolledLogExcludePattern)) {
+            ctx = ctx == null ? Records.newRecord(LogAggregationContext.class) : ctx;
+            ctx.setRolledLogsExcludePattern(rolledLogExcludePattern);
+        }
+
+        if (ctx != null) {
+            appContext.setLogAggregationContext(ctx);
+        }
+    }
+
     /**
      * Singleton object which uses reflection to determine whether the {@link
      * ApplicationSubmissionContext} supports various methods which, depending on the Hadoop

flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java

Lines changed: 20 additions & 0 deletions
@@ -401,6 +401,26 @@ public class YarnConfigOptions {
                                     + " Unlike yarn.provided.lib.dirs, YARN will not cache it on the nodes as it is for each application. An example could be "
                                     + "hdfs://$namenode_address/path/of/flink/usrlib");
 
+    public static final ConfigOption<String> ROLLED_LOGS_INCLUDE_PATTERN =
+            key("yarn.rolled-logs.include-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Java regular expression to match log file names for inclusion in rolling log aggregation."
+                                    + " This regex is used by YARN’s log aggregation mechanism to identify which log files to collect."
+                                    + " To enable rolling aggregation in YARN, set the `yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds` property in `yarn-site.xml`."
+                                    + " Ensure that Flink’s Log4J configuration uses FileAppender or a compatible appender that can handle file deletions during runtime."
+                                    + " The regex pattern (e.g., `jobmanager*`) must align with the log file names defined in the Log4J configuration (e.g., `jobmanager.log`) to ensure all relevant files will be aggregated.");
+
+    public static final ConfigOption<String> ROLLED_LOGS_EXCLUDE_PATTERN =
+            key("yarn.rolled-logs.exclude-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Java regular expression to exclude certain log files from rolling log aggregation."
+                                    + " Log files matching the defined exclude pattern will be ignored during aggregation."
+                                    + " If a log file matches both the include and exclude patterns, the exclude pattern takes precedence and the file will be excluded from aggregation.");
+
     @SuppressWarnings("unused")
     public static final ConfigOption<String> HADOOP_CONFIG_KEY =
             key("flink.hadoop.<key>")

flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java

Lines changed: 77 additions & 0 deletions
@@ -47,6 +47,8 @@
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.Records;
@@ -1022,4 +1024,79 @@ public void testSetTokensForYarnAppMaster() {
             fail("Should not throw exception when setting tokens for AM container.");
         }
     }
+
+    @Test
+    void testSetRolledLogConfigs() {
+        final String includePattern = "(jobmanager|taskmanager).*";
+        final String excludePattern = "(jobmanager|taskmanager)\\.(out|err)";
+
+        // Both include and exclude patterns are given.
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN, includePattern);
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN, excludePattern);
+
+        try (final YarnClusterDescriptor yarnClusterDescriptor =
+                createYarnClusterDescriptor(flinkConfig)) {
+
+            final TestApplicationSubmissionContext testAppCtx =
+                    new TestApplicationSubmissionContext();
+            yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+            assertThat(testAppCtx.logAggregationContext.getRolledLogsIncludePattern())
+                    .isEqualTo(includePattern);
+            assertThat(testAppCtx.logAggregationContext.getRolledLogsExcludePattern())
+                    .isEqualTo(excludePattern);
+        }
+
+        // Only include pattern is given.
+        flinkConfig = new Configuration();
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN, includePattern);
+        try (final YarnClusterDescriptor yarnClusterDescriptor =
+                createYarnClusterDescriptor(flinkConfig)) {
+
+            final TestApplicationSubmissionContext testAppCtx =
+                    new TestApplicationSubmissionContext();
+            yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+            assertThat(testAppCtx.logAggregationContext.getRolledLogsIncludePattern())
+                    .isEqualTo(includePattern);
+            assertThat(testAppCtx.logAggregationContext.getRolledLogsExcludePattern()).isNull();
+        }
+
+        // Only exclude pattern is given.
+        flinkConfig = new Configuration();
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN, excludePattern);
+        try (final YarnClusterDescriptor yarnClusterDescriptor =
+                createYarnClusterDescriptor(flinkConfig)) {

+            final TestApplicationSubmissionContext testAppCtx =
+                    new TestApplicationSubmissionContext();
+            yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+            assertThat(testAppCtx.logAggregationContext.getRolledLogsIncludePattern()).isNull();
+            assertThat(testAppCtx.logAggregationContext.getRolledLogsExcludePattern())
+                    .isEqualTo(excludePattern);
+        }
+
+        // Blank values are ignored.
+        flinkConfig = new Configuration();
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_INCLUDE_PATTERN, " ");
+        flinkConfig.set(YarnConfigOptions.ROLLED_LOGS_EXCLUDE_PATTERN, " ");
+        try (final YarnClusterDescriptor yarnClusterDescriptor =
+                createYarnClusterDescriptor(flinkConfig)) {
+
+            final TestApplicationSubmissionContext testAppCtx =
+                    new TestApplicationSubmissionContext();
+            yarnClusterDescriptor.setRolledLogConfigs(testAppCtx);
+            assertThat(testAppCtx.logAggregationContext).isNull();
+        }
+    }
+
+    private static class TestApplicationSubmissionContext
+            extends ApplicationSubmissionContextPBImpl {
+
+        private LogAggregationContext logAggregationContext = null;
+
+        @Override
+        public void setLogAggregationContext(LogAggregationContext logAggregationContext) {
+            this.logAggregationContext = logAggregationContext;
+        }
+    }
 }
