From 5e562340a3f2a3ecc7c8cce509b732897bbafe35 Mon Sep 17 00:00:00 2001 From: Miquel Farre Date: Thu, 22 Aug 2024 22:53:31 +0200 Subject: [PATCH] ruff --- examples/media_experiment.py | 22 +++++----------- .../pipeline/filters/video_frozen_filter.py | 26 +++++++++++-------- .../pipeline/readers/videotriplet.py | 3 +-- 3 files changed, 23 insertions(+), 28 deletions(-) diff --git a/examples/media_experiment.py b/examples/media_experiment.py index cf45d049..ab67b667 100644 --- a/examples/media_experiment.py +++ b/examples/media_experiment.py @@ -6,27 +6,20 @@ def run_step_1(): video_triplet_reader = VideoTripletReader( - data_folder="s3://amotoratolins/datatrovetest/", - metadata_origin="youtube" + data_folder="s3://amotoratolins/datatrovetest/", metadata_origin="youtube" ) - video_frozen_filter = VideoFrozenFilter( - ) + video_frozen_filter = VideoFrozenFilter() - pipeline_1 = [ - video_triplet_reader, - video_frozen_filter - ] + pipeline_1 = [video_triplet_reader, video_frozen_filter] # Create the executor with the pipeline - executor_1: PipelineExecutor = LocalPipelineExecutor( - pipeline=pipeline_1, - workers=1, - tasks=1 - ) + executor_1: PipelineExecutor = LocalPipelineExecutor(pipeline=pipeline_1, workers=1, tasks=1) # Execute the pipeline - result = executor_1.run() + # result = executor_1.run() + executor_1.run() + # # Additional debugging # for document in video_triplet_reader.read_file(None): @@ -37,6 +30,5 @@ def run_step_1(): # print("-" * 80) - # Run the testing pipeline run_step_1() diff --git a/src/datatrove/pipeline/filters/video_frozen_filter.py b/src/datatrove/pipeline/filters/video_frozen_filter.py index b0abc15d..2fc569b8 100644 --- a/src/datatrove/pipeline/filters/video_frozen_filter.py +++ b/src/datatrove/pipeline/filters/video_frozen_filter.py @@ -13,7 +13,9 @@ class VideoFrozenFilter(BaseFilter): name = "🧊 Video-Frozen-filter" _requires_dependencies = ["ffmpeg"] - def __init__(self, exclusion_writer=None, batch_size: int = 1, freeze_threshold: float = 0.005, freeze_duration: int = 60): + def __init__( + self, exclusion_writer=None, batch_size: int = 1, freeze_threshold: float = 0.005, freeze_duration: int = 60 + ): """ Args: exclusion_writer: optionally pass in a writer that will save the dropped documents. @@ -28,11 +30,14 @@ def __init__(self, exclusion_writer=None, batch_size: int = 1, freeze_threshold: # Check if ffmpeg is installed if shutil.which("ffmpeg") is None: - raise EnvironmentError("ffmpeg is not installed. Please install it to use the VideoFrozenFilter. More details: https://www.ffmpeg.org/download.html") + raise EnvironmentError( + "ffmpeg is not installed. Please install it to use the VideoFrozenFilter. More details: https://www.ffmpeg.org/download.html" + ) def filter(self, doc: Document) -> bool | Tuple[bool, str]: video_path = doc.media[0].local_path if doc.media else None import os + if not os.path.exists(video_path): logger.warning(f"Video path does not exist: {video_path}") if video_path and self.is_video_frozen(video_path): @@ -44,6 +49,7 @@ def is_video_frozen(self, video_path: str) -> bool: if self.ffmpeg is None: import ffmpeg + self.ffmpeg = ffmpeg video_duration = self.get_video_duration(video_path) @@ -75,29 +81,27 @@ def is_video_frozen(self, video_path: str) -> bool: return True return False - def get_video_duration(self, video_path: str) -> float: """Get the duration of the video in seconds using ffmpeg.""" try: probe = self.ffmpeg.probe(video_path) - return float(probe['format']['duration']) + return float(probe["format"]["duration"]) except self.ffmpeg.Error as e: logger.info(f"ffprobe {video_path}:") - logger.error(e.stderr.decode('utf-8')) + logger.error(e.stderr.decode("utf-8")) raise e def check_freeze(self, video_path: str, start_time: str, duration: str) -> bool: """Check for frozen frames in a specific interval using ffmpeg's freezedetect filter.""" try: out, err = ( - self.ffmpeg - .input(video_path, ss=start_time, t=duration) - .filter('freezedetect', n=self.freeze_threshold, d=self.freeze_duration) - .output('null', f='null') + self.ffmpeg.input(video_path, ss=start_time, t=duration) + .filter("freezedetect", n=self.freeze_threshold, d=self.freeze_duration) + .output("null", f="null") .run(capture_stdout=True, capture_stderr=True) ) - err = err.decode('utf-8') - return 'freeze_start' in err and 'freeze_end' not in err + err = err.decode("utf-8") + return "freeze_start" in err and "freeze_end" not in err except self.ffmpeg.Error as e: print(f"Error processing video {video_path}: {e}") return False diff --git a/src/datatrove/pipeline/readers/videotriplet.py b/src/datatrove/pipeline/readers/videotriplet.py index 98fe1be5..3e2b522f 100644 --- a/src/datatrove/pipeline/readers/videotriplet.py +++ b/src/datatrove/pipeline/readers/videotriplet.py @@ -27,7 +27,7 @@ def __init__( id_key: str = "id", default_metadata: dict = None, recursive: bool = True, - local_cache_dir = "/tmp/local_video_cache" + local_cache_dir="/tmp/local_video_cache", ): self.metadata_origin = metadata_origin self.local_cache_dir = local_cache_dir @@ -46,7 +46,6 @@ def __init__( recursive, ) - def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: """Overrides the base run method to handle triplet statistics correctly.""" triplet_count = 0