-
Notifications
You must be signed in to change notification settings - Fork 425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TEZ-4547: Add Tez AM JobID to the JobConf #339
TEZ-4547: Add Tez AM JobID to the JobConf #339
Conversation
Some committers require a job-wide UUID to function correctly. Adding the AM JobID to the JobConf will allow applications to pass that to the committers that need it.
25cae6d
to
9040a2f
Compare
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
@@ -417,6 +418,7 @@ protected List<Event> initializeBase() throws IOException, InterruptedException | |||
.createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(), | |||
getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(), | |||
getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), isMapperOutput); | |||
jobConf.set(MRJobConfig.MR_PARENT_JOB_ID, new JobID(String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this to line above TaskAttemptID taskAttemptId =
assertNotEquals(parentJobID,invalidJobID); | ||
assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID),parentJobID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix code formatting. space after ,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If my understanding is correct, Hive/Pig would use the value from mapreduce.parent.job.id
to set the correct committer UUID right?
@@ -119,6 +119,7 @@ public void abortOutput(VertexStatus.State finalState) throws IOException { | |||
|| jobConf.getBoolean("mapred.mapper.new-api", false)) { | |||
newApiCommitter = true; | |||
} | |||
jobConf.set(MRJobConfig.MR_PARENT_JOB_ID, new org.apache.hadoop.mapred.JobID(String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString(
inside a method and re-use it in MROutput.java as well ?
Yes, that was the plan. The property name was just chosen arbitrarily so I could put the PR up, any suggestions for a better one are welcome. |
This commit also adds the DAG identifier to the job UUID to ensure that multiple jobs within the same session will be assigned different UUIDs.
This comment was marked as outdated.
This comment was marked as outdated.
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
Outdated
Show resolved
Hide resolved
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java
Outdated
Show resolved
Hide resolved
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
Outdated
Show resolved
Hide resolved
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
Outdated
Show resolved
Hide resolved
This comment was marked as outdated.
This comment was marked as outdated.
@shameersss1 We could actually just set fs.s3a.committer.uuid directly instead of the indirection through the other setting. |
Switch UUID property name to the one required by S3A committers.
This comment was marked as outdated.
This comment was marked as outdated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM +1
@abstractdog - Could you please review the same ? |
Refactors the implementation to reuse Tez's DAGID type instead of hand-rolling our own.
This comment was marked as outdated.
This comment was marked as outdated.
@abstractdog @shameersss1 Is there anything else needed? |
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
Outdated
Show resolved
Hide resolved
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
Outdated
Show resolved
Hide resolved
This comment was marked as outdated.
This comment was marked as outdated.
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
Show resolved
Hide resolved
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
Outdated
Show resolved
Hide resolved
left minor comments on this @VenkatSNarayanan , other than that, this looks good to me |
🎊 +1 overall
This message was automatically generated. |
one more thing @VenkatSNarayanan , please address checkstyle comments where applicable, thanks! |
💔 -1 overall
This message was automatically generated. |
/** | ||
* Used by committers to set a job-wide UUID. | ||
*/ | ||
public static final String JOB_COMMITTER_UUID = "job.committer.uuid"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not the setting used by s3 committer right? How will it work ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a corresponding change I have in my Hadoop code where it will consult this property similar to how it consults the property Spark sets for this purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so you can confirm this will work with job.committer.uuid, right?
can you link that point in hadoop code for later reference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@VenkatSNarayanan May i ask that if the hadoop s3 committer can work with Hive+Tez after this change?
IMO, s3/magic committer can avoid some operation like rename on s3, which can speed up/improve hive job.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@abstractdog I haven't publicly posted the Hadoop PR yet, but the change I have is to check for this property around here: https://github.com/apache/hadoop/blob/51cb858cc8c23d873d4adfc21de5f2c1c22d346f/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java#L1372 similar to how the Spark property is checked. I have tested these changes together already alongside my Hive implementation.
@zhangbutao There are some corresponding changes to Hadoop and Hive that also need to be merged which I have. Once all 3 PRs(Tez, Hadoop and Hive have been merged), then the magic committer will be usable with Hive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this go into Tez 0.10.4? if so, it would be good to have it in 1-2 weeks, just FYI, regarding planning the hadoop change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0.10.4 would be ideal. In that case, let me loop in the Hadoop folks to see if they have any strong opinions about this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@VenkatSNarayanan https://issues.apache.org/jira/browse/HIVE-16295 I found a old ticket about integrating s3a committer, and it seems that supporting this needs lots of Hive code change.
I am not sure if you have done similar change in Hive to support the MagicS3GuardCommitter.
Anyway, I think it is very good to support this committer in Hive&Tez. Look forward to your further work.
Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://issues.apache.org/jira/browse/HADOOP-19091 I just saw your Hadoop ticket, and Hive change patch is also there too. Maybe you need create a PR against Hive latest master branch once you have done preparatory work. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There haven't been any objections from the Hadoop folks, I think it should be safe to go ahead with the patch as it is @abstractdog .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commented. all s3a committers save a json _SUCCESS file (parser in hadoop-aws for older hadoop releases, in hadoop-mapreduce more recently). you can verify job id end to end with this,
@@ -78,6 +79,7 @@ public void initialize() throws IOException { | |||
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials()); | |||
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, | |||
getContext().getDAGAttemptNumber()); | |||
jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this unique across all jobs which may be writing to a table, even from other processes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This ID is unique to a DAG + attempt number - so if we have some other job, it'll have a different application ID component, while if an attempt fails and the DAG retries, the attempt number will be different.
ok. if you look into _SUCCESS json from an s3a or the manifest committer, then the job id is one of the root attributes, as is the source there's a java definition of this in |
guys: @steveloughran , @VenkatSNarayanan : please let me know if this PR is fine to be merged to tez (from hadoop's point of view)? I'm about to start the release process of 0.10.4 soon |
FYI: I'm about to merge this tomorrow to have this in tez 0.10.4 |
Some committers require a job-wide UUID to function correctly. Adding the AM JobID to the JobConf
will allow applications to pass that to
the committers that need it.