-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement soft corcking #586
Conversation
e4614a5
to
d763bb5
Compare
d763bb5
to
858c2bf
Compare
Co-authored-by: Łukasz Kita <[email protected]>
Co-authored-by: Łukasz Kita <[email protected]>
Process.sleep(100 * RedemandingSource.sleep_time()) | ||
|
||
Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :pause_auto_demand}) | ||
|
||
assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no}) | ||
# sink should receive aropund 100 buffers, but the boundary is set to 70, in case of eg. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# sink should receive aropund 100 buffers, but the boundary is set to 70, in case of eg. | |
# sink should receive around 100 buffers, but the boundary is set to 70, in case of eg. |
def handle_end_of_stream(_pad, ctx, state) do | ||
actions = | ||
Map.values(ctx.pads) | ||
|> Enum.filter(&(&1.direction == :output and not &1.end_of_stream?)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK, there is only one pad with direction:output
, something might be wrong with that action generation.
pad_upperbound = Map.get(state.pads_upperbounds, pad, :infinity) | ||
|
||
actions = | ||
if pad_counter > pad_upperbound and not ctx.pads[pad].auto_demand_paused? do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that pad_counter > pad_upperbound
comparison is quite tricky (since we are using :infinity
atom) :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great!
if old_value == paused? do | ||
operation = if paused?, do: "pause", else: "resume" | ||
|
||
Membrane.Logger.debug( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Membrane.Logger.debug( | |
Membrane.Logger.debug_verbose( |
if ctx.pads.output.end_of_stream? do | ||
{[], state} | ||
else | ||
{[end_of_stream: :output], state} | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's ok that the eos will be send on funnel's output right after first eos from input is received, we can leave it that way
Related Jira ticket: https://membraneframework.atlassian.net/browse/MC-213