Report Beam Lineage from Parquet reads #5850
base: main
Conversation
  ): SCollection[Example] = {
    val job = Job.getInstance(conf)
    GcsConnectorUtil.setInputPaths(sc, job, path)
    val filePattern = ScioUtil.filePattern(path, params.suffix)
I am surprised suffix was not used initially. Or was it intentional?
Codecov Report

@@            Coverage Diff             @@
##             main    #5850      +/-   ##
==========================================
+ Coverage   61.49%   61.56%   +0.06%
==========================================
  Files         317      318       +1
  Lines       11650    11678      +28
  Branches      845      834      -11
==========================================
+ Hits         7164     7189      +25
- Misses       4486     4489       +3
      Some(projectionFn),
      None
    )
    .parDo(new LineageReportDoFn(filePattern))
Isn't this going to result in a new node in the graph? Why are we doing this in sequence with the read if it's not actually using any of the read elements? We should do it like the Scio init metrics, which are just their own distinct graph: create impulse -> submit parquet lineage.
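For reference, a minimal sketch of that alternative shape (the DoFn name and wiring are hypothetical; sc and filePattern come from the read site), where lineage is reported from its own impulse-rooted branch rather than inline with the read:

import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.transforms.{DoFn, Impulse, ParDo}
import org.apache.beam.sdk.transforms.DoFn.ProcessElement

// Hypothetical shape: fires once on the single impulse element and reports
// the read's file pattern as source lineage; emits nothing downstream.
class SubmitParquetLineage(filePattern: String) extends DoFn[Array[Byte], Void] {
  @ProcessElement
  def processElement(c: DoFn[Array[Byte], Void]#ProcessContext): Unit =
    FileSystems.reportSourceLineage(FileSystems.matchNewResource(filePattern, false))
}

// Its own branch of the pipeline graph, independent of the read:
sc.pipeline
  .apply(Impulse.create())
  .apply(ParDo.of(new SubmitParquetLineage(filePattern)))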
I am trying to follow Beam conventions and keep this metric associated with the actual read transform. This way we preserve transform-level lineage (which Beam supports).
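Something like the following pass-through DoFn (a sketch; the actual LineageReportDoFn in this PR may differ) keeps the report attached to the read transform's output:

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement

// Sketch: passes elements through unchanged and reports the file pattern
// as source lineage once per DoFn instance.
class LineageReportDoFn[T](filePattern: String) extends DoFn[T, T] {
  private val reported = new AtomicBoolean(false)

  @ProcessElement
  def processElement(c: DoFn[T, T]#ProcessContext): Unit = {
    if (reported.compareAndSet(false, true)) {
      FileSystems.reportSourceLineage(FileSystems.matchNewResource(filePattern, false))
    }
    c.output(c.element())
  }
}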
      tracker.currentRestriction.getFrom,
      if (splitGranularity == SplitGranularity.File) "end" else tracker.currentRestriction().getTo
    )
    FileSystems.reportSourceLineage(file.getMetadata.resourceId())
This is different from the Hadoop one insofar as we report every file here, right? That seems bad/annoying for using the lineage for anything.
Actually, file-level lineage is the default approach in Beam, though we might not need it directly. Both lineage metric implementations (the legacy one and the new one) handle many files well:
- StringSet truncates internally at 100 entries
- BoundedTrie is a data structure that stores hierarchical data efficiently
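For illustration (the bucket and pattern are made up), reporting every matched file is fine because the metric backend aggregates the entries:

import org.apache.beam.sdk.io.FileSystems

// Illustrative only: report each matched Parquet file as source lineage.
// StringSet truncates past 100 entries; BoundedTrie folds shared path
// prefixes together, so many files stay compact.
val result = FileSystems.`match`("gs://my-bucket/events/*.parquet")
result.metadata().forEach(md => FileSystems.reportSourceLineage(md.resourceId()))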
  override def apply(input: Void): java.lang.Boolean = true
})

val withSkipClone = skipValueClone.fold(hadoop)(skip => hadoop.withSkipValueClone(skip))
withSkipValueClone?
import java.util.concurrent.atomic.AtomicBoolean
import scala.reflect.ClassTag

private[parquet] object HadoopParquet {
This is just to reduce duplication or is there a functional change here?
Just to reduce duplication, no new functionality, except I noticed that in some cases Scio's derived coder was not set on the HadoopFormatIO transform. Beam probably auto-derives the same coder, but it is better to set it explicitly anyway.
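Setting the coder explicitly might look roughly like this (a sketch; the helper name and surrounding wiring are assumptions, not the PR's actual code):

import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.values.SCollection

// Sketch: attach Scio's derived coder to the underlying PCollection
// instead of relying on Beam's coder inference.
def withExplicitCoder[T: Coder](sc: ScioContext, read: SCollection[T]): SCollection[T] = {
  read.internal.setCoder(CoderMaterializer.beam(sc, Coder[T]))
  read
}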