-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathLiveFT.py
347 lines (304 loc) · 15.3 KB
/
LiveFT.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Overview
========
This program captures images from the camera, applies a Fourier transform,
and displays the transformed image alongside the original on the screen.
To run:
$ python3 LiveFT.py
For command-line options, use:
$ python3 LiveFT.py --help
Press "q" to exit the application.
Profile with:
python -m cProfile -o LiveFT.prof LiveFT.py -i -c 2000 -r 1000
snakeviz LiveFT.prof
Author: Brian R. Pauw with some suggestions from AI
Contact: [email protected]
License: Apache-2.0
"""
from typing import Any, Tuple
import time
import numpy as np
import cv2
import argparse
import sys
from attrs import define, field, fields, validators
import math
# Element-wise error function for NumPy arrays: math.erf only accepts scalars,
# so it is wrapped with np.vectorize.
erf_vectorized = np.vectorize(math.erf)
# Typical video resolutions as (width, height) pairs (from Ingo's webcam);
# extend if needed, but the sequence must stay sorted.
# Used to find the first entry which just covers the requested columns & rows area.
# On Linux generated by:
# v4l2-ctl --list-formats-ext | awk '/x/{split($3,a,"x"); print a[1]","a[2]}' \
# | sort -h | uniq | awk '{ORS=", "; print "("$0")"}'
typRes = ((160,90), (160,120), (176,144), (320,180), (320,240), (352,288),
(432,240), (640,360), (640,480), (800,448), (800,600), (864,480),
(960,720), (1024,576), (1280,720), (1600,896), (1920,1080), (2304,1296), (2304,1536))
# Vertical distance between overlay text lines, before it is multiplied by the font scale
lineSpacing = 40
def drawTextLine(frame:cv2.UMat, line_idx:int, text:str) -> None:
    """Write one line of white overlay text into ``frame``.

    The first line (``line_idx`` 0) starts at image position (50, 50); every
    further index moves the text down by ``lineSpacing`` multiplied by the
    font scale. The frame is modified in place, nothing is returned.
    """
    origin_x, origin_y = 50, 50
    scale = .7
    # vertical offset of the requested line, truncated to whole pixels
    baseline_y = origin_y + int(line_idx*lineSpacing*scale)
    cv2.putText(
        frame,
        text,
        (origin_x, baseline_y),
        cv2.FONT_HERSHEY_SIMPLEX,
        scale,
        (255, 255, 255),  # white in BGR
        2,  # stroke thickness
    )
@define
class FrameProcessor:
    """Turn an image into a windowed grayscale frame and its log power spectrum.

    Calling an instance runs ``prepareFrame`` -> ``applyWindow`` -> ``computeFFT``
    and returns both the processed frame and the spectrum, each scaled to a
    maximum of 1 for display.
    """
    # Crop bounds as pixel indices into the incoming frame; an upper bound of
    # None slices through to the edge of the frame.
    cropVertLo: int = field(default=0)
    cropVertUp: int = field(default=None)
    cropHorzLo: int = field(default=0)
    cropHorzUp: int = field(default=None)
    # Resize factors, passed to cv2.resize as fy / fx.
    scaleVert: float = field(default=1.)
    scaleHorz: float = field(default=1.)
    # Replace the two central rows and columns of the spectrum by their neighbours.
    killCenterLines: bool = field(default=False)
    # Width of the error-function taper, in units of the [-1, 1] window coordinate.
    taperWidth: float = field(default=0.2,
        validator=validators.and_(validators.ge(0.0), validators.le(1.0)))
    window: np.ndarray = field(default=None)  # cached error function window for the input frame

    def setWindow(self, w: int, h: int) -> None:
        """Build and cache the error-function window for a ``w`` x ``h`` frame.

        The window is separable, so the taper is evaluated once per axis and
        combined with an outer product instead of evaluating it on a full
        2-D grid. The cached array has shape ``(h, w)``.
        """
        x = np.linspace(-1.0, 1.0, w)
        y = np.linspace(-1.0, 1.0, h)
        if self.taperWidth > 0:
            taper_x = (erf_vectorized((x + 1) / self.taperWidth)
                       * erf_vectorized((1 - x) / self.taperWidth))
            taper_y = (erf_vectorized((y + 1) / self.taperWidth)
                       * erf_vectorized((1 - y) / self.taperWidth))
        else:
            # A width of zero passes the validator but would evaluate 0/0 at
            # the border; it is treated as "no taper" instead of producing NaN.
            taper_x = np.ones(w)
            taper_y = np.ones(h)
        self.window = np.outer(taper_y, taper_x)

    def prepareFrame(self, frame: np.ndarray) -> np.ndarray:
        """Crop, scale and convert the given frame to grayscale."""
        # Crop the frame to the configured region
        frame = frame[self.cropVertLo:self.cropVertUp, self.cropHorzLo:self.cropHorzUp]
        # Scale frame dimensions
        frame = cv2.resize(frame, None, fx=self.scaleHorz, fy=self.scaleVert)
        # Make sure it's grayscale; a 2-D frame already is and must not be
        # passed through a BGR conversion.
        if frame.ndim == 3:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        return frame

    def applyWindow(self, frame: np.ndarray) -> np.ndarray:
        """Multiply the frame by the window and stretch it to the range [0, 1].

        Floating point frames are modified in place and returned; frames of
        any other dtype are converted to float32 first, because the in-place
        arithmetic below cannot store its results in an integer array.
        """
        if not np.issubdtype(frame.dtype, np.floating):
            frame = frame.astype(np.float32)
        # (Re)build the window if there is none yet or the frame size changed
        # since it was cached. A changed taperWidth is not detected here and
        # needs an explicit setWindow() call.
        if self.window is None or self.window.shape != frame.shape:
            h, w = frame.shape
            self.setWindow(w, h)
        frame *= self.window
        # expand range
        frame -= frame.min()
        peak = frame.max()
        if peak > 0:  # a uniform frame stays all-zero instead of becoming NaN
            frame /= peak
        return frame

    def computeFFT(self, frame: np.ndarray) -> np.ndarray:
        """Return the centred, log-scaled power spectrum of ``frame``.

        The spectrum is normalised to a maximum of 1. If ``killCenterLines``
        is set, the two central rows and columns are overwritten with the
        adjacent ones to enhance the dynamic range of the display.
        """
        dft = cv2.dft(frame, flags=cv2.DFT_COMPLEX_OUTPUT)
        # Squared magnitude (power) from the real and imaginary planes
        power = dft[:, :, 0]**2 + dft[:, :, 1]**2
        # Shift the zero-frequency component to the center
        power_shifted = np.fft.fftshift(power)
        # Use log scale for better visualization
        fft_log = np.log1p(power_shifted)
        if self.killCenterLines:
            h, w = fft_log.shape[:2]
            # With fewer than 3 rows/columns there is no neighbour to copy from
            if h > 2:
                fft_log[h // 2 - 1:h // 2 + 1, :] = fft_log[h // 2 + 1:h // 2 + 3, :]
            if w > 2:
                fft_log[:, w // 2 - 1:w // 2 + 1] = fft_log[:, w // 2 + 1:w // 2 + 3]
        # Normalize for display; an all-zero spectrum is returned unchanged
        peak = fft_log.max()
        if peak > 0:
            return fft_log / peak
        return fft_log

    def __call__(self, frame) -> Tuple[np.ndarray, np.ndarray]:
        """Process a single image and return ``(prepared frame, spectrum)``.

        No assumptions should be made of the source, it can be from a camera
        or from disk.
        """
        frame = self.prepareFrame(frame)
        frame = self.applyWindow(frame)
        fft = self.computeFFT(frame)
        return frame, fft
@define
class LiveFT:
    """Handles live Fourier Transform display of camera feed.

    Creating an instance opens the camera, creates the display window and
    immediately enters the capture loop (see ``__attrs_post_init__``), so the
    constructor only returns after that loop has ended.
    """
    # Core attributes with default values from command-line arguments
    # Note: This order affects parse_args() below, all attrs until `vc` become cmdline args
    numShots: int = field(default=int(1e5),
        metadata={"help": "Max number of images before program exits", "short": "n"})
    # numBins: int = field(default=200, metadata={"help": "Number of integration bins", "short": "N"})
    # nContrIms: int = field(default=30, metadata={"help": "Average contrast over N images", "short": "o"})
    camDevice: int = field(default=0,
        metadata={"help": "Camera device ID", "short": "d"})
    imAvgs: int = field(default=1,
        metadata={"help": "Average N images for display and FFT", "short": "a"})
    vScale: float = field(default=1.2,
        metadata={"help": "Vertical video scale", "short": "y"})
    hScale: float = field(default=1.2,
        metadata={"help": "Horizontal video scale", "short": "x"})
    downScale: bool = field(default=False,
        metadata={"help": "Enable pyramidal downscaling (not implemented yet)", "short": "p"})
    killCenterLines: bool = field(default=False,
        metadata={"help": "Remove central lines from FFT image", "short": "k"})
    figid: str = field(default="liveFFT by Brian R. Pauw - press 'h' for help, 'q' to exit.",
        metadata={"help": "Image window name", "short": "f"})
    rows: int = field(default=500, metadata={"help": "Use center N rows of video", "short": "r"})
    columns: int = field(default=500, metadata={"help": "Use center N columns of video", "short": "c"})
    showInfo: bool = field(default=False, metadata={"help": "Show FPS info text overlay", "short": "i"})
    noGPU: bool = field(default=True,
        metadata={"help": "Switch between CPU or GPU for Fourier Transform", "short": "g"})
    frameTimeCount: int = field(default=10,
        metadata={"help": "Number of frames to average frame time by", "short": "t"})
    # Derived attributes initialized post-instantiation
    vc: cv2.VideoCapture = field(init=False, validator=validators.instance_of(cv2.VideoCapture))
    # names of the boolean attributes which can be toggled by their short key while running
    optionsInteractive: Tuple[str, ...] = field(
        default=("showHelp", "showInfo", "downScale", "killCenterLines"))
    frameTime: np.ndarray = field(init=False)  # array for moving average of frame calc. time
    frameProc: FrameProcessor = field(factory=FrameProcessor)
    # not an attribute available as cmdline argument
    showHelp: bool = field(default=False, metadata={"help": "Show interactive help text", "short": "h"})

    def __attrs_post_init__(self) -> None:
        """Open the camera, configure it, create the window and start the main loop.

        Raises:
            ValueError: if the video device could not be opened.
        """
        # Open camera device
        self.vc = cv2.VideoCapture(self.camDevice)
        if not self.vc.isOpened():
            raise ValueError("Could not open video device.")
        # Request the first known resolution covering the desired columns & rows,
        # or the largest known one if none is big enough
        fitting = [(w, h) for w, h in typRes if w >= self.columns and h >= self.rows]
        width, height = fitting[0] if fitting else typRes[-1]
        self.vc.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        self.vc.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        # Set the codec to MJPEG, which is often much faster
        fourcc = cv2.VideoWriter_fourcc(*'MJPG')
        self.vc.set(cv2.CAP_PROP_FOURCC, fourcc)
        # change the desired fps of the video source
        desired_fps = 240  # typically lower, limited by camera driver support
        self.vc.set(cv2.CAP_PROP_FPS, desired_fps)
        # Initialize display window
        cv2.namedWindow(self.figid, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_NORMAL)
        cv2.resizeWindow(self.figid, 1024, 768)
        # At least one slot, otherwise the ring-buffer index in composeFrame()
        # would be a modulo by zero
        self.frameTime = np.zeros(max(1, self.frameTimeCount))
        # Start main loop for capturing and processing frames
        self.run()

    def drawInfoText(self, frame, infoData) -> None:
        """Draw the info overlay: the ``infoData`` items and the camera stream format."""
        drawTextLine(frame, 0, ", ".join([f"{k}: {v}" for k, v in infoData.items()]))
        # show the current camera resolution
        actual_width = int(self.vc.get(cv2.CAP_PROP_FRAME_WIDTH))
        actual_height = int(self.vc.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # show the video stream format as well
        video_text = f"{actual_width}x{actual_height}"
        fourcc = int(self.vc.get(cv2.CAP_PROP_FOURCC))
        if 31 < fourcc < 2**32:
            # OpenCV packs the four characters with the first one in the lowest
            # byte, independent of the host byte order; undecodable bytes are
            # replaced instead of aborting the overlay.
            video_text += "@" + fourcc.to_bytes(4, byteorder="little").decode("ascii", errors="replace")
        drawTextLine(frame, 1, f"(Input: {video_text})")

    def drawHelpText(self, frame) -> None:
        """Draws a static help text into the frame, one line per interactive option."""
        lineOffset = 3
        drawTextLine(frame, lineOffset, "Help | press key:")
        interactive = [a for a in fields(type(self)) if a.name in self.optionsInteractive]
        for index, attr in enumerate(interactive):
            drawTextLine(frame, lineOffset+index+1, attr.metadata["short"]+"-> "+attr.metadata["help"])

    def toggleShortOption(self, key) -> None:
        """Invert every interactive boolean option whose short key matches ``key``."""
        for a in fields(type(self)):
            if a.name not in self.optionsInteractive:
                continue
            if key & 0xFF == ord(a.metadata["short"]):
                setattr(self, a.name, not getattr(self, a.name))

    def run(self) -> None:
        """Main loop to capture and process frames from the camera.

        Ends after ``numShots`` frames, when 'q' is pressed or when the window
        is no longer visible. The camera and all windows are released on the
        way out, also if capturing or processing raised an exception.
        """
        num_frames = 0
        frames_counted = 0
        start_time = time.perf_counter()  # for calculating FPS including capturing
        infoData = {"#Frame": 0, "fps": "", "hw": "cpu"}
        try:
            while num_frames < self.numShots:
                num_frames += 1
                # Capture key press to close window (e.g., 'q' key)
                key = cv2.waitKey(1)
                if key & 0xFF == ord('q'):
                    print("Exiting on user request.")
                    break
                self.toggleShortOption(key)
                # Check if the window is still open, break if closed
                if not cv2.getWindowProperty(self.figid, cv2.WND_PROP_VISIBLE):
                    print("Window closed by user.")
                    break
                frame_final = self.composeFrame(num_frames, infoData)
                # gather some info
                elapsed = time.perf_counter() - start_time
                fps = (num_frames - frames_counted) / elapsed if elapsed > 0 else 0.0
                if elapsed > 2:  # duration of FPS measurement window
                    start_time = time.perf_counter()
                    frames_counted = num_frames
                # Show info text on request
                if self.showInfo:
                    infoData.update({"#Frame": num_frames, "fps": f"{fps:.2f}"})
                    self.drawInfoText(frame_final, infoData)
                if self.showHelp:
                    self.drawHelpText(frame_final)
                if frame_final.size:  # show the frame if there is any
                    (wx, wy, ww, wh) = cv2.getWindowImageRect(self.figid)
                    (fh, fw) = frame_final.shape
                    if num_frames == 1 and (ww != fw or wh != fh):
                        # resize appropriately only once initially
                        cv2.resizeWindow(self.figid, fw, fh)
                    cv2.imshow(self.figid, frame_final)
        finally:
            self.vc.release()
            cv2.destroyAllWindows()

    def captureFrame(self) -> np.ndarray:
        """Read ``imAvgs`` frames (at least one) from the camera and return their float32 mean.

        Raises:
            ValueError: if a frame could not be read from the camera.
        """
        shots = max(1, self.imAvgs)  # zero or negative would leave nothing to return
        total = None
        for _ in range(shots):
            success, iframe = self.vc.read()
            if not success:
                raise ValueError("Failed to capture frame.")
            if total is None:
                total = iframe.astype(np.float32)
            else:
                total += iframe
        if shots > 1:  # average images possibly
            total /= shots
        return total

    def composeFrame(self, frameIdx: int, infoData: dict) -> np.ndarray:
        """Capture a frame and return it side by side with its Fourier transform.

        ``rows`` and ``columns`` are reduced to the captured frame size if
        they exceed it. The processing time (without capturing) is stored in
        the ``frameTime`` ring buffer and its mean is written to
        ``infoData["frame time"]``.
        """
        frame = self.captureFrame()
        frame_time = time.perf_counter()  # calculation time of a single frame, without capturing
        # Ensure crop dimensions are within frame limits
        height, width = frame.shape[:2]
        if self.rows > height:
            self.rows = height
        if self.columns > width:
            self.columns = width
        # Configure cropping boundaries for the center region of the frame.
        # Derive the upper bound from the lower one so that odd sizes yield
        # exactly the requested number of rows/columns.
        self.frameProc.cropVertLo = (height - self.rows) // 2
        self.frameProc.cropVertUp = self.frameProc.cropVertLo + self.rows
        self.frameProc.cropHorzLo = (width - self.columns) // 2
        self.frameProc.cropHorzUp = self.frameProc.cropHorzLo + self.columns
        # forward the desired scaling
        self.frameProc.scaleHorz = self.hScale
        self.frameProc.scaleVert = self.vScale
        # forward options for the fourier transformed result
        self.frameProc.killCenterLines = self.killCenterLines
        frame, fft = self.frameProc(frame)
        # place the processed frame and its transform next to each other
        framesCombined = np.concatenate((frame, fft), axis=1)
        # record how long this frame took to process
        self.frameTime[frameIdx % self.frameTime.size] = (time.perf_counter() - frame_time)
        # show the frame time average for info overlay
        infoData["frame time"] = f"{self.frameTime.mean()*1e3:.1f} ms"
        return framesCombined
def parse_args(liveftCls: "type[LiveFT]", argv: "list[str] | None" = None) -> argparse.Namespace:
    """Parse command-line arguments derived from the attributes of ``liveftCls``.

    Every attrs attribute declared before the one named ``vc`` becomes an
    option: its ``short`` metadata entry is the one-letter flag, its name the
    long flag and its ``help`` metadata entry the help text. Boolean
    attributes become switches which invert their default; all others take a
    value converted by the attribute's type.

    Args:
        liveftCls: the attrs class describing the options.
        argv: argument strings to parse; ``None`` (the default) makes
            argparse read ``sys.argv[1:]`` as before.

    Returns:
        The parsed namespace, with one entry per generated option.
    """
    parser = argparse.ArgumentParser(description="Live Fourier Transform of camera feed.")
    for attr in liveftCls.__attrs_attrs__:
        if attr.name == "vc":
            # everything from here on is internal state, not a user option
            break
        flags = ("-" + attr.metadata["short"], "--" + attr.name)
        if attr.type is bool:
            # a switch flips the default: off-by-default stores True and vice versa
            action = "store_false" if attr.default else "store_true"
            parser.add_argument(*flags, help=attr.metadata["help"], action=action)
        else:
            parser.add_argument(*flags, help=attr.metadata["help"],
                                type=attr.type, default=attr.default)
    return parser.parse_args(argv)
if __name__ == "__main__":
    # Constructing LiveFT starts the capture loop right away; the call only
    # returns after that loop has ended.
    cli_options = vars(parse_args(LiveFT))
    live_ft = LiveFT(**cli_options)