Skip to content

Commit

Permalink
ruff
Browse files Browse the repository at this point in the history
  • Loading branch information
mfarre committed Aug 22, 2024
1 parent b31717f commit 5e56234
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 28 deletions.
22 changes: 7 additions & 15 deletions examples/media_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,20 @@

def run_step_1():
    """Run the first test pipeline: read video triplets, then drop frozen videos.

    Builds a ``VideoTripletReader`` over the test S3 folder and a
    ``VideoFrozenFilter`` with its default settings, chains them into a
    pipeline and executes it once with a single local worker and task.
    """
    video_triplet_reader = VideoTripletReader(
        data_folder="s3://amotoratolins/datatrovetest/", metadata_origin="youtube"
    )
    video_frozen_filter = VideoFrozenFilter()

    pipeline_1 = [video_triplet_reader, video_frozen_filter]

    # Create the executor with the pipeline
    executor_1: PipelineExecutor = LocalPipelineExecutor(pipeline=pipeline_1, workers=1, tasks=1)

    # Execute the pipeline exactly once; the value returned by run() is not used here
    executor_1.run()


# # Additional debugging
# for document in video_triplet_reader.read_file(None):
Expand All @@ -37,6 +30,5 @@ def run_step_1():
# print("-" * 80)



# Run the testing pipeline only when this file is executed as a script,
# so importing the module does not start the pipeline as a side effect
if __name__ == "__main__":
    run_step_1()
26 changes: 15 additions & 11 deletions src/datatrove/pipeline/filters/video_frozen_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ class VideoFrozenFilter(BaseFilter):
name = "🧊 Video-Frozen-filter"
_requires_dependencies = ["ffmpeg"]

def __init__(self, exclusion_writer=None, batch_size: int = 1, freeze_threshold: float = 0.005, freeze_duration: int = 60):
def __init__(
self, exclusion_writer=None, batch_size: int = 1, freeze_threshold: float = 0.005, freeze_duration: int = 60
):
"""
Args:
exclusion_writer: optionally pass in a writer that will save the dropped documents.
Expand All @@ -28,11 +30,14 @@ def __init__(self, exclusion_writer=None, batch_size: int = 1, freeze_threshold:

# Check if ffmpeg is installed
if shutil.which("ffmpeg") is None:
raise EnvironmentError("ffmpeg is not installed. Please install it to use the VideoFrozenFilter. More details: https://www.ffmpeg.org/download.html")
raise EnvironmentError(
"ffmpeg is not installed. Please install it to use the VideoFrozenFilter. More details: https://www.ffmpeg.org/download.html"
)

def filter(self, doc: Document) -> bool | Tuple[bool, str]:
video_path = doc.media[0].local_path if doc.media else None
import os

if not os.path.exists(video_path):
logger.warning(f"Video path does not exist: {video_path}")
if video_path and self.is_video_frozen(video_path):
Expand All @@ -44,6 +49,7 @@ def is_video_frozen(self, video_path: str) -> bool:

if self.ffmpeg is None:
import ffmpeg

self.ffmpeg = ffmpeg

video_duration = self.get_video_duration(video_path)
Expand Down Expand Up @@ -75,29 +81,27 @@ def is_video_frozen(self, video_path: str) -> bool:
return True
return False


def get_video_duration(self, video_path: str) -> float:
    """Get the duration of the video in seconds using ffmpeg.

    Args:
        video_path: path of the video handed to ``self.ffmpeg.probe``.

    Returns:
        The ``format.duration`` field of the probe result, converted to float.

    Raises:
        self.ffmpeg.Error: re-raised unchanged after the probe's stderr has been logged.
    """
    # Only the probe call can raise ffmpeg.Error, so keep the try body minimal
    try:
        probe = self.ffmpeg.probe(video_path)
    except self.ffmpeg.Error as e:
        logger.info(f"ffprobe {video_path}:")
        # errors="replace" so undecodable bytes in stderr cannot mask the original error
        logger.error(e.stderr.decode("utf-8", errors="replace"))
        raise
    return float(probe["format"]["duration"])

def check_freeze(self, video_path: str, start_time: str, duration: str) -> bool:
    """Check for frozen frames in a specific interval using ffmpeg's freezedetect filter.

    Args:
        video_path: path of the video to analyse.
        start_time: passed to ffmpeg as ``ss`` (start of the analysed interval).
        duration: passed to ffmpeg as ``t`` (length of the analysed interval).

    Returns:
        True when ffmpeg's stderr contains ``freeze_start`` but no ``freeze_end``,
        i.e. a freeze began in the interval and did not end within it.
        False otherwise, including when ffmpeg fails on the video.
    """
    try:
        # Output goes to the null muxer; only the filter's log on stderr is of interest
        _, err = (
            self.ffmpeg.input(video_path, ss=start_time, t=duration)
            .filter("freezedetect", n=self.freeze_threshold, d=self.freeze_duration)
            .output("null", f="null")
            .run(capture_stdout=True, capture_stderr=True)
        )
    except self.ffmpeg.Error as e:
        # Best effort: a video ffmpeg cannot process is reported as not frozen
        logger.error(f"Error processing video {video_path}: {e}")
        return False

    stderr_text = err.decode("utf-8", errors="replace")
    return "freeze_start" in stderr_text and "freeze_end" not in stderr_text
3 changes: 1 addition & 2 deletions src/datatrove/pipeline/readers/videotriplet.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(
id_key: str = "id",
default_metadata: dict = None,
recursive: bool = True,
local_cache_dir = "/tmp/local_video_cache"
local_cache_dir="/tmp/local_video_cache",
):
self.metadata_origin = metadata_origin
self.local_cache_dir = local_cache_dir
Expand All @@ -46,7 +46,6 @@ def __init__(
recursive,
)


def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
"""Overrides the base run method to handle triplet statistics correctly."""
triplet_count = 0
Expand Down

0 comments on commit 5e56234

Please sign in to comment.