Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4547: Add Tez AM JobID to the JobConf #339

Merged
merged 8 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,6 @@ public interface OutputCommitterContext {
*/
public int getVertexIndex();

public int getDagIdentifier();

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
private final String dagName;
private final String vertexName;
private final int vertexIdx;
private final int dagIdentifier;
private final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output;

public OutputCommitterContextImpl(ApplicationId applicationId,
int dagAttemptNumber,
String dagName,
String vertexName,
RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output,
int vertexIdx) {
int dagAttemptNumber,
VenkatSNarayanan marked this conversation as resolved.
Show resolved Hide resolved
String dagName,
String vertexName,
int dagIdentifier,
int vertexIdx,
RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) {
Objects.requireNonNull(applicationId, "applicationId is null");
Objects.requireNonNull(dagName, "dagName is null");
Objects.requireNonNull(vertexName, "vertexName is null");
Expand All @@ -52,6 +54,7 @@ public OutputCommitterContextImpl(ApplicationId applicationId,
this.vertexName = vertexName;
this.output = output;
this.vertexIdx = vertexIdx;
this.dagIdentifier = dagIdentifier;
}

@Override
Expand Down Expand Up @@ -94,4 +97,9 @@ public int getVertexIndex() {
return vertexIdx;
}

@Override
public int getDagIdentifier() {
return dagIdentifier;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2560,8 +2560,10 @@ public Void run() throws Exception {
appContext.getApplicationAttemptId().getAttemptId(),
appContext.getCurrentDAG().getName(),
vertexName,
od,
vertexId.getId());
appContext.getCurrentDAG().getID().getId(),
vertexId.getId(),
od
);
OutputCommitter outputCommitter = ReflectionUtils
.createClazzInstance(od.getControllerDescriptor().getClassName(),
new Class[]{OutputCommitterContext.class},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tez.mapreduce.committer;

import org.apache.tez.mapreduce.common.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
Expand Down Expand Up @@ -78,6 +79,7 @@ public void initialize() throws IOException {
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext()));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this unique across all jobs which may be writing to a table, even from other processes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This ID is unique to a DAG + attempt number - so if we have some other job, it'll have a different application ID component, while if an attempt fails and the DAG retries, the attempt number will be different.

jobConf.setInt(MRJobConfig.VERTEX_ID, getContext().getVertexIndex());
committer = getOutputCommitter(getContext());
jobContext = getJobContextFromVertexContext(getContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.OutputContext;

@Private
public final class Utils {
Expand Down Expand Up @@ -63,5 +66,12 @@ public static Counter getMRCounter(TezCounter tezCounter) {
Objects.requireNonNull(tezCounter);
return new MRCounters.MRCounter(tezCounter);
}


public static String getDAGID(OutputCommitterContext context) {
return TezDAGID.getInstance(context.getApplicationId(), context.getDagIdentifier()).toString();
}

public static String getDAGID(OutputContext context) {
return TezDAGID.getInstance(context.getApplicationId(), context.getDagIdentifier()).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public interface MRJobConfig {

public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";

/**
* Used by committers to set a job-wide UUID.
*/
public static final String JOB_COMMITTER_UUID = "job.committer.uuid";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the setting used by s3 committer right? How will it work ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a corresponding change I have in my Hadoop code where it will consult this property similar to how it consults the property Spark sets for this purpose.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you can confirm this will work with job.committer.uuid, right?
can you link that point in hadoop code for later reference?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@VenkatSNarayanan May i ask that if the hadoop s3 committer can work with Hive+Tez after this change?
IMO, s3/magic committer can avoid some operation like rename on s3, which can speed up/improve hive job.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abstractdog I haven't publicly posted the Hadoop PR yet, but the change I have is to check for this property around here: https://github.com/apache/hadoop/blob/51cb858cc8c23d873d4adfc21de5f2c1c22d346f/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java#L1372 similar to how the Spark property is checked. I have tested these changes together already alongside my Hive implementation.

@zhangbutao There are some corresponding changes to Hadoop and Hive that also need to be merged which I have. Once all 3 PRs(Tez, Hadoop and Hive have been merged), then the magic committer will be usable with Hive.

Copy link
Contributor

@abstractdog abstractdog Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this go into Tez 0.10.4? if so, it would be good to have it in 1-2 weeks, just FYI, regarding planning the hadoop change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0.10.4 would be ideal. In that case, let me loop in the Hadoop folks to see if they have any strong opinions about this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@VenkatSNarayanan https://issues.apache.org/jira/browse/HIVE-16295 I found a old ticket about integrating s3a committer, and it seems that supporting this needs lots of Hive code change.
I am not sure if you have done similar change in Hive to support the MagicS3GuardCommitter.
Anyway, I think it is very good to support this committer in Hive&Tez. Look forward to your further work.
Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://issues.apache.org/jira/browse/HADOOP-19091 I just saw your Hadoop ticket, and Hive change patch is also there too. Maybe you need create a PR against Hive latest master branch once you have done preparatory work. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There haven't been any objections from the Hadoop folks, I think it should be safe to go ahead with the patch as it is @abstractdog .


public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.tez.common.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.tez.mapreduce.common.Utils;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -413,6 +414,7 @@ protected List<Event> initializeBase() throws IOException, InterruptedException
}
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext()));
TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
.createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(),
getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.mapreduce.TestUmbilical;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.common.Utils;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.ProcessorContext;
Expand Down Expand Up @@ -131,6 +133,26 @@ public void testMergeConfig() throws Exception {
assertEquals("base-value", mergedConf.get("base-key"));
}

@Test
public void testJobUUIDSet() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
DataSinkDescriptor dataSink = MROutput
.createConfigBuilder(conf, TextOutputFormat.class,
tmpDir.getPath())
.build();

OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
new Configuration(false));
MROutput output = new MROutput(outputContext, 2);
output.initialize();
String invalidDAGID = "invalid default";
String dagID = output.jobConf.get(MRJobConfig.JOB_COMMITTER_UUID, invalidDAGID);
assertNotEquals(dagID, invalidDAGID);
assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID), dagID);
assertEquals(dagID, Utils.getDAGID(outputContext));
}

@Test(timeout = 5000)
public void testOldAPI_TextOutputFormat() throws Exception {
Configuration conf = new Configuration();
Expand Down
Loading