Merge pull request #779 from roboflow/feature/video_frames_sampling

Subsampling video frame rate

PawelPeczek-Roboflow authored Nov 6, 2024
2 parents 2692e87 + 6b15115 commit 4e922b3
Showing 7 changed files with 153 additions and 18 deletions.
@@ -134,13 +134,12 @@ def main() -> None:
workflow_specification=WORKFLOW_DEFINITION,
watchdog=watchdog,
on_prediction=workflows_sink,
source_buffer_filling_strategy=BufferFillingStrategy.DROP_OLDEST,
source_buffer_consumption_strategy=BufferConsumptionStrategy.EAGER,
workflows_parameters={
"line": [[100, 900], [1900, 900]],
"email": os.environ["EMAIL"],
"email_password": os.environ["EMAIL_PASSWORD"],
}
},
max_fps=1,
)
control_thread = Thread(target=command_thread, args=(pipeline, watchdog))
control_thread.start()
13 changes: 13 additions & 0 deletions docs/using_inference/inference_pipeline.md
@@ -374,6 +374,19 @@ pipeline = InferencePipeline.init(

See the reference docs for the [full list of Inference Pipeline parameters](../../docs/reference/inference/core/interfaces/stream/inference_pipeline/#inference.core.interfaces.stream.inference_pipeline.InferencePipeline).

!!! Warning "Breaking change planned for the **end of Q4 2024**"

We've discovered that the behaviour of the `max_fps` parameter is not in line with `inference` clients'
expectations regarding the processing of video files. The current implementation waits before processing
the next video frame, instead of dropping frames to *modulate* the video FPS.

Release `v0.26.0` adds a way to change this suboptimal behaviour - the new behaviour of
`InferencePipeline` can be enabled by setting the environment variable flag
`ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING=True`.

Please note that the new behaviour will become the default at the end of Q4 2024!
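
A minimal sketch of opting in (not part of this commit - the model id and video path below are
placeholders, and the flag must be set before `inference` is imported, since the env configuration
is read at import time):

```python
import os

# Opt into frame dropping for video files - must happen before importing `inference`
os.environ["ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING"] = "True"

from inference import InferencePipeline  # noqa: E402

pipeline = InferencePipeline.init(
    model_id="yolov8n-640",         # placeholder model id
    video_reference="./video.mp4",  # placeholder video file
    on_prediction=lambda predictions, video_frame: print(video_frame.frame_id),
    max_fps=1,  # with the flag set, intermediate frames are dropped rather than delayed
)
pipeline.start()
pipeline.join()
```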


## Performance

We tested the performance of Inference on a variety of hardware devices.
4 changes: 4 additions & 0 deletions inference/core/env.py
@@ -372,6 +372,10 @@
os.getenv("VIDEO_SOURCE_MAXIMUM_ADAPTIVE_FRAMES_DROPPED_IN_ROW", "16")
)

ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING = str2bool(
os.getenv("ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING", "False")
)

NUM_CELERY_WORKERS = os.getenv("NUM_CELERY_WORKERS", 4)
CELERY_LOG_LEVEL = os.getenv("CELERY_LOG_LEVEL", "WARNING")

61 changes: 61 additions & 0 deletions inference/core/interfaces/camera/video_source.py
@@ -1,3 +1,4 @@
import random
import time
from dataclasses import dataclass
from datetime import datetime
@@ -192,6 +193,7 @@ def init(
maximum_adaptive_frames_dropped_in_row: int = DEFAULT_MAXIMUM_ADAPTIVE_FRAMES_DROPPED_IN_ROW,
video_source_properties: Optional[Dict[str, float]] = None,
source_id: Optional[int] = None,
desired_fps: Optional[Union[float, int]] = None,
):
"""
This class is meant to represent abstraction over video sources - both video files and
@@ -314,6 +316,7 @@ def init(
minimum_adaptive_mode_samples=minimum_adaptive_mode_samples,
maximum_adaptive_frames_dropped_in_row=maximum_adaptive_frames_dropped_in_row,
status_update_handlers=status_update_handlers,
desired_fps=desired_fps,
)
return cls(
stream_reference=video_reference,
@@ -739,6 +742,9 @@ class VideoConsumer:
"""
This class should be consumed as part of internal implementation.
It provides abstraction around stream consumption strategies.
It must always be given the same video source across consecutive invocations;
otherwise, its internal state does not make sense.
"""

@classmethod
@@ -750,6 +756,7 @@ def init(
minimum_adaptive_mode_samples: int,
maximum_adaptive_frames_dropped_in_row: int,
status_update_handlers: List[Callable[[StatusUpdate], None]],
desired_fps: Optional[Union[float, int]] = None,
) -> "VideoConsumer":
minimum_adaptive_mode_samples = max(minimum_adaptive_mode_samples, 2)
reader_pace_monitor = sv.FPSMonitor(
@@ -771,6 +778,7 @@ def init(
reader_pace_monitor=reader_pace_monitor,
stream_consumption_pace_monitor=stream_consumption_pace_monitor,
decoding_pace_monitor=decoding_pace_monitor,
desired_fps=desired_fps,
)

def __init__(
@@ -784,6 +792,7 @@ def __init__(
reader_pace_monitor: sv.FPSMonitor,
stream_consumption_pace_monitor: sv.FPSMonitor,
decoding_pace_monitor: sv.FPSMonitor,
desired_fps: Optional[Union[float, int]],
):
self._buffer_filling_strategy = buffer_filling_strategy
self._frame_counter = 0
@@ -797,7 +806,11 @@ def __init__(
self._reader_pace_monitor = reader_pace_monitor
self._stream_consumption_pace_monitor = stream_consumption_pace_monitor
self._decoding_pace_monitor = decoding_pace_monitor
self._desired_fps = desired_fps
self._declared_source_fps = None
self._is_source_video_file = None
self._status_update_handlers = status_update_handlers
self._next_frame_from_video_to_accept = 1

@property
def buffer_filling_strategy(self) -> Optional[BufferFillingStrategy]:
@@ -812,6 +825,7 @@ def reset(self, source_properties: SourceProperties) -> None:
self.reset_stream_consumption_pace()
self._decoding_pace_monitor.reset()
self._adaptive_frames_dropped_in_row = 0
self._next_frame_from_video_to_accept = self._frame_counter + 1

def reset_stream_consumption_pace(self) -> None:
self._stream_consumption_pace_monitor.reset()
@@ -828,6 +842,10 @@ def consume_frame(
frames_buffering_allowed: bool,
source_id: Optional[int] = None,
) -> bool:
if self._is_source_video_file is None:
source_properties = video.discover_source_properties()
self._is_source_video_file = source_properties.is_file
self._declared_source_fps = source_properties.fps
frame_timestamp = datetime.now()
success = video.grab()
self._stream_consumption_pace_monitor.tick()
@@ -844,6 +862,8 @@
},
status_update_handlers=self._status_update_handlers,
)
if self._video_fps_should_be_sub_sampled():
return True
return self._consume_stream_frame(
video=video,
declared_source_fps=declared_source_fps,
@@ -862,6 +882,32 @@ def _set_stream_mode_buffering_strategies(self) -> None:
if self._buffer_filling_strategy is None:
self._buffer_filling_strategy = BufferFillingStrategy.ADAPTIVE_DROP_OLDEST

def _video_fps_should_be_sub_sampled(self) -> bool:
if self._desired_fps is None:
return False
if self._is_source_video_file:
actual_fps = self._declared_source_fps
else:
fraction_of_pace_monitor_samples = (
len(self._stream_consumption_pace_monitor.all_timestamps)
/ self._stream_consumption_pace_monitor.all_timestamps.maxlen
)
if fraction_of_pace_monitor_samples < 0.9:
actual_fps = self._declared_source_fps
elif hasattr(self._stream_consumption_pace_monitor, "fps"):
actual_fps = self._stream_consumption_pace_monitor.fps
else:
actual_fps = self._stream_consumption_pace_monitor()
if self._frame_counter == self._next_frame_from_video_to_accept:
stride = calculate_video_file_stride(
actual_fps=actual_fps,
desired_fps=self._desired_fps,
)
self._next_frame_from_video_to_accept += stride
return False
# skipping frame
return True
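
In effect, for a 30 FPS video file with `desired_fps=5` the stride works out to 6, so only every
sixth grabbed frame is decoded and buffered; the skipped frames are still grabbed (keeping the
decoder advancing through the file) but never retrieved. For live streams, the measured consumption
pace is used in place of the declared FPS once at least 90% of the pace monitor's sample window is
filled.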

def _consume_stream_frame(
self,
video: VideoFrameProducer,
@@ -1133,3 +1179,18 @@ def get_fps_if_tick_happens_now(fps_monitor: sv.FPSMonitor) -> float:
now = time.monotonic()
reader_taken_time = now - min_reader_timestamp
return (len(fps_monitor.all_timestamps) + 1) / reader_taken_time


def calculate_video_file_stride(
actual_fps: Optional[Union[float, int]], desired_fps: Optional[Union[float, int]]
) -> int:
if actual_fps is None or desired_fps is None:
return 1
if actual_fps < 0 or desired_fps < 0:
return 1
true_stride = actual_fps / desired_fps
integer_stride = max(int(true_stride), 1)
probability_of_missing_frame = max(true_stride - integer_stride, 0)
if random.random() < probability_of_missing_frame:
integer_stride += 1
return integer_stride
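
The fractional part of the stride is handled stochastically, so non-integer `actual_fps / desired_fps`
ratios still average out to the desired rate. A small sketch verifying this with the helper above:

```python
import random

from inference.core.interfaces.camera.video_source import calculate_video_file_stride

# With actual_fps=30 and desired_fps=12, true_stride = 2.5: the helper returns
# 2 or 3 with equal probability, so the mean stride is ~2.5 and the effective
# sampling rate is ~12 FPS on average.
random.seed(0)
strides = [calculate_video_file_stride(actual_fps=30, desired_fps=12) for _ in range(10_000)]
mean_stride = sum(strides) / len(strides)
print(f"mean stride: {mean_stride:.2f}")         # ~2.50
print(f"effective FPS: {30 / mean_stride:.2f}")  # ~12.0
```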
57 changes: 42 additions & 15 deletions inference/core/interfaces/stream/inference_pipeline.py
@@ -16,6 +16,7 @@
ACTIVE_LEARNING_ENABLED,
API_KEY,
DISABLE_PREPROC_AUTO_ORIENT,
ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING,
ENABLE_WORKFLOWS_PROFILING,
MAX_ACTIVE_MODELS,
PREDICTIONS_QUEUE_SIZE,
@@ -154,9 +155,14 @@ def init(
api_key (Optional[str]): Roboflow API key - if not passed - will be looked in env under "ROBOFLOW_API_KEY"
and "API_KEY" variables. API key, passed in some form is required.
max_fps (Optional[Union[float, int]]): Specific value passed as this parameter will be used to
dictate max FPS of processing. It can be useful if we wanted to run concurrent inference pipelines
on single machine making tradeoff between number of frames and number of streams handled. Disabled
by default.
dictate max FPS of each video source.
The implementation details of this option changed in release `v0.26.0`. Prior to that release,
this value, when applied to video files, caused processing to wait `1 / max_fps` seconds before
the next frame was processed - the new implementation drops the intermediate frames instead,
which is more aligned with users' expectations.
The new behaviour is enabled in experimental mode by setting the environment variable flag
`ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING=True`. Please note that the new behaviour will
become the default at the end of Q4 2024!
watchdog (Optional[PipelineWatchDog]): Implementation of class that allows profiling of
inference pipeline - if not given null implementation (doing nothing) will be used.
status_update_handlers (Optional[List[Callable[[StatusUpdate], None]]]): List of handlers to intercept
@@ -330,9 +336,14 @@ def init_with_yolo_world(
once prediction is ready - passing both decoded frame, their metadata and dict with standard
Roboflow Object Detection prediction.
max_fps (Optional[Union[float, int]]): Specific value passed as this parameter will be used to
dictate max FPS of processing. It can be useful if we wanted to run concurrent inference pipelines
on single machine making tradeoff between number of frames and number of streams handled. Disabled
by default.
dictate max FPS of each video source.
The implementation details of this option changed in release `v0.26.0`. Prior to that release,
this value, when applied to video files, caused processing to wait `1 / max_fps` seconds before
the next frame was processed - the new implementation drops the intermediate frames instead,
which is more aligned with users' expectations.
The new behaviour is enabled in experimental mode by setting the environment variable flag
`ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING=True`. Please note that the new behaviour will
become the default at the end of Q4 2024!
watchdog (Optional[PipelineWatchDog]): Implementation of class that allows profiling of
inference pipeline - if not given null implementation (doing nothing) will be used.
status_update_handlers (Optional[List[Callable[[StatusUpdate], None]]]): List of handlers to intercept
@@ -379,7 +390,6 @@ def init_with_yolo_world(
old sinks - but then `SinkMode.SEQUENTIAL` is to be used, causing sink to be called on each
prediction element.
Other ENV variables involved in low-level configuration:
* INFERENCE_PIPELINE_PREDICTIONS_QUEUE_SIZE - size of buffer for predictions that are ready for dispatching
* INFERENCE_PIPELINE_RESTART_ATTEMPT_DELAY - delay for restarts on stream connection drop
@@ -483,9 +493,14 @@ def init_with_workflow(
on_prediction (Callable[AnyPrediction, VideoFrame], None]): Function to be called
once prediction is ready - passing both decoded frame, their metadata and dict with workflow output.
max_fps (Optional[Union[float, int]]): Specific value passed as this parameter will be used to
dictate max FPS of processing. It can be useful if we wanted to run concurrent inference pipelines
on single machine making tradeoff between number of frames and number of streams handled. Disabled
by default.
dictate max FPS of each video source.
The implementation details of this option changed in release `v0.26.0`. Prior to that release,
this value, when applied to video files, caused processing to wait `1 / max_fps` seconds before
the next frame was processed - the new implementation drops the intermediate frames instead,
which is more aligned with users' expectations.
The new behaviour is enabled in experimental mode by setting the environment variable flag
`ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING=True`. Please note that the new behaviour will
become the default at the end of Q4 2024!
watchdog (Optional[PipelineWatchDog]): Implementation of class that allows profiling of
inference pipeline - if not given null implementation (doing nothing) will be used.
status_update_handlers (Optional[List[Callable[[StatusUpdate], None]]]): List of handlers to intercept
@@ -525,6 +540,7 @@ def init_with_workflow(
use_workflow_definition_cache (bool): Controls usage of cache for workflow definitions. Set this to False
when you frequently modify definition saved in Roboflow app and want to fetch the
newest version for the request. Only applies for Workflows definitions saved on Roboflow platform.
Other ENV variables involved in low-level configuration:
* INFERENCE_PIPELINE_PREDICTIONS_QUEUE_SIZE - size of buffer for predictions that are ready for dispatching
* INFERENCE_PIPELINE_RESTART_ATTEMPT_DELAY - delay for restarts on stream connection drop
@@ -680,9 +696,14 @@ def init_with_custom_logic(
on_pipeline_end (Optional[Callable[[], None]]): Optional (parameter-free) function to be called
whenever pipeline ends
max_fps (Optional[Union[float, int]]): Specific value passed as this parameter will be used to
dictate max FPS of processing. It can be useful if we wanted to run concurrent inference pipelines
on single machine making tradeoff between number of frames and number of streams handled. Disabled
by default.
dictate max FPS of each video source.
The implementation details of this option changed in release `v0.26.0`. Prior to that release,
this value, when applied to video files, caused processing to wait `1 / max_fps` seconds before
the next frame was processed - the new implementation drops the intermediate frames instead,
which is more aligned with users' expectations.
The new behaviour is enabled in experimental mode by setting the environment variable flag
`ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING=True`. Please note that the new behaviour will
become the default at the end of Q4 2024!
watchdog (Optional[PipelineWatchDog]): Implementation of class that allows profiling of
inference pipeline - if not given null implementation (doing nothing) will be used.
status_update_handlers (Optional[List[Callable[[StatusUpdate], None]]]): List of handlers to intercept
@@ -719,7 +740,6 @@ def init_with_custom_logic(
old sinks - but then `SinkMode.SEQUENTIAL` is to be used, causing sink to be called on each
prediction element.
Other ENV variables involved in low-level configuration:
* INFERENCE_PIPELINE_PREDICTIONS_QUEUE_SIZE - size of buffer for predictions that are ready for dispatching
* INFERENCE_PIPELINE_RESTART_ATTEMPT_DELAY - delay for restarts on stream connection drop
@@ -735,12 +755,16 @@ def init_with_custom_logic(
if status_update_handlers is None:
status_update_handlers = []
status_update_handlers.append(watchdog.on_status_update)
desired_source_fps = None
if ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING:
desired_source_fps = max_fps
video_sources = prepare_video_sources(
video_reference=video_reference,
video_source_properties=video_source_properties,
status_update_handlers=status_update_handlers,
source_buffer_filling_strategy=source_buffer_filling_strategy,
source_buffer_consumption_strategy=source_buffer_consumption_strategy,
desired_source_fps=desired_source_fps,
)
watchdog.register_video_sources(video_sources=video_sources)
predictions_queue = Queue(maxsize=PREDICTIONS_QUEUE_SIZE)
@@ -963,9 +987,12 @@ def _generate_frames(
) -> Generator[List[VideoFrame], None, None]:
for video_source in self._video_sources:
video_source.start()
max_fps = None
if not ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING:
max_fps = self._max_fps
yield from multiplex_videos(
videos=self._video_sources,
max_fps=self._max_fps,
max_fps=max_fps,
batch_collection_timeout=self._batch_collection_timeout,
should_stop=lambda: self._stop,
)
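
Net effect: rate limiting now follows two mutually exclusive paths. With
`ENABLE_FRAME_DROP_ON_VIDEO_FILE_RATE_LIMITING` set, `max_fps` is forwarded to each `VideoSource`
as `desired_source_fps`, so frames are dropped at the consumer; with the flag unset, `max_fps` is
still handed to `multiplex_videos`, preserving the legacy wait-based throttling.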
4 changes: 4 additions & 0 deletions inference/core/interfaces/stream/utils.py
@@ -27,6 +27,7 @@ def prepare_video_sources(
status_update_handlers: Optional[List[Callable[[StatusUpdate], None]]],
source_buffer_filling_strategy: Optional[BufferFillingStrategy],
source_buffer_consumption_strategy: Optional[BufferConsumptionStrategy],
desired_source_fps: Optional[Union[float, int]] = None,
) -> List[VideoSource]:
video_reference = wrap_in_list(element=video_reference)
if len(video_reference) < 1:
@@ -46,6 +47,7 @@
status_update_handlers=status_update_handlers,
source_buffer_filling_strategy=source_buffer_filling_strategy,
source_buffer_consumption_strategy=source_buffer_consumption_strategy,
desired_source_fps=desired_source_fps,
)


@@ -73,6 +75,7 @@ def initialise_video_sources(
status_update_handlers: Optional[List[Callable[[StatusUpdate], None]]],
source_buffer_filling_strategy: Optional[BufferFillingStrategy],
source_buffer_consumption_strategy: Optional[BufferConsumptionStrategy],
desired_source_fps: Optional[Union[float, int]] = None,
) -> List[VideoSource]:
return [
VideoSource.init(
@@ -82,6 +85,7 @@ def initialise_video_sources(
buffer_consumption_strategy=source_buffer_consumption_strategy,
video_source_properties=source_properties,
source_id=i,
desired_fps=desired_source_fps,
)
for i, (reference, source_properties) in enumerate(
zip(video_reference, video_source_properties)
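
For illustration, a hypothetical invocation of this internal helper, showing how
`desired_source_fps` is plumbed into each `VideoSource` (file paths are placeholders; passing
`None` for the `Optional` parameters is an assumption based on their type hints):

```python
from inference.core.interfaces.stream.utils import prepare_video_sources

# Hypothetical call - internal API, shown only to illustrate the plumbing
sources = prepare_video_sources(
    video_reference=["./a.mp4", "./b.mp4"],  # placeholder file paths
    video_source_properties=None,
    status_update_handlers=None,
    source_buffer_filling_strategy=None,
    source_buffer_consumption_strategy=None,
    desired_source_fps=5,  # each file ends up sub-sampled to ~5 FPS
)
```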