-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
implement basic watermarking #24
base: master
Are you sure you want to change the base?
Conversation
R: @pabloem |
'wrong input watermark for %s. Expected %s, but got %s.' % ( | ||
ray.get(runner_execution_context.watermark_manager.get_stage_node.remote( | ||
bundle_context_manager.stage.name)), | ||
stage_node.output_watermark() == timestamp.MAX_TIMESTAMP), ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was indeed supposed to be output_watermark
instead of input_watermark
, right?
TODO @pabloem - what is the produced watermark and why do stages differentiate |
4f0f971
to
015df34
Compare
Ran into an issue when implementing this:
Overwriting the payload here also changes the transform in the original pipeline definition. This new format is different from what the watermark manager __init__ expects. I fixed it for now by making sure that the watermark manager initializes before the RayBundleContextManager runs, How is this expected to work? Should we copy the transform before we overwrite the payload?
|
once we start parallelizing the execution of bundles, our watermarking will need to change (we need to wait for all parallel tasks of a given stage before advancing its watermark and executing the downstream stages). |
Re-enabled the watermarking code so that the watermark assertions pass again