-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdataloader.py
executable file
·326 lines (280 loc) · 11.6 KB
/
dataloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
#!/usr/bin/env python3
from copy import deepcopy
from pathlib import Path
from typing import Any, Iterator
import numpy as np
import pandas as pd
import torchvision
from python_tools import caching, generic
from python_tools.ml.data_loader import DataLoader
from python_tools.ml.pytorch_tools import dict_to_batched_data
from python_tools.ml.split import stratified_splits
prefix = Path("/projects/")
if not prefix.is_dir():
prefix = Path("/pool01/")
DISFA_FOLDER = prefix / "dataset_original/DISFA/"
BP4D_PLUS_FOLDER = prefix / "dataset_original/BP4D_plus/AUCoding/AU_INT/"
assert DISFA_FOLDER.is_dir()
class DISFA:
def __init__(
self,
au: str,
ifold: int = 0,
name: str = "training",
) -> None:
if not isinstance(au, str):
au = str(au)
self.n_folds: int = 5
self.au: str = au
self.ifold: int = ifold
self.name: str = name
def get_loader(self) -> DataLoader:
subject = self.get_list_of_subjects()[0]
property_dict = {
"x_names": [
x
for x in self.get_features(subject).columns
if x.startswith("p_") or x.startswith("hog")
],
"y_names": ["intensity"],
}
for key in property_dict:
property_dict[key] = np.asarray(property_dict[key])
# DataLoader will use a copy of it
self.property_dict = property_dict
return DataLoader(list(self), properties=deepcopy(property_dict))
def __iter__(self) -> Iterator[dict[str, list[np.ndarray]]]:
"""
Returns a list of training, evaluation, test folds
Yields multiple items for evaluation and testing
"""
x_names = self.property_dict["x_names"]
# load labels for all subjects
subjects = self.get_subjects_for_fold()
for subject in subjects:
# load openface and combine with labels
data = self.get_au(subject)[0].join(self.get_features(subject), how="inner")
# create folds
data = {
"x": data.loc[:, x_names].values.astype(np.float32),
"y": data["intensity"].values[:, None].astype(np.float32),
"meta_id": data["subject"].values[:, None],
"meta_frame": data.index.values[:, None],
}
data = dict_to_batched_data(data, batch_size=-1) # TODO
while data:
yield data.pop()
def get_au(self, subject: str) -> tuple[pd.DataFrame, list[Path]]:
"""
Returns a Pandas DataFrame with the specified AU.
"""
data = pd.read_csv(
DISFA_FOLDER / f"ActionUnit_Labels/{subject}/{subject}_au{self.au}.txt",
header=None,
names=["frame", "intensity"],
index_col="frame",
dtype=int,
)
return (data, [Path("DISFA") / f"LeftVideo{subject}_comp.hdf"])
def get_features(self, subject: str) -> pd.DataFrame:
"""
Loads (and extracts) the openface features.
"""
au_data, files = self.get_au(subject)
openface = caching.read_hdfs(files[0])["df"]
openface = openface.set_index("frame")
columns = [
x for x in openface.columns if x.startswith("hog") or x.startswith("p_")
]
openface = openface.loc[:, columns]
openface = openface - openface.median()
openface = openface.loc[
np.intersect1d(openface.index, au_data.index).tolist(), :
]
openface["subject"] = int(subject[2:])
return openface
def get_list_of_subjects(self) -> list[str]:
"""
Returns the list of subject ids.
"""
files = DISFA_FOLDER.glob("Videos_LeftCamera/*.avi")
return sorted(file.name[9:14] for file in files)
def get_subjects_for_fold(self) -> list[str]:
# load labels for all subjects
subjects = self.get_list_of_subjects()
data = [self.get_au(subject)[0] for subject in subjects]
# generate split
assert self.n_folds >= 4
groups = np.array(
[au_intensities["intensity"].mean() for au_intensities in data]
)
fold_index = stratified_splits(groups, np.ones(self.n_folds))
# determine relevant subjects
test = fold_index == self.ifold
evaluation = fold_index == ((self.ifold + 1) % self.n_folds)
training = ~test & ~evaluation
fold = {"training": training, "validation": evaluation, "test": test}
subject_index = fold[self.name]
assert subject_index.sum() > 0, f"{self.name} {self.ifold} {self.au}"
return [subject for subject, isin in zip(subjects, subject_index) if isin]
class MNIST(DISFA):
def __init__(self, *args: Any, imbalance: bool = False, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
mnist_training = torchvision.datasets.MNIST(".", download=True, train=True)
mnist_test = torchvision.datasets.MNIST(".", train=False)
self.mnist = {
"training": [
mnist_training.data.numpy(),
mnist_training.targets.numpy(),
],
"test": [
mnist_test.data.numpy(),
mnist_test.targets.numpy(),
],
}
for key in self.mnist:
self.mnist[key][0] = self.mnist[key][0].reshape(
self.mnist[key][0].shape[0], -1
)
self.mnist[key][1] = self.mnist[key][1][:, None]
# remove numbers above 5
index = self.mnist[key][1][:, 0] <= 5
self.mnist[key][0] = self.mnist[key][0][index, :].astype(float)
self.mnist[key][1] = self.mnist[key][1][index, :].astype(float)
# imbalance
# 0: 55% --> 100%
# 1: 8% --> 15%
# 2: 14% --> 26%
# 3: 18% --> 33%
# 4: 5% --> 9%
# 5: <1% --> 1%
if imbalance:
reference = (self.mnist[key][1] == 0).sum()
for level, percentage in zip(
[1, 2, 3, 4, 5], [0.15, 0.26, 0.33, 0.09, 0.01]
):
index = np.where(self.mnist[key][1] == level)[0]
rng = np.random.RandomState(0)
keep = rng.choice(
index, size=int(reference * percentage), replace=False
)
remove = np.setdiff1d(index, keep)
self.mnist[key][0][remove] = float("NaN")
self.mnist[key][1][remove] = float("NaN")
index = ~np.isnan(self.mnist[key][0]).any(axis=1)
self.mnist[key][0] = self.mnist[key][0][index]
self.mnist[key][1] = self.mnist[key][1][index]
# training/test subjects
self.n_training = int(np.floor(self.mnist["training"][1].size / 1006))
self.n_test = int(np.floor(self.mnist["test"][1].size / 1006))
def get_list_of_subjects(self) -> list[str]:
# simulate that each person has 1006 data points
return np.arange(self.n_training + self.n_test, dtype=int).astype(str).tolist()
def get_features(self, subject: str) -> pd.DataFrame:
# from training or testing
data = self.mnist["training"][0]
subject = int(subject)
if subject >= self.n_training:
data = self.mnist["test"][0]
subject -= self.n_training
# get block
data = data[subject * 1006 : (subject + 1) * 1006, :]
# wrap in pandas
data = pd.DataFrame(data)
data.columns = "hog_" + data.columns.astype(str)
data["subject"] = subject
data["frame"] = np.arange(subject * 1006, subject * 1006 + data.shape[0])
data = data.set_index("frame")
return data
def get_subjects_for_fold(self) -> list[str]:
if self.name == "test":
return np.arange(self.n_training, self.n_training + self.n_test).astype(str)
# determine split of training/development set
subjects = np.arange(self.n_training, dtype=int).astype(str)
groups = []
for subject in subjects:
groups.append(self.get_au(subject)[0]["intensity"].std())
groups = np.asarray(groups)
index = stratified_splits(groups, np.array([2, 1]))
if self.name == "training":
subjects = subjects[index == 0]
else:
subjects = subjects[index == 1]
return subjects.tolist()
def get_au(self, subject: str) -> tuple[pd.DataFrame, list[Path]]:
# from training or testing
data = self.mnist["training"][1]
subject = int(subject)
if subject >= self.n_training:
data = self.mnist["test"][1]
subject -= self.n_training
# get block
data = data[subject * 1006 : (subject + 1) * 1006, :].astype(int)
# wrap in pandas
data = pd.DataFrame({"intensity": data.flatten()})
data["frame"] = np.arange(subject * 1006, subject * 1006 + data.shape[0])
data = data.set_index("frame")
return data, []
class BP4D_PLUS(DISFA):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
if self.au == "6":
self.au = "06"
self.n_folds = 5
self.dataset = Path("BP4D_plus")
self.folder = BP4D_PLUS_FOLDER
def get_au(self, subject: str) -> tuple[pd.DataFrame, list[Path]]:
videos, files = self.get_labeled_videos(subject)
data = []
for ifile, file in enumerate(files):
data.append(
pd.read_csv(file, header=None, names=["frame", "intensity"], dtype=int)
)
data[-1] = data[-1].set_index("frame")
data[-1] = data[-1].loc[data[-1]["intensity"] != 9, :]
data[-1].index += ifile * 50000
data = pd.concat(data)
return data, videos
def get_labeled_videos(self, subject: str) -> tuple[list[Path], list[Path]]:
labels = self.folder.glob(f"AU{self.au}/{subject}*.csv")
videos = list(self.dataset.glob(f"{subject}*.hdf"))
# delete labels
new_labels = []
for label in labels:
name = "_".join(label.name.split("_")[:2])
if [1 for x in videos if name in x.name]:
new_labels.append(label)
# delete videos
new_videos = []
for video in videos:
name = generic.basename(video)
if [1 for x in new_labels if name in x.name]:
new_videos.append(video)
return sorted(new_videos), sorted(new_labels)
def get_features(self, subject: str) -> pd.DataFrame:
au_data, files = self.get_au(subject)
data = []
for ifile, file in enumerate(files):
file = file.name.split(".")[0]
openface = caching.read_hdfs(self.dataset / f"{file}.hdf")["df"]
openface = openface.set_index("frame")
columns = [
x for x in openface.columns if x.startswith("hog") or x.startswith("p_")
]
openface = openface.loc[:, columns]
openface = openface.astype(np.float32)
openface.index += ifile * 50000
openface = openface.loc[
np.intersect1d(openface.index, au_data.index).tolist(), :
]
data.append(openface)
data = pd.concat(data)
data -= data.median()
subject_id = int(subject[1:])
if subject[0] == "M":
subject_id *= -1
data["subject"] = subject_id
return data
def get_list_of_subjects(self) -> list[str]:
files = self.dataset.glob("*.hdf")
return sorted({file.name.split("_", 1)[0] for file in files})