From b6753569a1a68b499a2a86dae5eb60deed9eb586 Mon Sep 17 00:00:00 2001
From: Panagiotis Tomer Karagiannis
Date: Mon, 19 Dec 2022 22:39:25 +0100
Subject: [PATCH] wip
---
.gitignore | 2 +
drawing_to_fsd_layout/common.py | 5 +
drawing_to_fsd_layout/cone_placement.py | 301 +++++++
drawing_to_fsd_layout/export.py | 250 ++++++
drawing_to_fsd_layout/image_processing.py | 244 ++++++
drawing_to_fsd_layout/math_utils.py | 638 +++++++++++++++
drawing_to_fsd_layout/spline_fit.py | 135 ++++
drawing_to_track.ipynb | 924 ++++++++++++++++++++++
requirements.txt | 6 +
streamlit_app.py | 328 ++++++++
10 files changed, 2833 insertions(+)
create mode 100644 drawing_to_fsd_layout/common.py
create mode 100644 drawing_to_fsd_layout/cone_placement.py
create mode 100644 drawing_to_fsd_layout/export.py
create mode 100644 drawing_to_fsd_layout/image_processing.py
create mode 100644 drawing_to_fsd_layout/math_utils.py
create mode 100644 drawing_to_fsd_layout/spline_fit.py
create mode 100755 drawing_to_track.ipynb
create mode 100644 requirements.txt
create mode 100644 streamlit_app.py
diff --git a/.gitignore b/.gitignore
index b6e4761..f347387 100644
--- a/.gitignore
+++ b/.gitignore
@@ -127,3 +127,5 @@ dmypy.json
# Pyre type checker
.pyre/
+
+.vscode/
\ No newline at end of file
diff --git a/drawing_to_fsd_layout/common.py b/drawing_to_fsd_layout/common.py
new file mode 100644
index 0000000..73b4614
--- /dev/null
+++ b/drawing_to_fsd_layout/common.py
@@ -0,0 +1,5 @@
+import numpy as np
+
+FloatArrayNx2 = np.typing.NDArray[np.float64]
+IntArrayN = np.typing.NDArray[np.int64]
+FloatArray2 = np.typing.NDArray[np.float64]
diff --git a/drawing_to_fsd_layout/cone_placement.py b/drawing_to_fsd_layout/cone_placement.py
new file mode 100644
index 0000000..46a2248
--- /dev/null
+++ b/drawing_to_fsd_layout/cone_placement.py
@@ -0,0 +1,301 @@
+import networkx as nx
+import numpy as np
+import streamlit as st
+from scipy.ndimage import uniform_filter1d
+from scipy.spatial.distance import cdist
+
+from drawing_to_fsd_layout.common import FloatArray2, FloatArrayNx2, IntArrayN
+
+
+def circle_fit(coords: np.ndarray, max_iter: int = 99) -> np.ndarray:
+ """
+ Function taken from: https://github.com/papalotis/ft-fsd-path-planning/blob/d82ba8f93c753a9d0fe0c77fa8c9af88aafad6ea/fsd_path_planning/utils/math_utils.py#L569
+
+ Fit a circle to a set of points. This function is adapted from the hyper_fit
+ function in the circle-fit package (https://pypi.org/project/circle-fit/).
+ The function is a njit version of the original function with some input validation
+ removed. Furthermore, the residuals are not calculated or returned.
+ Args:
+ coords: The coordinates of the points as an [N, 2] array.
+ max_iter: The maximum number of iterations.
+ Returns:
+ An array with 3 elements:
+ - center x
+ - center y
+ - radius
+ """
+
+ X = coords[:, 0]
+ Y = coords[:, 1]
+
+ n = X.shape[0]
+
+ Xi = X - X.mean()
+ Yi = Y - Y.mean()
+ Zi = Xi * Xi + Yi * Yi
+
+ # compute moments
+ Mxy = (Xi * Yi).sum() / n
+ Mxx = (Xi * Xi).sum() / n
+ Myy = (Yi * Yi).sum() / n
+ Mxz = (Xi * Zi).sum() / n
+ Myz = (Yi * Zi).sum() / n
+ Mzz = (Zi * Zi).sum() / n
+
+ # computing the coefficients of characteristic polynomial
+ Mz = Mxx + Myy
+ Cov_xy = Mxx * Myy - Mxy * Mxy
+ Var_z = Mzz - Mz * Mz
+
+ A2 = 4 * Cov_xy - 3 * Mz * Mz - Mzz
+ A1 = Var_z * Mz + 4.0 * Cov_xy * Mz - Mxz * Mxz - Myz * Myz
+ A0 = Mxz * (Mxz * Myy - Myz * Mxy) + Myz * (Myz * Mxx - Mxz * Mxy) - Var_z * Cov_xy
+ A22 = A2 + A2
+
+ # finding the root of the characteristic polynomial
+ y = A0
+ x = 0.0
+ for _ in range(max_iter):
+ Dy = A1 + x * (A22 + 16.0 * x * x)
+ x_new = x - y / Dy
+ if x_new == x or not np.isfinite(x_new):
+ break
+ y_new = A0 + x_new * (A1 + x_new * (A2 + 4.0 * x_new * x_new))
+ if abs(y_new) >= abs(y):
+ break
+ x, y = x_new, y_new
+
+ det = x * x - x * Mz + Cov_xy
+ X_center = (Mxz * (Myy - x) - Myz * Mxy) / det / 2.0
+ Y_center = (Myz * (Mxx - x) - Mxz * Mxy) / det / 2.0
+
+ x = X_center + X.mean()
+ y = Y_center + Y.mean()
+ r = np.sqrt(abs(X_center**2 + Y_center**2 + Mz))
+
+ return np.array([x, y, r])
+
+
+def create_cyclic_sliding_window_indices(
+ window_size: int, step_size: int, signal_length: int
+) -> IntArrayN:
+ """
+ Function taken from https://github.com/papalotis/ft-fsd-path-planning/blob/main/fsd_path_planning/calculate_path/path_parameterization.py
+ """
+ if window_size % 2 == 0:
+ raise ValueError("Window size must be odd.")
+ half_window_size = window_size // 2
+
+ indexer = (
+ np.arange(-half_window_size, half_window_size + 1)
+ + np.arange(0, signal_length, step_size).reshape(-1, 1)
+ ) % signal_length
+ return indexer
+
+
+def calculate_path_curvature(
+ path: FloatArrayNx2, window_size: int, path_is_closed: bool
+) -> FloatArrayNx2:
+ """
+ Function taken from https://github.com/papalotis/ft-fsd-path-planning/blob/main/fsd_path_planning/calculate_path/path_parameterization.py
+
+ Calculate the curvature of the path.
+ Args:
+ path: The path as a 2D array of points.
+ window_size: The size of the window to use for the curvature calculation.
+ path_is_closed: Whether the path is closed or not.
+ Returns:
+ The curvature of the path.
+ """
+ windows = create_cyclic_sliding_window_indices(
+ window_size=window_size, step_size=1, signal_length=len(path)
+ )
+
+ path_curvature = np.zeros(len(path))
+ for i, window in enumerate(windows):
+ if not path_is_closed:
+ diff = window[1:] - window[:-1]
+ if np.any(diff != 1):
+ idx_cutoff = int(np.argmax(diff != 1) + 1)
+ if i < window_size:
+ window = window[idx_cutoff:]
+ else:
+ window = window[:idx_cutoff]
+
+ points_in_window = path[window]
+
+ _, _, radius = circle_fit(points_in_window)
+ radius = min(
+ max(radius, 1.0), 3000.0
+ ) # np.clip didn't work for some reason (numba bug?)
+ curvature = 1 / radius
+ three_idxs = np.array([0, int(len(points_in_window) / 2), -1])
+ three_points = points_in_window[three_idxs]
+ hom_points = np.column_stack((np.ones(3), three_points))
+ sign = np.linalg.det(hom_points)
+
+ path_curvature[i] = curvature * np.sign(sign)
+
+ mode = "wrap" if path_is_closed else "nearest"
+
+ filtered_curvature = uniform_filter1d(
+ path_curvature, size=window_size // 10, mode=mode
+ )
+ # filtered_curvature = path_curvature
+
+ return filtered_curvature
+
+
+def calculate_edge_curvature(edge: FloatArrayNx2) -> FloatArrayNx2:
+ window_size = len(edge) // 25
+ if window_size % 2 == 0:
+ window_size += 1
+ return calculate_path_curvature(edge, window_size=window_size, path_is_closed=True)
+
+
+def place_cones_for_edge(edge: FloatArrayNx2) -> FloatArrayNx2:
+ """
+ Place cones along an edge.
+
+ Args:
+ edge: The edge to place cones on. Shape (n,2)
+ cone_spacing: The distance between cones.
+
+ Returns:
+ The locations of the cones. Shape (m,2)
+ """
+ raise NotImplementedError
+
+
+def calculate_min_track_width_index(
+ edge_1: FloatArrayNx2, edge_2: FloatArrayNx2
+) -> tuple[int, int]:
+ distances = cdist(edge_1, edge_2)
+ idx_min_distance_to_1 = np.min(distances, axis=1).argmin()
+ idx_min_distance_to_2 = np.min(distances, axis=0).argmin()
+
+ return idx_min_distance_to_1, idx_min_distance_to_2
+
+
+def calculate_min_track_width(edge_1: FloatArrayNx2, edge_2: FloatArrayNx2) -> float:
+ idx_min_distance_to_1, idx_min_distance_to_2 = calculate_min_track_width_index(
+ edge_1, edge_2
+ )
+ edge_1_closest_point = edge_1[idx_min_distance_to_1]
+ edge_2_closest_point = edge_2[idx_min_distance_to_2]
+
+ return np.linalg.norm(edge_1_closest_point - edge_2_closest_point)
+
+
+def estimate_centerline_from_edges(
+ edge_1: FloatArrayNx2, edge_2: FloatArrayNx2
+) -> FloatArrayNx2:
+ shorter, longer = sorted([edge_1, edge_2], key=len)
+ idx_shorter = cdist(shorter, longer).argmin(axis=0)
+ shorter_sampled = shorter[idx_shorter]
+ centerline = (longer + shorter_sampled) / 2
+
+ return centerline
+
+
+def estimate_centerline_length(edge_1: FloatArrayNx2, edge_2: FloatArrayNx2) -> float:
+ edge_1_sampled = edge_1[::10]
+ edge_2_sampled = edge_2[::10]
+ centerline = estimate_centerline_from_edges(edge_1_sampled, edge_2_sampled)
+ return np.sum(np.linalg.norm(np.diff(centerline, axis=0), axis=1))
+
+
+def split_edge_to_straight_and_curve(
+ edge: FloatArrayNx2,
+ curvature_threshold_upper: float = 0.02,
+ curvature_threshold_lower: float = 0.01,
+) -> IntArrayN:
+ """
+ Split an edge into straight and curve sections.
+ """
+ curvature_threshold_lower, curvature_threshold_upper = sorted(
+ (curvature_threshold_lower, curvature_threshold_upper)
+ )
+ curvature = calculate_edge_curvature(edge)
+ start_index = np.argmin(curvature) # start from a straight
+
+ # 0 is straight, 1 is left curve, -1 is right curve
+ out = np.zeros(len(curvature), dtype=int)
+ out[start_index] = 0
+ for offset in range(1, len(curvature) + 1):
+ previous_index = (start_index + offset - 1) % len(curvature)
+ next_index = (start_index + offset) % len(curvature)
+
+ state = out[previous_index]
+ if state == 0 and abs(curvature[next_index]) > curvature_threshold_upper:
+ state = np.sign(curvature[next_index])
+ elif state != 0 and abs(curvature[next_index]) < curvature_threshold_lower:
+ state = 0
+
+ out[next_index] = state
+
+ return out
+
+
+def decide_start_finish_line_position_and_direction(
+ edge: FloatArrayNx2,
+) -> tuple[FloatArray2, FloatArray2]:
+ track_point_types = split_edge_to_straight_and_curve(edge)
+ straights_indices: list[list[int]] = [[]]
+ for i, track_point_type in enumerate(track_point_types):
+ if track_point_type == 0:
+ straights_indices[-1].append(i)
+ else:
+ if len(straights_indices[-1]) > 0:
+ straights_indices.append([])
+
+ longest_straight_indices = max(straights_indices, key=len)
+ start_index = longest_straight_indices[len(longest_straight_indices) // 2]
+ direction_of_straight = np.diff(edge[longest_straight_indices], axis=0).mean(axis=0)
+ return edge[start_index], direction_of_straight / np.linalg.norm(
+ direction_of_straight
+ )
+
+
+def fix_edge_direction(
+ edge: FloatArrayNx2, start_position: FloatArray2, start_direction: FloatArray2
+) -> FloatArrayNx2:
+ idx_edge_start = np.linalg.norm(edge - start_position, axis=1).argmin()
+ edge_rolled = np.roll(edge, -idx_edge_start, axis=0)
+
+ edge_direction = edge_rolled[2] - edge_rolled[0]
+ dot = np.dot(edge_direction, start_direction)
+ if dot < 0:
+ edge_rolled = edge_rolled[::-1]
+
+ return edge_rolled
+
+
+def place_cones(
+ trace: FloatArrayNx2, seed: int, mean: float, variance: float
+) -> FloatArrayNx2:
+ rng = np.random.default_rng(seed)
+
+ idxs_to_keep = [0]
+ next_distance = rng.normal(mean, variance)
+ next_idx = idxs_to_keep[0]
+ while next_idx < len(trace):
+ trace_from_last_in = trace[next_idx:]
+ dist_to_next = np.linalg.norm(np.diff(trace_from_last_in, axis=0), axis=1)
+ cum_dist = np.cumsum(dist_to_next)
+
+ offset_next_idx = np.argmax(cum_dist > next_distance)
+
+ if offset_next_idx == 0:
+ break
+
+ next_idx = offset_next_idx + next_idx
+ idxs_to_keep.append(next_idx)
+ next_distance = rng.normal(mean, variance)
+
+ randomly_placed_cones = trace[idxs_to_keep]
+
+ st.write(np.linalg.norm(np.diff(randomly_placed_cones, axis=0), axis=1).mean())
+
+
+ return randomly_placed_cones
diff --git a/drawing_to_fsd_layout/export.py b/drawing_to_fsd_layout/export.py
new file mode 100644
index 0000000..2f035c5
--- /dev/null
+++ b/drawing_to_fsd_layout/export.py
@@ -0,0 +1,250 @@
+import json
+from enum import Enum
+from struct import Struct
+from typing import Dict, List, Literal, Tuple, cast
+
+import numpy as np
+
+from drawing_to_fsd_layout.common import FloatArrayNx2
+
+
+def export_json_string(edges_left: FloatArrayNx2, edges_right: FloatArrayNx2) -> str:
+ """
+ Export the track edges to a JSON string that can be used in the FSD layout editor.
+
+ Args:
+ edges_left: The left track edge. Shape (n,2)
+ edges_right: The right track edge. Shape (n,2)
+
+ Returns:
+ The JSON string
+ """
+ cones_x = np.concatenate((edges_left[:, 0], edges_right[:, 0])).tolist()
+ cones_y = np.concatenate((edges_left[:, 1], edges_right[:, 1])).tolist()
+ cones_color = (
+ ["orange_big"]
+ + ["blue"] * (len(edges_left) - 1)
+ + ["orange_big"]
+ + ["yellow"] * (len(edges_right) - 1)
+ )
+ obj = {
+ "x": cones_x,
+ "y": cones_y,
+ "color": cones_color,
+ }
+
+ return json.dumps(obj)
+
+
+# --- Export to LYT file ---
+
+
+def angle_from_2d_vector(vecs: np.ndarray) -> np.ndarray:
+ if vecs.shape == (2,):
+ return np.arctan2(vecs[1], vecs[0])
+ if vecs.ndim == 2 and vecs.shape[-1] == 2:
+ return np.arctan2(vecs[:, 1], vecs[:, 0])
+ raise ValueError("vecs can either be a 2d vector or an array of 2d vectors")
+
+
+class ConeTypes(int, Enum):
+ """
+ Enum for all possible cone types
+ """
+
+ UNKNOWN = 0
+ YELLOW = RIGHT = 1
+ BLUE = LEFT = 2
+ ORANGE_SMALL = START_FINISH_AREA = 3
+ ORANGE_BIG = START_FINISH_LINE = 4
+
+
+HEADER_STRUCT = Struct("6sBBhBB")
+BLOCK_STRUCT = Struct("2h4B")
+
+
+ConeTypeToLytObjectIndex: Dict[ConeTypes, int] = {
+ ConeTypes.UNKNOWN: 25,
+ ConeTypes.YELLOW: 29, # 30 also possible
+ ConeTypes.BLUE: 23, # 24 also possible
+ ConeTypes.ORANGE_BIG: 27,
+ ConeTypes.ORANGE_SMALL: 20, # 20 is red, we use it to represent a small orange cone
+}
+
+LytObjectIndexToConeType: Dict[int, ConeTypes] = {
+ 25: ConeTypes.UNKNOWN,
+ 29: ConeTypes.YELLOW,
+ 30: ConeTypes.YELLOW,
+ 23: ConeTypes.BLUE,
+ 24: ConeTypes.BLUE,
+ 27: ConeTypes.ORANGE_BIG,
+ 20: ConeTypes.ORANGE_SMALL,
+}
+
+
+def _to_lyt_heading(heading: np.ndarray, input_is_radians: bool) -> np.ndarray:
+ """
+ Convert real world heading (direction) to lyt heading
+
+ Args:
+ heading (np.ndarray): The heading in the real world
+ input_is_radians (bool, optional): If set to True the input will be converted to
+ degrees as required by LYT.
+
+ Returns:
+ np.ndarray: The heading is required by LYT
+ """
+ if input_is_radians:
+ heading = np.rad2deg(heading)
+ return ((heading + 180) * 256 // 360).astype(np.uint8)
+
+
+def _create_lyt_block_for_cone(
+ x_pos: int, y_pos: int, heading: int, color_index: int
+) -> bytes:
+ z_height = 240 # suggested by documentation (puts element on ground)
+ flags = 0 # these are simple cone objects no flags
+ block = BLOCK_STRUCT.pack(x_pos, y_pos, z_height, flags, color_index, heading)
+ return block
+
+
+def _create_lyt_trace_bytes(trace: np.ndarray, color_idx: int) -> bytes:
+ if len(trace) > 0:
+ trace_looped = cast(np.ndarray, np.vstack((trace, trace[:1])))
+ heading = angle_from_2d_vector(trace_looped[1:] - trace_looped[:-1])
+ # NOTE 2 in format
+ heading = _to_lyt_heading(heading, input_is_radians=True)
+ all_blocks = b"".join(
+ _create_lyt_block_for_cone(x, y, h, color_idx)
+ for (x, y), h in zip(trace, heading)
+ )
+ return all_blocks
+ return b""
+
+
+def _create_start_block(x_pos: int, y_pos: int, heading: float) -> bytes:
+ z_height = 240
+ index = 0
+ flags = 0 # start position has 0 width
+ heading = _to_lyt_heading(heading, input_is_radians=True)
+ block = BLOCK_STRUCT.pack(x_pos, y_pos, z_height, flags, index, heading)
+ return block
+
+
+def _create_finish_block(x_pos: int, y_pos: int, heading: float, width: float) -> bytes:
+ block = bytearray(_create_start_block(x_pos, y_pos, heading))
+ # width of finish object
+ block[5] |= int(width / 2) << 2
+ return bytes(block)
+
+
+def _create_checkpoint_block(
+ x_pos: int,
+ y_pos: int,
+ heading: float,
+ width: float,
+ checkpoint_index: Literal[1, 2, 3],
+) -> bytes:
+ if checkpoint_index not in (1, 2, 3):
+ raise ValueError(
+ f"checkout_index must be either 1, 2 or 3. It is {checkpoint_index}"
+ )
+ block = bytearray(_create_finish_block(x_pos, y_pos, heading, width))
+ block[5] |= checkpoint_index
+ return bytes(block)
+
+
+def _traces_to_lyt_bytes(
+ cones_per_type: List[np.ndarray], offset_in_meters: Tuple[float, float]
+) -> bytes:
+ lfs_scale = 16
+ offset = offset_in_meters * lfs_scale
+
+ cones_in_map = [(c * lfs_scale + offset).astype(int) for c in cones_per_type]
+
+ cones_bytes = [
+ _create_lyt_trace_bytes(cones, ConeTypeToLytObjectIndex[cone_type])
+ for cone_type, cones in zip(ConeTypes, cones_in_map)
+ ]
+
+ right_in_map = cones_in_map[ConeTypes.RIGHT]
+ left_in_map = cones_in_map[ConeTypes.LEFT]
+ start_finish_in_map = cones_in_map[ConeTypes.START_FINISH_LINE]
+
+ pos_x, pos_y = (left_in_map[-2] + right_in_map[-2]) // 2
+ start_heading: float = (
+ angle_from_2d_vector(left_in_map[-2] - left_in_map[-1]) + np.pi / 2
+ )
+ start_block = _create_start_block(pos_x, pos_y, start_heading)
+
+ finish_pos_x, finish_pos_y = (start_finish_in_map[0] + start_finish_in_map[1]) // 2
+
+ # divide by lfs_scale because we need actual width
+ finish_width = 3 * (
+ np.linalg.norm(start_finish_in_map[0] - start_finish_in_map[1]) / lfs_scale
+ )
+ finish_heading = angle_from_2d_vector(left_in_map[-1] - left_in_map[0]) + np.pi / 2
+ finish_block = _create_finish_block(
+ finish_pos_x, finish_pos_y, finish_heading, finish_width
+ )
+
+ # put checkpoint approximately in the middle
+ # this is so that lfs counts lap times
+ half_len = len(left_in_map) // 2
+ left_point_half = left_in_map[half_len]
+ right_point_half_index = np.linalg.norm(
+ left_point_half - right_in_map, axis=1
+ ).argmin(axis=0)
+
+ right_point_half = right_in_map[right_point_half_index]
+ check_pos_x, check_pos_y = (left_point_half + right_point_half) // 2
+ check_width = 5 * (np.linalg.norm(left_point_half - right_point_half) / lfs_scale)
+ check_heading = (
+ angle_from_2d_vector(left_in_map[half_len - 1] - left_in_map[half_len])
+ + np.pi / 2
+ )
+
+ print(check_heading)
+ check_block = _create_checkpoint_block(
+ check_pos_x, check_pos_y, check_heading, check_width, 1
+ )
+
+ final_object_blocks = b"".join(
+ (start_block, finish_block, check_block, *cones_bytes)
+ )
+
+ n_obj = len(final_object_blocks) // BLOCK_STRUCT.size
+ assert len(final_object_blocks) % BLOCK_STRUCT.size == 0
+ header = HEADER_STRUCT.pack(b"LFSLYT", 0, 251, n_obj, 10, 8)
+
+ final_bytes = b"".join((header, final_object_blocks))
+ return final_bytes
+
+
+def cones_to_lyt(
+ world_name: Literal["BL4", "AU1", "AU2", "AU3", "WE3", "LA2"],
+ cones_left: FloatArrayNx2,
+ cones_right: FloatArrayNx2,
+) -> bytes:
+ offset = {
+ "BL4": (-261, 124),
+ "AU1": (-50, -1010),
+ "AU2": (-138, -696),
+ "AU3": (-66, -50),
+ "WE3": (64, -1200),
+ "LA2": (538, 548),
+ }[world_name]
+
+ offset = np.array(offset)
+
+ assert offset is not None
+
+ cones_per_type = [np.empty((0, 2)) for _ in ConeTypes]
+ cones_per_type[ConeTypes.LEFT] = cones_left[1:]
+ cones_per_type[ConeTypes.RIGHT] = cones_right[1:]
+ cones_per_type[ConeTypes.START_FINISH_LINE] = np.row_stack(
+ (cones_left[0], cones_right[0])
+ )
+
+ bytes_to_write = _traces_to_lyt_bytes(cones_per_type, offset)
+ return bytes_to_write
diff --git a/drawing_to_fsd_layout/image_processing.py b/drawing_to_fsd_layout/image_processing.py
new file mode 100644
index 0000000..8aa2e75
--- /dev/null
+++ b/drawing_to_fsd_layout/image_processing.py
@@ -0,0 +1,244 @@
+from pathlib import Path
+from typing import Iterable
+
+import matplotlib.pyplot as plt
+import networkx as nx
+import numpy as np
+import streamlit as st
+from scipy.spatial.distance import cdist
+from skimage import io
+from skimage.color import rgb2gray
+from skimage.feature import canny
+from skimage.filters import unsharp_mask
+from skimage.transform import rescale
+
+from drawing_to_fsd_layout.common import FloatArrayNx2
+
+Image = np.typing.NDArray[np.float64]
+
+
+def rotate(points: np.ndarray, theta: float) -> np.ndarray:
+ """
+ Rotates the points in `points` by angle `theta` around the origin
+
+ Args:
+ points: The points to rotate. Shape (n,2)
+ theta: The angle by which to rotate in radians
+
+ Returns:
+ The points rotated
+ """
+ cos_theta, sin_theta = np.cos(theta), np.sin(theta)
+ rotation_matrix = np.array(((cos_theta, -sin_theta), (sin_theta, cos_theta))).T
+ return np.dot(points, rotation_matrix)
+
+
+def load_raw_image(image_path: Path | str) -> Image:
+ """
+ Load an image. If input is a string, it is assumed to be a path to an image. If
+ input is a Path, it is assumed to be a path to an image. If input is an image, it is
+ returned as is.
+ """
+ if isinstance(image_path, str):
+ image_path = Path(image_path)
+
+ image = io.imread(image_path) if isinstance(image_path, Path) else image_path.copy()
+
+ return image
+
+
+def create_close_point_adjacency_matrix(
+ edge_1: FloatArrayNx2, edge_2: FloatArrayNx2, distance: float
+) -> np.ndarray:
+ """
+ Combine two edges into a single edge.
+
+ Args:
+ edge_1: The first edge. Shape (n,2)
+ edge_2: The second edge. Shape (m,2)
+
+ Returns:
+ The combined edge.
+ """
+ dist = cdist(edge_1, edge_2)
+
+ # find the closest point in edge_2 for each point in edge_1
+ idxs_closest_in_edge_2 = dist.argmin(axis=1)
+
+ # find the closest point in edge_1 for each point in edge_2
+ idxs_closest_in_edge_1 = dist.argmin(axis=0)
+
+ adj_edge_1_to_edge_2 = idxs_closest_in_edge_1 < distance
+ adj_edge_2_to_edge_1 = idxs_closest_in_edge_2 < distance
+
+ combined_length = len(edge_1) + len(edge_2)
+
+ adj = np.zeros((combined_length, combined_length), dtype=bool)
+
+ adj[: len(edge_1), len(edge_1) :] = adj_edge_1_to_edge_2
+ adj[len(edge_1) :, : len(edge_1)] = adj_edge_2_to_edge_1
+
+ return adj
+
+
+def combine_edges(edge_1: FloatArrayNx2, edge_2: FloatArrayNx2) -> FloatArrayNx2:
+ """
+ Combine two edges into a single edge.
+
+ Args:
+ edge_1: The first edge. Shape (n,2)
+ edge_2: The second edge. Shape (m,2)
+
+ Returns:
+ The combined edge.
+ """
+ adj = create_close_point_adjacency_matrix(edge_1, edge_2, distance=2)
+ # remove one directional edges
+ adj = np.logical_and(adj, adj.T)
+
+ # find the index of the closest point in edge_2 for each point in edge_1
+ idx_edge_2_to_edge_1_pair = np.argmax(adj[: len(edge_1), len(edge_1) :], axis=1)
+ # if argmax returns 0, it means that there is no edge between the two points
+ filtered_idxs_mask = idx_edge_2_to_edge_1_pair != 0
+
+ edge_2_keep_idxs = idx_edge_2_to_edge_1_pair[filtered_idxs_mask]
+ edge_1_keep_idxs = np.arange(len(edge_1))[filtered_idxs_mask]
+
+ # combine the edges by element-wise mean
+ edge_1_keep = edge_1[edge_1_keep_idxs]
+ edge_2_keep = edge_2[edge_2_keep_idxs]
+ combined_edge = (edge_1_keep + edge_2_keep) / 2
+
+ return combined_edge
+
+
+@st.cache(show_spinner=False)
+def load_image_and_preprocess(
+ image_path: Path | str | Image, target_size: tuple[int, int] = (1000, 1000)
+) -> Image:
+ """Load an image and scale it to the correct size for FSD layout."""
+ image = load_raw_image(image_path)
+
+ if image.ndim == 3:
+ image = image[:, :, :3]
+
+ image_resolution = np.prod(image.shape)
+ target_resolution = np.prod(target_size)
+
+ rescale_ratio = target_resolution / image_resolution
+ # we need to take the root of the ratio to achieve the correct scaling
+ image_resized = rescale(image, rescale_ratio**0.5, channel_axis=-1)
+
+ # convert to grayscale
+ image_gray = (
+ rgb2gray(image_resized) if image_resized.ndim == 3 else image_resized.copy()
+ )
+ image_one_channel = image_gray[:, :, 0] if image_gray.ndim == 3 else image_gray
+
+ # apply unsharp mask
+ image_unsharp = unsharp_mask(image_one_channel, radius=5, amount=3)
+
+ return image_unsharp
+
+
+def reorder_track_edge(
+ g: nx.Graph, cc_idxs: Iterable[int], all_nodes_positions: FloatArrayNx2
+) -> FloatArrayNx2:
+ cc_idxs_list = list(cc_idxs)
+ start_index = cc_idxs_list[0]
+
+ idxs_ordered = list(nx.depth_first_search.dfs_preorder_nodes(g, start_index))
+
+ cc_positions_ordered = all_nodes_positions[idxs_ordered]
+
+ distances = np.linalg.norm(np.diff(cc_positions_ordered, axis=0), axis=1)
+ mask_remove = distances > 5
+
+ first_index_over_k = np.argmax(mask_remove)
+ cc_positions_ordered = cc_positions_ordered[:first_index_over_k]
+
+ return cc_positions_ordered
+
+
+def extract_track_edges(
+ image: Image,
+ show_steps: bool = False,
+) -> tuple[FloatArrayNx2, FloatArrayNx2]:
+ # apply canny edge detection with increasing sigma until the number of pixels
+ # designated as edges is low enough
+ for s in np.linspace(2, 5, 20):
+ image_canny = canny(image, sigma=s).astype(float)
+ m = image_canny.mean()
+ if m < 0.03: # prior, found by trial and error
+ break
+
+ if show_steps:
+ st.image(image_canny, caption="Canny edge detection")
+
+ # treat each pixel as a node in a graph and connect nodes that are close to each
+ # other
+ edge_pixels = np.argwhere(image_canny > 0.01)
+ dist = cdist(edge_pixels, edge_pixels)
+ adj = dist < 2
+ adj[np.eye(len(adj), len(adj), dtype=bool)] = 0
+ g = nx.from_numpy_matrix(adj)
+
+ # find the connected components of the graph
+ # we expect four connected components, two for the outer edge and two for the inner
+ # edge
+ cc = list(nx.connected_components(g))
+
+ best_ccs = sorted(cc, key=len, reverse=True)[:4]
+
+ best_clusters = [edge_pixels[list(idxs_in_cc)] for idxs_in_cc in best_ccs]
+ # keep the longer version of each track edge
+ if len(best_clusters) == 2:
+ outer, inner = best_clusters
+ cc_outer, cc_inner = best_ccs
+ else:
+ outer, _, inner, _ = best_clusters
+ cc_outer, _, cc_inner, _ = best_ccs
+
+ if show_steps:
+ plt.figure()
+ plt.plot(*outer.T, ".", markersize=1)
+ plt.plot(*inner.T, ".", markersize=1)
+ plt.axis("equal")
+ st.pyplot(plt.gcf())
+
+ outer_ordered = reorder_track_edge(g, cc_outer, edge_pixels)
+ inner_ordered = reorder_track_edge(g, cc_inner, edge_pixels)
+
+ if show_steps:
+ plt.figure()
+ plt.plot(*outer_ordered[::3].T, "-")
+ plt.plot(*inner_ordered[::3].T, "-")
+ plt.axis("equal")
+ st.pyplot(plt.gcf())
+
+ return outer_ordered, inner_ordered
+
+
+def fix_edges_orientation_and_scale_to_unit(
+ edge_a: FloatArrayNx2, edge_b: FloatArrayNx2
+) -> tuple[FloatArrayNx2, FloatArrayNx2]:
+ points = [edge_a, edge_b]
+
+ all_points = np.concatenate(points)
+
+ # peak to peak (max - min)
+ ptp = np.ptp(all_points)
+ min_value = np.min(all_points)
+ # min-max scaling
+ points_scaled = [(points - min_value) / ptp for points in points]
+
+ points_center = np.row_stack(points_scaled).mean(axis=0)
+
+ points_centered = [points - points_center for points in points_scaled]
+
+ # orientation fix
+ points_rotated = [rotate(p, theta=-3.1415 / 2) for p in points_centered]
+
+ return points_rotated[0], points_rotated[1]
+ return points_rotated[0], points_rotated[1]
+ return points_rotated[0], points_rotated[1]
diff --git a/drawing_to_fsd_layout/math_utils.py b/drawing_to_fsd_layout/math_utils.py
new file mode 100644
index 0000000..c1d0560
--- /dev/null
+++ b/drawing_to_fsd_layout/math_utils.py
@@ -0,0 +1,638 @@
+#!/usin_roll/bin/env python3
+# -*- coding:utf-8 -*-
+"""
+Description: A module with common mathematical functions
+
+Taken from ft-as-utils
+
+Project: FaSTTUBe Chabo Common
+"""
+from typing import Tuple, TypeVar, cast
+
+import numpy as np
+from numba import jit
+
+T = TypeVar("T")
+
+
+def my_njit(func: T) -> T:
+ """
+ numba.njit is an untyped decorator. This wrapper helps type checkers keep the
+ type information after applying the decorator. Furthermore, it sets some performance
+ flags
+
+ Args:
+ func (T): The function to jit
+
+ Returns:
+ T: The jitted function
+ """
+ jit_func: T = jit(nopython=True, cache=True, nogil=True, fastmath=True)(func)
+
+ return jit_func
+
+
+@my_njit
+def vec_dot(vecs1: np.ndarray, vecs2: np.ndarray) -> np.ndarray:
+ """
+ Mutliplies vectors in an array elementwise
+
+ Args:
+ vecs1 (np.array): The first "list" of vectors
+ vecs2 (np.array): The second "list" of vectors
+
+ Returns:
+ np.array: The results
+ """
+ return np.sum(vecs1 * vecs2, axis=-1)
+
+
+@my_njit
+def norm_of_last_axis(arr: np.ndarray) -> np.ndarray:
+
+ original_shape = arr.shape
+ arr_row_col = arr.flatten().reshape(-1, arr.shape[-1])
+ result = np.empty(arr_row_col.shape[0])
+ for i in range(arr_row_col.shape[0]):
+ vec = arr_row_col[i]
+ result[i] = np.sqrt(vec_dot(vec, vec))
+
+ result = result.reshape(original_shape[:-1])
+
+ return result
+
+
+@my_njit
+def vec_angle_between(
+ vecs1: np.ndarray, vecs2: np.ndarray, clip_cos_theta: bool = True
+) -> np.ndarray:
+ """
+ Calculates the angle between the vectors of the last dimension
+
+ Args:
+ vecs1 (np.ndarray): An array of shape (...,2)
+ vecs2 (np.ndarray): An array of shape (...,2)
+ clip_cos_theta (bool): Clip the values of the dot products so that they are
+ between -1 and 1. Defaults to True.
+
+ Returns:
+ np.ndarray: A vector, such that each element i contains the angle between
+ vectors vecs1[i] and vecs2[i]
+ """
+ cos_theta = vec_dot(vecs1, vecs2)
+
+ cos_theta /= norm_of_last_axis(vecs1) * norm_of_last_axis(vecs2)
+
+ cos_theta = np.asarray(cos_theta)
+
+ cos_theta_flat = cos_theta.ravel()
+
+ if clip_cos_theta:
+ cos_theta_flat[cos_theta_flat < -1] = -1
+ cos_theta_flat[cos_theta_flat > 1] = 1
+
+ return np.arccos(cos_theta)
+
+
+def rotate(points: np.ndarray, theta: float) -> np.ndarray:
+ """
+ Rotates the points in `points` by angle `theta` around the origin
+
+ Args:
+ points (np.array): The points to rotate. Shape (n,2)
+ theta (float): The angle by which to rotate in radians
+
+ Returns:
+ np.array: The points rotated
+ """
+ cos_theta, sin_theta = np.cos(theta), np.sin(theta)
+ rotation_matrix = np.array(((cos_theta, -sin_theta), (sin_theta, cos_theta))).T
+ return np.dot(points, rotation_matrix)
+
+
+@my_njit
+def my_cdist_sq_euclidean(arr_a: np.ndarray, arr_b: np.ndarray) -> np.ndarray:
+ """
+ Calculates the pairwise square euclidean distances from each point in `X` to each
+ point in `Y`
+
+ Credit:
+ Uses https://stackoverflow.com/a/56084419 which in turn uses
+ https://github.com/droyed/eucl_dist
+
+ Args:
+ arr_a (np.array): A 2d array of shape (m,k)
+ arr_b (np.array): A 2d array of shape (n,k)
+
+ Returns:
+ np.array: A matrix of shape (m,n) containing the square euclidean distance
+ between all the points in `X` and `Y`
+ """
+ n_x, dim = arr_a.shape
+ x_ext = np.empty((n_x, 3 * dim))
+ x_ext[:, :dim] = 1
+ x_ext[:, dim : 2 * dim] = arr_a
+ x_ext[:, 2 * dim :] = np.square(arr_a)
+
+ n_y = arr_b.shape[0]
+ y_ext = np.empty((3 * dim, n_y))
+ y_ext[:dim] = np.square(arr_b).T
+ y_ext[dim : 2 * dim] = -2 * arr_b.T
+ y_ext[2 * dim :] = 1
+
+ return np.dot(x_ext, y_ext)
+
+
+@my_njit
+def calc_pairwise_distances(
+ points: np.ndarray, dist_to_self: float = 0.0
+) -> np.ndarray:
+ """
+ Given a set of points, creates a distance matrix from each point to every point
+
+ Args:
+ points (np.ndarray): The points for which the distance matrix should be
+ calculated dist_to_self (np.ndarray, optional): The distance to set the
+ diagonal. Defaults to 0.0.
+
+ Returns:
+ np.ndarray: The 2d distance matrix
+ """
+ pairwise_distances = my_cdist_sq_euclidean(points, points)
+
+ if dist_to_self != 0:
+ for i in range(len(points)):
+ pairwise_distances[i, i] = dist_to_self
+ return pairwise_distances
+
+
+@my_njit
+def my_in1d(test_values: np.ndarray, source_container: np.ndarray) -> np.ndarray:
+ """
+ Calculate a boolean mask for a 1d array indicating if an element in `test_values` is
+ present in `source container` which is also 1d
+
+ Args:
+ test_values (np.ndarray): The values to test if they are inside the container
+ source_container (np.ndarray): The container
+
+ Returns:
+ np.ndarray: A boolean array with the same length as `test_values`. If
+ `return_value[i]` is `True` then `test_value[i]` is in `source_container`
+ """
+ source_sorted = np.sort(source_container)
+ is_in = np.zeros(test_values.shape[0], dtype=np.bool_)
+ for i, test_val in enumerate(test_values):
+ for source_val in source_sorted:
+
+ if test_val == source_val:
+ is_in[i] = True
+ break
+
+ if source_val > test_val:
+ break
+
+ return is_in
+
+
+def trace_calculate_consecutive_radii(trace: np.ndarray) -> np.ndarray:
+ """
+ Expects a (n,2) array and returns the radius of the circle that passes
+ between all consecutive point triples. The radius between index 0,1,2, then 1,2,3
+ and so on
+
+ Args:
+ trace (np.ndarray): The points for which the radii will be calculated
+
+ Returns:
+ np.ndarray: The radii for each consecutive point triple
+ """
+
+ # TODO: Vectorize this function. Limit is the indexer
+ indexer = np.arange(3)[None, :] + 1 * np.arange(trace.shape[-2] - 2)[:, None]
+
+ points = trace[indexer]
+ radii = calculate_radius_from_points(points)
+ return radii
+
+
+def trace_distance_to_next(trace: np.ndarray) -> np.ndarray:
+ """
+ Calculates the distance of one point in the trace to the next. Obviously the last
+ point doesn't have any distance associated
+
+ Args:
+ trace (np.array): The points of the trace
+
+ Returns:
+ np.array: A vector containing the distances from one point to the next
+ """
+ return np.linalg.norm(np.diff(trace, axis=-2), axis=-1)
+
+
+def trace_angles_between(trace: np.ndarray) -> np.ndarray:
+ """
+ Calculates the angles in a trace from each point to its next
+
+ Args:
+ trace (np.array): The trace containing a series of 2d vectors
+
+ Returns:
+ np.array: The angle from each vector to its next, with `len(return_value) ==
+ len(trace) - 1`
+ """
+ all_to_next = np.diff(trace, axis=-2)
+ from_middle_to_next = all_to_next[..., 1:, :]
+ from_middle_to_prev = -all_to_next[..., :-1, :]
+ angles = vec_angle_between(from_middle_to_next, from_middle_to_prev)
+ return angles
+
+
+@my_njit
+def unit_2d_vector_from_angle(rad: np.ndarray) -> np.ndarray:
+ """
+ Creates unit vectors for each value in the rad array
+
+ Args:
+ rad (np.array): The angles (in radians) for which the vectors should be created
+
+ Returns:
+ np.array: The created unit vectors
+ """
+ rad = np.asarray(rad)
+ new_shape = rad.shape + (2,)
+ res = np.empty(new_shape, dtype=rad.dtype)
+ res[..., 0] = np.cos(rad)
+ res[..., 1] = np.sin(rad)
+ return res
+
+
+# Calculates the angle of each vector in `vecs`
+# TODO: Look into fixing return type when a single vector is provided (return float)
+def angle_from_2d_vector(vecs: np.ndarray) -> np.ndarray:
+ """
+ Calculates the angle of each vector in `vecs`. If `vecs` is just a single 2d vector
+ then one angle is calculated and a scalar is returned
+
+ >>> import numpy as np
+ >>> x = np.array([[1, 0], [1, 1], [0, 1]])
+ >>> angle_from_2d_vector(x)
+ >>> array([0. , 0.78539816, 1.57079633])
+
+ Args:
+ vecs (np.array): The vectors for which the angle is calculated
+
+ Raises:
+ ValueError: If `vecs` has the wrong shape a ValueError is raised
+
+ Returns:
+ np.array: The angle of each vector in `vecs`
+ """
+ if vecs.shape == (2,):
+ return np.arctan2(vecs[1], vecs[0])
+ if vecs.ndim == 2 and vecs.shape[-1] == 2:
+ return np.arctan2(vecs[:, 1], vecs[:, 0])
+ raise ValueError("vecs can either be a 2d vector or an array of 2d vectors")
+
+
+def normalize(vecs: np.ndarray, axis: int = -1) -> np.ndarray:
+ """
+ Returns a normalized version of vecs
+
+ Args:
+ vecs (np.ndarray): The vectors to normalize
+ axis (int, optional): The axis to use for lengths. Defaults to -1.
+
+ Returns:
+ np.ndarray: The normalized vectors
+ """
+ return vecs / np.linalg.norm(vecs, axis=axis, keepdims=True)
+
+
+@my_njit
+def lerp(
+ values_to_lerp: np.ndarray,
+ start1: np.ndarray,
+ stop1: np.ndarray,
+ start2: np.ndarray,
+ stop2: np.ndarray,
+) -> np.ndarray:
+ """
+ Linearly interpolates (lerps) from one sin_pitchace `[start1, stop1]` to another
+ `[start2, stop2]`. `start1 >= stop1` and `start2 >= stop2` are allowed. If ns is a
+ 2d array, then start1, stop1, start2, stop2 must be 1d vectors. This allows for
+ lerping in any n-dim sin_pitchace
+
+ >>> import numpy as np
+ >>> x = np.array([1, 2, 3])
+ >>> lerp(x, 0, 10, 30, 100)
+ >>> array([37., 44., 51.])
+
+ Args:
+ values_to_lerp (np.array): The points to interpolate
+ start1 (np.array): The beginning of the original sin_pitchace
+ stop1 (np.array): The end of the original sin_pitchace
+ start2 (np.array): The beginning of the target sin_pitchace
+ stop2 (np.array): The end of the target sin_pitchace
+
+ Returns:
+ np.array: The interpolated points
+ """
+ return (values_to_lerp - start1) / (stop1 - start1) * (stop2 - start2) + start2
+
+
+def calculate_radius_from_points(points: np.ndarray) -> np.ndarray:
+ """
+ Given a three points this function calculates the radius of the circle that passes
+ through these points
+
+ Based on: https://math.stackexchange.com/questions/133638/
+ how-does-this-equation-to-find-the-radius-from-3-points-actually-work
+
+ Args:
+ points (np.ndarray): The points for which should be used to calculate the radius
+
+ Returns:
+ np.ndarray: The calculated radius
+ """
+ # implements the equation discussed here:
+ #
+ # assert points.shape[-2:] == (3, 2)
+ # get side lengths
+ points_circular = points[..., [0, 1, 2, 0], :]
+ len_sides = trace_distance_to_next(points_circular)
+
+ # calc prod of sides
+ prod_of_sides = np.prod(len_sides, axis=-1, keepdims=True)
+
+ # calc area of triangle
+ # https://www.mathopenref.com/heronsformula.html
+
+ # calc half of perimeter
+ perimeter = np.sum(len_sides, axis=-1, keepdims=True)
+ half_perimeter = perimeter / 2
+ half_perimeter_minus_sides = half_perimeter - len_sides
+ area_sqr = (
+ np.prod(half_perimeter_minus_sides, axis=-1, keepdims=True) * half_perimeter
+ )
+ area = np.sqrt(area_sqr)
+
+ radius = prod_of_sides / (area * 4)
+
+ radius = radius[..., 0]
+ return radius
+
+
+Numeric = TypeVar("Numeric", float, np.ndarray)
+
+
+def linearly_combine_values_over_time(
+ tee: float, delta_time: float, previous_value: Numeric, new_value: Numeric
+) -> Numeric:
+ """
+ Linear combination of two values over time
+ (see https://de.wikipedia.org/wiki/PT1-Glied)
+ Args:
+ tee (float): The parameter selecting how much we keep from the previous value
+ and how much we update from the new
+ delta_time (float): The time difference between the previous and new value
+ previous_value (Numeric): The previous value
+ new_value (Numeric): The next value
+
+ Returns:
+ Numeric: The combined value
+ """
+ tee_star = 1 / (tee / delta_time + 1)
+ combined_value: Numeric = tee_star * (new_value - previous_value) + previous_value
+ return combined_value
+
+
+def odd_square(values: Numeric) -> Numeric:
+ return cast(Numeric, np.sign(values) * np.square(values))
+
+
+def euler_angles_to_quaternion(euler_angles: np.ndarray) -> np.ndarray:
+ """
+ Converts Euler angles to a quaternion representation.
+
+ Args:
+ euler_angles (np.ndarray): Euler angles as an [...,3] array. Order is
+ [roll, pitch, yaw]
+
+ Returns:
+ np.ndarray: The quaternion representation in [..., 4] [x, y, z, w] order
+ """
+ roll_index, pitch_index, yaw_index = 0, 1, 2
+ sin_values = np.sin(euler_angles * 0.5)
+ cos_values = np.cos(euler_angles * 0.5)
+
+ cos_yaw = cos_values[..., yaw_index]
+ sin_yaw = sin_values[..., yaw_index]
+ cos_pitch = cos_values[..., pitch_index]
+ sin_pitch = sin_values[..., pitch_index]
+ cos_roll = cos_values[..., roll_index]
+ sin_roll = sin_values[..., roll_index]
+
+ quaternion_x = sin_roll * cos_pitch * cos_yaw - cos_roll * sin_pitch * sin_yaw
+ quaternion_y = cos_roll * sin_pitch * cos_yaw + sin_roll * cos_pitch * sin_yaw
+ quaternion_z = cos_roll * cos_pitch * sin_yaw - sin_roll * sin_pitch * cos_yaw
+ quaternion_w = cos_roll * cos_pitch * cos_yaw + sin_roll * sin_pitch * sin_yaw
+
+ return_value = np.stack(
+ [quaternion_x, quaternion_y, quaternion_z, quaternion_w], axis=-1
+ )
+ return return_value
+
+
+def quaternion_to_euler_angles(quaternion: np.ndarray) -> np.ndarray:
+ """
+ Converts a quaternion to Euler angles. Based on
+ https://stackoverflow.com/a/37560411.
+
+ Args:
+ quaternion (np.ndarray): The quaternion as an [..., 4] array. Order is
+ [x, y, z, w]
+
+ Returns:
+ np.ndarray: The Euler angles as an [..., 3] array. Order is [roll, pitch, yaw]
+ """
+ x_index, y_index, z_index, w_index = 0, 1, 2, 3
+ x_value = quaternion[..., x_index]
+ y_value = quaternion[..., y_index]
+ z_value = quaternion[..., z_index]
+ w_value = quaternion[..., w_index]
+
+ y_square = y_value * y_value
+ temporary_0 = -2.0 * (y_square + z_value * z_value) + 1.0
+ temporary_1 = +2.0 * (x_value * y_value + w_value * z_value)
+ temporary_2 = -2.0 * (x_value * z_value - w_value * y_value)
+ temporary_3 = +2.0 * (y_value * z_value + w_value * x_value)
+ temporary_4 = -2.0 * (x_value * x_value + y_square) + 1.0
+
+ temporary_2 = np.clip(temporary_2, -1.0, 1.0)
+
+ roll = np.arctan2(temporary_3, temporary_4)
+ pitch = np.arcsin(temporary_2)
+ yaw = np.arctan2(temporary_1, temporary_0)
+
+ return_value = np.stack([roll, pitch, yaw], axis=-1)
+ return return_value
+
+
+def points_inside_ellipse(
+ points: np.ndarray,
+ center: np.ndarray,
+ major_direction: np.ndarray,
+ major_radius: float,
+ minor_radius: float,
+) -> np.ndarray:
+ """
+ Checks if a set of points are inside an ellipse.
+
+ Args:
+ points: The points as an [..., 2] array.
+ center: The center of the ellipse as an [2] array.
+ major_direction: The major direction of the ellipse as an [2] array.
+ major_radius: The major radius of the ellipse.
+ minor_radius: The minor radius of the ellipse.
+
+ Returns:
+ An [...] array of booleans.
+ """
+
+ # Center the points around the center
+ # [..., 2]
+ centered_points = points - center
+ # Calculate angle of the major direction with the x-axis
+ # [1]
+ major_direction_angle = float(angle_from_2d_vector(major_direction))
+ # Rotate the points around the center of the ellipse
+ # [..., 2]
+ rotated_points = rotate(centered_points, -major_direction_angle)
+ # [2]
+ radii_square = np.array([major_radius, minor_radius]) ** 2
+ # [...] [..., 2] [2]
+ criterion_value = (rotated_points**2 / radii_square).sum(axis=-1)
+
+ mask_is_inside = criterion_value < 1
+ return mask_is_inside
+
+
+def center_of_circle_from_3_points(
+ point_1: np.ndarray,
+ point_2: np.ndarray,
+ point_3: np.ndarray,
+ atol: float = 1e-6,
+) -> np.ndarray:
+ """
+ Calculates the center of a circle from three points.
+
+ Adapted from http://paulbourke.net/geometry/circlesphere/Circle.cpp (CalcCircle)
+
+ Args:
+ point_1: The first point as an [2] array.
+ point_2: The second point as an [2] array.
+ point_3: The third point as an [2] array.
+
+ Returns:
+ The center of the circle as an [2] array.
+ """
+ y_delta_1 = point_2[1] - point_1[1]
+ x_delta_1 = point_2[0] - point_1[0]
+ y_delta_2 = point_3[1] - point_2[1]
+ x_delta_2 = point_3[0] - point_2[0]
+
+ if np.isclose(x_delta_1, 0.0, atol=atol) and np.isclose(x_delta_2, 0.0, atol=atol):
+ center_x = (point_2[0] + point_3[0]) / 2
+ center_y = (point_1[1] + point_2[1]) / 2
+ return np.array([center_x, center_y]) # early return
+
+ slope_1 = y_delta_1 / x_delta_1
+ slope_2 = y_delta_2 / x_delta_2
+ if np.isclose(slope_1, slope_2, atol=atol):
+ raise ValueError("Points are colinear")
+
+ center_x = (
+ slope_1 * slope_2 * (point_1[1] - point_3[1])
+ + slope_2 * (point_1[0] + point_2[0])
+ - slope_1 * (point_2[0] + point_3[0])
+ ) / (2 * (slope_2 - slope_1))
+
+ center_y = (
+ -(center_x - (point_1[0] + point_2[0]) / 2) / slope_1
+ + (point_1[1] + point_2[1]) / 2
+ )
+
+ center = np.array([center_x, center_y])
+ return center
+
+
+@my_njit
+def circle_fit(coords: np.ndarray, max_iter: int = 99) -> np.ndarray:
+ """
+ Fit a circle to a set of points. This function is adapted from the hyper_fit function
+ in the circle-fit package (https://pypi.org/project/circle-fit/). The function is
+ a njit version of the original function with some input validation removed. Furthermore,
+ the residuals are not calculated or returned.
+
+ Args:
+ coords: The coordinates of the points as an [N, 2] array.
+ max_iter: The maximum number of iterations.
+
+ Returns:
+ An array with 3 elements:
+ - center x
+ - center y
+ - radius
+ """
+
+ X = coords[:, 0]
+ Y = coords[:, 1]
+
+ n = X.shape[0]
+
+ Xi = X - X.mean()
+ Yi = Y - Y.mean()
+ Zi = Xi * Xi + Yi * Yi
+
+ # compute moments
+ Mxy = (Xi * Yi).sum() / n
+ Mxx = (Xi * Xi).sum() / n
+ Myy = (Yi * Yi).sum() / n
+ Mxz = (Xi * Zi).sum() / n
+ Myz = (Yi * Zi).sum() / n
+ Mzz = (Zi * Zi).sum() / n
+
+ # computing the coefficients of characteristic polynomial
+ Mz = Mxx + Myy
+ Cov_xy = Mxx * Myy - Mxy * Mxy
+ Var_z = Mzz - Mz * Mz
+
+ A2 = 4 * Cov_xy - 3 * Mz * Mz - Mzz
+ A1 = Var_z * Mz + 4.0 * Cov_xy * Mz - Mxz * Mxz - Myz * Myz
+ A0 = Mxz * (Mxz * Myy - Myz * Mxy) + Myz * (Myz * Mxx - Mxz * Mxy) - Var_z * Cov_xy
+ A22 = A2 + A2
+
+ # finding the root of the characteristic polynomial
+ y = A0
+ x = 0.0
+ for _ in range(max_iter):
+ Dy = A1 + x * (A22 + 16.0 * x * x)
+ x_new = x - y / Dy
+ if x_new == x or not np.isfinite(x_new):
+ break
+ y_new = A0 + x_new * (A1 + x_new * (A2 + 4.0 * x_new * x_new))
+ if abs(y_new) >= abs(y):
+ break
+ x, y = x_new, y_new
+
+ det = x * x - x * Mz + Cov_xy
+ X_center = (Mxz * (Myy - x) - Myz * Mxy) / det / 2.0
+ Y_center = (Myz * (Mxx - x) - Mxz * Mxy) / det / 2.0
+
+ x = X_center + X.mean()
+ y = Y_center + Y.mean()
+ r = np.sqrt(abs(X_center**2 + Y_center**2 + Mz))
+
+ return np.array([x, y, r])
+
diff --git a/drawing_to_fsd_layout/spline_fit.py b/drawing_to_fsd_layout/spline_fit.py
new file mode 100644
index 0000000..f6abfa3
--- /dev/null
+++ b/drawing_to_fsd_layout/spline_fit.py
@@ -0,0 +1,135 @@
+"""
+Taken from https://github.com/papalotis/ft-fsd-path-planning/blob/main/fsd_path_planning/utils/math_utils.py
+"""
+from dataclasses import dataclass
+from typing import Any, Optional, Tuple
+
+import numpy as np
+from scipy.interpolate import splev, splprep
+
+
+@dataclass
+class SplineEvaluator:
+ """
+ A class for evaluating a spline.
+ """
+
+ max_u: float
+ tck: Tuple[Any, Any, int]
+ predict_every: float
+
+ def calculate_u_eval(self, max_u: Optional[float] = None) -> np.ndarray:
+ """
+ Calculate the u_eval values for the spline.
+ Args:
+ max_u (Optional[float], optional): The maximum u value. Defaults to None. If
+ None, the maximum u value used during fitting is taken.
+ Returns:
+ np.ndarray: The values for which the spline should be evaluated.
+ """
+
+ if max_u is None:
+ max_u = self.max_u
+ return np.arange(0, max_u, self.predict_every)
+
+ def predict(self, der: int, max_u: Optional[float] = None) -> np.ndarray:
+ """
+ Predict the spline. If der is 0, the function returns the spline. If der is 1,
+ the function returns the first derivative of the spline and so on.
+ Args:
+ der (int): The derivative to predict.
+ max_u (Optional[float], optional): The maximum u value. Defaults to None. If
+ None, the maximum u value used during fitting is taken.
+ Returns:
+ np.ndarray: The predicted spline.
+ """
+
+ u_eval = self.calculate_u_eval(max_u)
+ values = np.array(splev(u_eval, tck=self.tck, der=der)).T
+
+ return values
+
+
+class NullSplineEvaluator(SplineEvaluator):
+ """
+ A dummy spline evaluator used for when an empty list is attempted to be fitted
+ """
+
+ def predict(self, der: int, max_u: Optional[float] = None) -> np.ndarray:
+ points = np.zeros((0, 2))
+ return points
+
+
+class SplineFitterFactory:
+ """
+ Wrapper class for `splev`, `splprep` functions
+ """
+
+ def __init__(self, smoothing: float, predict_every: float, max_deg: int):
+ """
+ Constructor for SplineFitter class
+ Args:
+ smoothing (float): The smoothing factor. 0 means no smoothing
+ predict_every (float): The approximate distance along the fitted trace to
+ calculate a point for
+ max_deg (int): The maximum degree of the fitted splines
+ """
+ self.smoothing = smoothing
+ self.predict_every = predict_every
+ self.max_deg = max_deg
+
+ def fit(self, trace: np.ndarray, periodic: bool = False) -> SplineEvaluator:
+ """
+ Fit a trace and returns a closure that can evaluate the fitted spline at
+ different positions. The maximal spline degree is 2.
+ Args:
+ trace (np.ndarray): The trace to fit
+ Returns:
+ Callable[[int, float]: A closure that when called evalues
+ the fitted spline on the provided positions.
+ """
+ if len(trace) < 2:
+ return NullSplineEvaluator(
+ # dummy values
+ 0,
+ (0, 0, 0),
+ 0,
+ )
+ k = np.clip(len(trace) - 1, 1, self.max_deg)
+ distance_to_next = np.linalg.norm(np.diff(trace, axis=0), axis=1)
+ u_fit = np.concatenate(([0], np.cumsum(distance_to_next)))
+ try:
+ tck, _ = splprep( # pylint: disable=unbalanced-tuple-unpacking
+ trace.T, s=self.smoothing, k=k, u=u_fit, per=periodic
+ )
+ except ValueError:
+ with np.printoptions(threshold=100000):
+ print(self.smoothing, self.predict_every, self.max_deg, repr(trace))
+
+ raise
+
+ max_u = float(u_fit[-1])
+
+ return SplineEvaluator(max_u, tck, self.predict_every)
+
+ def fit_then_evaluate_trace_and_derivative(
+ self, trace: np.ndarray
+ ) -> Tuple[np.ndarray, np.ndarray]:
+ """
+ Fit a provided trace, then evaluates it, and its derivative in `n_predict`
+ evenly spaced positions
+ Args:
+ trace (np.ndarray): The trace to fit
+ Returns:
+ Tuple[np.ndarray, np.ndarray]: A tuple containing the evaluated trace and
+ the evaluated derivative
+ """
+ if len(trace) < 2:
+ return trace.copy(), trace.copy()
+
+ fitted_func = self.fit(trace)
+
+ evaluated_trace = fitted_func.predict(der=0)
+ evaluated_derivative = fitted_func.predict(der=1)
+
+ return (evaluated_trace,)
diff --git a/drawing_to_track.ipynb b/drawing_to_track.ipynb
new file mode 100755
index 0000000..12157ec
--- /dev/null
+++ b/drawing_to_track.ipynb
@@ -0,0 +1,924 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from itertools import count\n",
+ "from typing import Iterable\n",
+ "\n",
+ "import matplotlib.pyplot as plt\n",
+ "import numpy as np\n",
+ "import thinning\n",
+ "from drawing_to_fsd_layout.math_utils import (trace_distance_to_next,\n",
+ " unit_2d_vector_from_angle)\n",
+ "from drawing_to_fsd_layout.math_utils import normalize\n",
+ "from drawing_to_fsd_layout.math_utils import rotate as rotate_points\n",
+ "\n",
+ "from chabo_common.utils.spline_fit import SplineFitterFactory\n",
+ "from chabo_simulation.lfs.lyt_interface.io.write_lyt import write_traces_as_lyt\n",
+ "from scipy.sparse.csgraph import connected_components\n",
+ "from chabo_common.utils.math_utils import my_cdist_sq_euclidean\n",
+ "from skimage import io\n",
+ "from skimage.exposure import rescale_intensity\n",
+ "from skimage.feature import canny\n",
+ "from skimage.filters import gaussian\n",
+ "from skimage.transform import rescale\n",
+ "from skimage.transform import rotate as rotate_image\n",
+ "from sklearn.metrics.pairwise import pairwise_distances\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 119,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# !python -m pip install --user --upgrade pip"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Load image and basic transforms\n",
+ "\n",
+ "- Change the **path_to_image** variable to the path to the image you want to load.\n",
+ "- The image is loaded as grayscale.\n",
+ "- The image is resized to a (smaller) size to aid with computational speed.\n",
+ "- The image contrast is boosted to improve edge detection"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Rescaling image by factor 0.063\n"
+ ]
+ },
+ {
+ "data": {
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 6,
+ "metadata": {},
+ "output_type": "execute_result"
+ },
+ {
+ "data": {
+ "image/png": "",
+ "text/plain": [
+ "