# _utils.py
import math
import platform
import socket
import sys
import time

import numpy as np
import psutil
import torch
from torch.utils import data


def printCPUInfo():
    """Print basic information about the host machine and its CPU."""
    print(f"Name: {socket.gethostname()}")
    print(f"FQDN: {socket.getfqdn()}")
    print(f"System Platform: {sys.platform}")
    print(f"Machine: {platform.machine()}")
    print(f"Node: {platform.node()}")
    print(f"Platform: {platform.platform()}")
    print(f"Processor: {platform.processor()}")
    print(f"System OS: {platform.system()}")
    print(f"Release: {platform.release()}")
    print(f"Version: {platform.version()}")
    print(f"Number of CPUs: {psutil.cpu_count()}")
    print(f"Number of Physical CPUs: {psutil.cpu_count(logical=False)}\n")
    return platform.processor()
def select_device():
    """Pick a device, allowing a command-line override:
    "-c" forces the CPU, "-g" forces CUDA; otherwise the default is used."""
    device, device_name = get_default_device()
    if len(sys.argv) > 1:
        if str(sys.argv[1]) == "-c":
            device = torch.device("cpu")
            device_name = None
        elif str(sys.argv[1]) == "-g":
            device = torch.device("cuda")
    return [device, device_name]


def get_default_device():
    """Pick GPU if available, else CPU"""
    if torch.cuda.is_available():
        print("FOUND GPU:")
        print(torch.cuda.get_device_name(torch.device('cuda')))
        return [torch.device('cuda'), torch.cuda.get_device_name(torch.device('cuda'))]
    else:
        print("NO GPU AVAILABLE, USING CPU:")
        return [torch.device('cpu'), None]
def to_device(data, device, print_flag=False):
    """Move tensor(s) to the chosen device"""
    if print_flag:
        print(f"Moving a tensor to device ({device})")
    if isinstance(data, (list, tuple)):
        return [to_device(x, device, print_flag) for x in data]
    return data.to(device, non_blocking=True)


class DeviceDataLoader():
    """Wrap a dataloader to move data to a device"""
    def __init__(self, dataloader, device):
        self.dl = dataloader
        self.device = device

    def __iter__(self):
        """Yield a batch of data after moving it to device"""
        for b in self.dl:
            yield to_device(b, self.device, False)  # automatically pushes to device

    def __len__(self):
        """Number of batches"""
        return len(self.dl)
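# Typical usage (a sketch, not taken from the original repo: "train_dl" stands for
# any existing torch DataLoader and "device" for a device from get_default_device()):
#     train_dl = DeviceDataLoader(train_dl, device)
#     for xb, yb in train_dl:
#         ...  # xb and yb now arrive already on the target device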
def split_indices(n, val_pct):
    """Randomly split range(n) into training and validation index arrays."""
    val_size = int(val_pct * n)      # first get the size of the val set
    idxs = np.random.permutation(n)  # get a random permutation of 0..n-1
    # pick the first (val_size) indices and make them the val set
    return idxs[val_size:], idxs[:val_size]
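# Sketch of how the two index arrays are typically consumed (SubsetRandomSampler is
# the standard torch sampler; "dataset" and the batch size are placeholders):
#     train_idxs, val_idxs = split_indices(len(dataset), val_pct=0.2)
#     train_dl = data.DataLoader(dataset, batch_size=32,
#                                sampler=data.SubsetRandomSampler(train_idxs))
#     valid_dl = data.DataLoader(dataset, batch_size=32,
#                                sampler=data.SubsetRandomSampler(val_idxs))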
def accuracy(outputs, labels):
    """Fraction of predictions (argmax over class scores) that match the labels."""
    _, predictions = torch.max(outputs, dim=1)
    return torch.sum(predictions == labels).item() / len(labels)
def step_decay(epoch, lr_init, drop, epochs_drop):
    """Step learning-rate decay.

    args
    - epoch = current epoch
    - lr_init = initial learning rate
    - drop = multiplicative drop factor applied at each step
    - epochs_drop = number of epochs between drops
    """
    lr = lr_init * math.pow(drop, math.floor(1 + epoch / epochs_drop))
    return lr
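# Worked example (illustrative numbers, not values used by the original training runs):
# with lr_init=0.1, drop=0.75 and epochs_drop=10, epochs 0-9 give
# 0.1 * 0.75**1 = 0.075 and epochs 10-19 give 0.1 * 0.75**2 ≈ 0.056.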
def loss_batch(model, loss_function, xb, yb, opt=None, metric=None):
    # first calculate the loss on this batch
    predictions = model(xb)
    loss = loss_function(predictions, yb)
    if opt is not None:  # only train when an optimizer is supplied
        # compute the gradients and back-propagate
        loss.backward()
        opt.step()
        opt.zero_grad()
    metric_result = None
    if metric is not None:
        # compute the metric
        metric_result = metric(predictions, yb)
    return loss.item(), len(xb), metric_result
def evaluate(model, loss_function, valid_data, metric=None):
    with torch.no_grad():
        # pass the batches through
        results = [loss_batch(model, loss_function, xb, yb, metric=metric)
                   for xb, yb in valid_data]
        losses, nums, metrics = zip(*results)
        total = np.sum(nums)
        # weight each batch loss by its batch size
        total_loss = np.sum(np.multiply(losses, nums))
        avg_loss = total_loss / total
        avg_metric = None
        if metric is not None:
            total_metric = np.sum(np.multiply(metrics, nums))
            avg_metric = total_metric / total
    return avg_loss, total, avg_metric
def fit(epochs, lr, model, loss_function, train_data, validation_data, metric=None, opt_fn=None, lr_mod=0.0):
    lr_init = lr
    if opt_fn is None:
        # default to SGD with momentum when no optimizer factory is supplied
        opt_fn = lambda params, lr: torch.optim.SGD(params, lr=lr, momentum=0.9)
    for epoch in range(epochs):
        # rebuild the optimizer once per epoch so it picks up the decayed learning rate
        opt = opt_fn(model.parameters(), lr=lr)
        # training
        for xb, yb in train_data:
            loss, _, _ = loss_batch(model, loss_function, xb, yb, opt)
        # evaluate
        result = evaluate(model, loss_function, validation_data, metric)
        validation_loss, total, validation_metric = result
        if metric is None:
            print(f'Epoch [{epoch + 1}/{epochs}], Loss: {validation_loss}')
        else:
            print(f'Epoch [{epoch + 1}/{epochs}], Loss: {validation_loss}, {metric.__name__}: {validation_metric}')
        lr = step_decay(epoch, lr_init, .75, 10)  # update the learning rate
def predict_image_with_timing(image, model):
    xb = image.unsqueeze(0)  # insert a batch dim at the head
    start_time = time.process_time()  # start the clock just before inference
    yb = model(xb)
    _, predictions = torch.max(yb, dim=1)
    elapsed_time = time.process_time() - start_time
    return [predictions[0].item(), elapsed_time]


def predict_image(image, model):
    xb = image.unsqueeze(0)  # insert a batch dim at the head
    yb = model(xb)
    _, predictions = torch.max(yb, dim=1)
    return predictions[0].item()
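

if __name__ == "__main__":
    # Minimal end-to-end sketch of how these helpers fit together. Everything below
    # is illustrative only and is not part of the original training setup: the random
    # TensorDataset, the single linear layer and the hyper-parameters are placeholders
    # standing in for a real dataset and network.
    printCPUInfo()
    device, device_name = select_device()

    n_samples, n_features, n_classes = 512, 20, 4
    inputs = torch.randn(n_samples, n_features)
    labels = torch.randint(0, n_classes, (n_samples,))
    dataset = data.TensorDataset(inputs, labels)

    # split into train/validation indices and build device-aware loaders
    train_idxs, val_idxs = split_indices(n_samples, val_pct=0.2)
    train_dl = data.DataLoader(dataset, batch_size=32,
                               sampler=data.SubsetRandomSampler(train_idxs))
    valid_dl = data.DataLoader(dataset, batch_size=32,
                               sampler=data.SubsetRandomSampler(val_idxs))
    train_dl = DeviceDataLoader(train_dl, device)
    valid_dl = DeviceDataLoader(valid_dl, device)

    # a stand-in model: a single linear layer treated as a classifier
    model = to_device(torch.nn.Linear(n_features, n_classes), device)

    fit(5, 0.1, model, torch.nn.functional.cross_entropy,
        train_dl, valid_dl, metric=accuracy)

    # single-sample prediction with timing
    sample = to_device(inputs[0], device)
    label, seconds = predict_image_with_timing(sample, model)
    print(f"Predicted class {label} in {seconds:.6f}s of CPU time")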