Skip to content

Commit

Permalink
src: stream: Move Stream fields from std RwLock to tokio RwLock (non-…
Browse files Browse the repository at this point in the history
…blocking async)
  • Loading branch information
joaoantoniocardoso authored and patrickelectric committed Apr 26, 2024
1 parent e827ff2 commit 5cf1f0f
Showing 1 changed file with 43 additions and 32 deletions.
75 changes: 43 additions & 32 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ pub mod sink;
pub mod types;
pub mod webrtc;

use std::sync::{Arc, RwLock};
use std::sync::Arc;

use tokio::sync::RwLock;

use crate::mavlink::mavlink_camera::MavlinkCamera;
use crate::video::types::{VideoEncodeType, VideoSourceType};
Expand Down Expand Up @@ -102,20 +104,15 @@ impl Stream {
loop {
period.tick().await;

if !state
.read()
.map_err(|e| anyhow::Error::msg(e.to_string()))?
.as_ref()
.is_some_and(|state| {
state
.pipeline
.inner_state_as_ref()
.pipeline_runner
.is_running()
})
{
if !state.read().await.as_ref().is_some_and(|state| {
state
.pipeline
.inner_state_as_ref()
.pipeline_runner
.is_running()
}) {
// First, drop the current state
if let Some(state) = state.write().unwrap().take() {
if let Some(state) = state.write().await.take() {
drop(state);
}

Expand Down Expand Up @@ -188,10 +185,10 @@ impl Stream {
};

// Try to recreate the stream
state.write().unwrap().replace(new_state);
state.write().await.replace(new_state);
}

if *terminated.read().unwrap() {
if *terminated.read().await {
debug!("Ending stream {pipeline_id:?}.");
break;
}
Expand All @@ -202,25 +199,39 @@ impl Stream {
}

impl Drop for Stream {
#[instrument(level = "debug", skip(self), fields(pipeline_id = self.state.read().unwrap().as_ref().map(|state| state.pipeline_id.clone().to_string())))]
#[instrument(level = "debug", skip(self))]
fn drop(&mut self) {
debug!("Dropping Stream...");

*self.terminated.write().unwrap() = true;

if let Some(handle) = self.watcher_handle.take() {
if !handle.is_finished() {
handle.abort();
tokio::spawn(async move {
let _ = handle.await;
debug!("PipelineWatcher task aborted");
});
} else {
debug!("PipelineWatcher task nicely finished!");
}
let state = self.state.clone();
let terminated = self.terminated.clone();

std::thread::Builder::new()
.name("Stream::Drop".to_string())
.spawn(move || {
let pipeline_id = state
.blocking_read()
.as_ref()
.map(|state| state.pipeline_id.clone().to_string());

debug!(pipeline_id, "Dropping Stream...");

*terminated.blocking_write() = true;

if !handle.is_finished() {
handle.abort();

// futures::executor::block_on(async move {
// let _ = handle.await;
// debug!(pipeline_id, "PipelineWatcher task aborted");
// });
} else {
debug!(pipeline_id, "PipelineWatcher task nicely finished!");
}
})
.unwrap()
.join()
.unwrap()
}

debug!("Stream Dropped!");
}
}

Expand Down

0 comments on commit 5cf1f0f

Please sign in to comment.