Skip to content

Commit

Permalink
src: stream: pipeline: Fix stream restart logic, use Interval::tick i…
Browse files Browse the repository at this point in the history
…nstead of sleep
  • Loading branch information
joaoantoniocardoso authored and patrickelectric committed Mar 15, 2024
1 parent 98223ac commit 2b1dc02
Showing 1 changed file with 35 additions and 32 deletions.
67 changes: 35 additions & 32 deletions src/stream/pipeline/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,41 +166,44 @@ impl PipelineRunner {
let mut lost_timestamps: usize = 0;
let max_lost_timestamps: usize = 30;

loop {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

if let Some(reason) = finish.recv().await {
return Err(anyhow!("{reason:?}"));
}
let mut period = tokio::time::interval(tokio::time::Duration::from_millis(100));

if !allow_block {
// Restart pipeline if pipeline position do not change,
// occur if usb connection is lost and gst do not detect it
let pipeline = pipeline_weak
.upgrade()
.context("Unable to access the Pipeline from its weak reference")?;

if let Some(position) = pipeline.query_position::<gst::ClockTime>() {
previous_position = match previous_position {
Some(current_previous_position) => {
if current_previous_position.nseconds() != 0
&& current_previous_position == position
{
lost_timestamps += 1;
} else if lost_timestamps > 0 {
// We are back in track, erase lost timestamps
warn!("Position normalized, but didn't changed for {lost_timestamps} timestamps");
lost_timestamps = 0;
}
if lost_timestamps == 1 {
warn!("Position did not change for {lost_timestamps}, silently tracking until {max_lost_timestamps}, then the stream will be recreated");
} else if lost_timestamps > max_lost_timestamps {
return Err(anyhow!("Pipeline lost too many timestamps (max. was {max_lost_timestamps})"));
loop {
tokio::select! {
reason = finish.recv() => {
return Err(anyhow!("{reason:?}"));
}
_ = period.tick() => {
if !allow_block {
// Restart pipeline if pipeline position do not change,
// occur if usb connection is lost and gst do not detect it
let pipeline = pipeline_weak
.upgrade()
.context("Unable to access the Pipeline from its weak reference")?;

if let Some(position) = pipeline.query_position::<gst::ClockTime>() {
previous_position = match previous_position {
Some(current_previous_position) => {
if current_previous_position.nseconds() != 0
&& current_previous_position == position
{
lost_timestamps += 1;
} else if lost_timestamps > 0 {
// We are back in track, erase lost timestamps
warn!("Position normalized, but didn't changed for {lost_timestamps} timestamps");
lost_timestamps = 0;
}
if lost_timestamps == 1 {
warn!("Position did not change for {lost_timestamps}, silently tracking until {max_lost_timestamps}, then the stream will be recreated");
} else if lost_timestamps > max_lost_timestamps {
return Err(anyhow!("Pipeline lost too many timestamps (max. was {max_lost_timestamps})"));
}

Some(position)
}
None => Some(position),
}

Some(position)
}
None => Some(position),
}
}
}
Expand Down

0 comments on commit 2b1dc02

Please sign in to comment.