-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcreate_video_frames_dataset
151 lines (130 loc) · 5.83 KB
/
create_video_frames_dataset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
Processes videos to extract behaviors(or anything else) using XML annotations.
Reads videos and XMLs for bounding box and behavior details. Frames are
captured based on these details and saved as cropped images. Frames are
filteredby predefined frequencies and the 'occluded' attribute.
Results are saved in an outputfolder, sorted by behaviors
or the 'occluded' status.
"""
import xml.etree.ElementTree as ET
from pathlib import Path
import click
import cv2
from tqdm import tqdm
from multiprocessing import Pool
# Mapping for class names and frame frequencies
class_mapping = {
'handshake': 'handshake',
'hugging': 'hugging',
'waving': 'waving',
'neutral': 'neutral',
'nodding': 'nodding',
'pointing': 'pointing'
}
frame_frequency_mapping = {
'handshake': 3,
'hugging': 3,
'waving': 3,
'neutral': 7,
'nodding': 5,
'pointing': 3,
'occluded': 7
}
def extract_bboxes_from_xml(root):
"""Extract bounding box info from XML."""
person_tracks_info_dict = {}
for track in root.findall('.//track[@label="person"]'):
track_id = int(track.get('id'))
for box in track.findall('.//box'):
frame = int(box.get('frame'))
coords = ['xtl', 'ytl', 'xbr', 'ybr']
xtl, ytl, xbr, ybr = map(lambda c: int(float(box.get(c))), coords)
behavior = box.find('.//attribute[@name="behavior"]').text
behavior = behavior if behavior else None
occluded = box.get('occluded', '0')
track_info = {'track_id': track_id, 'bbox': [xtl, ytl, xbr, ybr],
'behavior': behavior, 'occluded': occluded}
person_tracks_info_dict.setdefault(frame, []).append(track_info)
return person_tracks_info_dict
def save_crops_from_video(video_path, person_tracks_info_dict,
output_folder, occluded=None):
"""Capture frames and save the crops based on bbox info."""
vidcap = cv2.VideoCapture(str(video_path))
video_name = Path(video_path).stem
count = 0
success, image = vidcap.read()
while success:
success, image = vidcap.read()
behaviors = [p['behavior']
for p in person_tracks_info_dict.get(count, [])]
is_occluded = any(d.get('occluded') == '1'
for d in person_tracks_info_dict.get(count, []))
if is_occluded and occluded:
current_frame_frequency = frame_frequency_mapping['occluded']
else:
current_frame_frequency = min(
[frame_frequency_mapping.get(b, 1) for b in behaviors],
default=1)
if success and count % current_frame_frequency == 0:
for person_data in person_tracks_info_dict.get(count, []):
crop_and_save_image(image, person_data, video_name,
output_folder, count, occluded)
count += 1
vidcap.release()
def crop_and_save_image(image, person_data, video_name, output_folder,
count, occluded=None):
"""Crop image based on bbox info and save it."""
xtl, ytl, xbr, ybr = person_data['bbox']
label = class_mapping.get(person_data['behavior'], 'other')
crop = image[ytl:ybr, xtl:xbr]
occlusion_flag = person_data.get('occluded') == '1'
occlusion_suffix = "_occluded" if occlusion_flag else ""
if occlusion_flag and occluded:
folder_choice = 'occluded'
else:
folder_choice = label
folder_path = output_folder / folder_choice
frame_name_template = (
f"{video_name}_{label}{occlusion_suffix}_frame{count:05d}.jpg")
frame_name = folder_path / frame_name_template
cv2.imwrite(str(frame_name), crop)
def process_video(args):
"""Process video, extract bbox info, and save the crops."""
video_path, xml_folder, output_folder, occluded = args
xml_path = Path(xml_folder) / Path(video_path).with_suffix('.xml').name
if not xml_path.exists():
print(f"WARNING: XML file not found for video: {video_path}. "
"Skipping this video.")
return
root = ET.parse(xml_path).getroot()
person_tracks_info_dict = extract_bboxes_from_xml(root)
save_crops_from_video(video_path, person_tracks_info_dict,
output_folder, occluded)
@click.command()
@click.option('--video-folder', required=True, type=click.Path(exists=True),
help='Path to the aggressive poses video files folder.')
@click.option('--xml-folder', required=True, type=click.Path(exists=True),
help='Path to the xml files folder.')
@click.option('--output-folder', required=True, type=click.Path(exists=False),
help='Path to the output image folder.')
@click.option('--occluded', type=click.Choice(['save_only', 'save_together']),
help="Save method for the 'occluded' attribute.", default=None)
@click.option('--num-workers', default=8, type=int,
help='Number of worker processes for parallel processing.')
def main(video_folder, xml_folder, output_folder, num_workers, occluded=None):
"""Main function to process videos and save crops."""
video_files = sorted([x for x in Path(video_folder).rglob('*.*')
if x.suffix.lower() in ['.avi', '.mp4', '.mkv']])
result_dataset = Path(output_folder)
folders_to_create = {'occluded'} if occluded == 'save_only' else set(
class_mapping.values()).union({'occluded'}
if occluded == 'save_together' else {})
for folder in folders_to_create:
(result_dataset / folder).mkdir(parents=True, exist_ok=True)
process_args = [(v, xml_folder, result_dataset, occluded)
for v in video_files]
with Pool(num_workers) as pool:
list(tqdm(pool.imap(process_video, process_args),
total=len(process_args)))
if __name__ == "__main__":
main()