Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kalman filter #198

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions field_friend/interface/components/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def __init__(self, system: 'System') -> None:
self.key_controls = KeyControls(self.system)

with ui.row().classes('w-full').style('min-height: 100%; width: 55%;'):
self.system.robot_locator.ui()
with ui.row().classes('m-4').style('width: calc(100% - 2rem)'):
with ui.column().classes('w-full'):
activities = ui.row().classes('items-center')
Expand Down
2 changes: 1 addition & 1 deletion field_friend/localization/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
from .gnss import Gnss, GNSSRecord
from .gnss_hardware import GnssHardware
from .gnss_simulation import GnssSimulation

from .robot_locator import RobotLocator

reference: GeoPoint = GeoPoint(lat=0, long=0)
4 changes: 4 additions & 0 deletions field_friend/localization/gnss.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def __init__(self, odometer: rosys.driving.Odometer, antenna_offset: float) -> N
self.ROBOT_GNSS_POSITION_CHANGED = rosys.event.Event()
"""the robot has been located (argument: GeoPoint), may only be a rough estimate if no RTK fix is available"""

self.NEW_GNSS_RECORD = rosys.event.Event()
"""a new GNSS record is available (argument: GNSSRecord)"""

self.RTK_FIX_LOST = rosys.event.Event()
"""the robot lost RTK fix"""

Expand Down Expand Up @@ -76,6 +79,7 @@ async def check_gnss(self) -> None:
previous = deepcopy(self.current)
try:
self.current = await self._create_new_record()
self.NEW_GNSS_RECORD.emit(self.current)
except Exception:
self.log.exception('creation of gnss record failed')
if previous is not None:
Expand Down
237 changes: 237 additions & 0 deletions field_friend/localization/robot_locator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
from typing import Any

import numpy as np
import rosys
from nicegui import ui

from .gnss import Gnss, GNSSRecord
from .point_transformation import get_new_position


class RobotLocator(rosys.persistence.PersistentModule):
def __init__(self, wheels: rosys.hardware.Wheels, gnss: Gnss):
""" Robot Locator based on an extended Kalman filter.

### State

```py
x = [
x, # x position
y, # y position
theta, # orientation
v, # linear velocity
omega, # angular velocity
a # linear acceleration
]
```

### Process Model

```py
x = x + v * cos(theta) * dt
y = y + v * sin(theta) * dt
theta = theta + omega * dt
v = v + a * dt
omega = omega
a = a

F = [
[1, 0, -v * sin(theta) * dt, cos(theta) * dt, 0, 0],
[0, 1, v * cos(theta) * dt, sin(theta) * dt, 0, 0],
[0, 0, 1, 0, dt, 0],
[0, 0, 0, 1, 0, dt],
[0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 1],
]
```

### Odometry Measurement Model

```py
z = [
v,
omega,
]

H = [
[0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 1, 0],
]
```

### GNSS Measurement Model

```py
z = [
x,
y,
theta,
]

H = [
[1, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0],
]
```
"""
super().__init__(persistence_key="field_friend.robot_locator")
self.x = np.zeros((6, 1))
self.Sxx = np.zeros((6, 6))
self.t = rosys.time()

self.ignore_odometry = False
self.ignore_gnss = False

self.q_odometry_v = 0.01
self.q_odometry_omega = 0.01

self.r_x = 0.01
self.r_y = 0.01
self.r_theta = 0.01
self.r_v = 0.01
self.r_omega = 1.00
self.r_a = 1.00

self.gnss = gnss

wheels.VELOCITY_MEASURED.register(self.handle_velocity_measurement)
gnss.NEW_GNSS_RECORD.register(self.handle_gnss_measurement)

def backup(self) -> dict[str, Any]:
return {
'q_odometry_v': self.q_odometry_v,
'q_odometry_omega': self.q_odometry_omega,
'r_x': self.r_x,
'r_y': self.r_y,
'r_theta': self.r_theta,
'r_v': self.r_v,
'r_omega': self.r_omega,
'r_a': self.r_a,
}

def restore(self, data: dict[str, Any]) -> None:
self.q_odometry_v = data.get('q_odometry_v', 0.01)
self.q_odometry_omega = data.get('q_odometry_omega', 0.01)
self.r_x = data.get('r_x', 0.01)
self.r_y = data.get('r_y', 0.01)
self.r_theta = data.get('r_theta', 0.01)
self.r_v = data.get('r_v', 0.01)
self.r_omega = data.get('r_omega', 1.00)
self.r_a = data.get('r_a', 1.00)

@property
def pose(self) -> rosys.geometry.Pose:
return rosys.geometry.Pose(
x=self.x[0, 0],
y=self.x[1, 0],
yaw=self.x[2, 0],
)

@property
def velocity(self) -> rosys.geometry.Velocity:
return rosys.geometry.Velocity(
linear=self.x[3, 0],
angular=self.x[4, 0],
time=self.t,
)

def handle_velocity_measurement(self, velocities: list[rosys.geometry.Velocity]) -> None:
self.predict()
if not self.ignore_odometry:
for velocity in velocities:
self.update(
z=np.array([[velocity.linear, velocity.angular]]).T,
h=np.array([[self.x[3, 0], self.x[4, 0]]]).T,
H=np.array([
[0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 1, 0],
]),
Q=np.diag([self.q_odometry_v, self.q_odometry_omega])**2,
)

def handle_gnss_measurement(self, record: GNSSRecord) -> None:
self.predict()
if not self.ignore_gnss:
yaw = np.deg2rad(-record.heading)
geo_point = get_new_position(record.location, self.gnss.antenna_offset, yaw+np.pi/2)
cartesian_coords = geo_point.cartesian()
pose = rosys.geometry.Pose(
x=cartesian_coords.x,
y=cartesian_coords.y,
yaw=self.x[2, 0] + rosys.helpers.angle(self.x[2, 0], yaw),
)
z = [[pose.x], [pose.y], [pose.yaw]]
h = [[self.x[0, 0]], [self.x[1, 0]], [self.x[2, 0]]]
H = [
[1, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0],
]
# TODO: switch gps_qual for accuracy
r_xy = 4 - record.gps_qual
r_theta = 4 - record.gps_qual
# r_xy = (record.latitude_accuracy + record.longitude_accuracy) / 2
# r_theta = np.deg2rad(record.heading_accuracy)
Q = np.diag([r_xy, r_xy, r_theta])**2
self.update(z=np.array(z), h=np.array(h), H=np.array(H), Q=Q)

def predict(self) -> None:
dt = rosys.time() - self.t
self.t = rosys.time()

theta = self.x[2, 0]
v = self.x[3, 0]
omega = self.x[4, 0]
a = self.x[5, 0]

F = np.array([
[1, 0, -v * np.sin(theta) * dt, np.cos(theta) * dt, 0, 0],
[0, 1, v * np.cos(theta) * dt, np.sin(theta) * dt, 0, 0],
[0, 0, 1, 0, dt, 0],
[0, 0, 0, 1, 0, dt],
[0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 1],
])

R = (np.diag([self.r_x, self.r_y, self.r_theta, self.r_v, self.r_omega, self.r_a]) * dt)**2

self.x[0] += v * np.cos(theta) * dt
self.x[1] += v * np.sin(theta) * dt
self.x[2] += omega * dt
self.x[3] += a * dt
self.Sxx = F @ self.Sxx @ F.T + R

def update(self, *, z: np.ndarray, h: np.ndarray, H: np.ndarray, Q: np.ndarray) -> None:
K = self.Sxx @ H.T @ np.linalg.inv(H @ self.Sxx @ H.T + Q)
self.x = self.x + K @ (z - h)
self.Sxx = (np.eye(self.Sxx.shape[0]) - K @ H) @ self.Sxx

def reset(self, *, x: float = 0.0, y: float = 0.0, yaw: float = 0.0) -> None:
self.x = np.array([[x, y, yaw, 0, 0, 0]], dtype=float).T
self.Sxx = np.diag([0.01, 0.01, 0.01, 0.01, 0.01, 0.01])**2

def ui(self) -> None:
with ui.column():
ui.label('Kalman Filter Settings').classes('text-bold text-xl')
with ui.grid(columns=2).classes('w-full'):
ui.checkbox('Ignore Odometry').props('dense color=red').classes('col-span-2') \
.bind_value(self, 'ignore_odometry')
ui.checkbox('Ignore GNSS').props('dense color=red').classes('col-span-2') \
.bind_value(self, 'ignore_gnss')
for key, label in {
'q_odometry_v': 'Q Odometry v',
'q_odometry_omega': 'Q Odometry ω',
'r_x': 'R x',
'r_y': 'R y',
'r_theta': 'R θ',
'r_v': 'R v',
'r_omega': 'R ω',
'r_a': 'R a',
}.items():
with ui.column().classes('w-full gap-0'):
ui.label(label)
ui.slider(min=0, max=1, step=0.01, on_change=self.request_backup) \
.bind_value(self, key).props('label')
ui.label().bind_text_from(self, 'pose', lambda p: f'Pose: x={p.x:.2f} y={p.y:.2f} yaw={p.yaw:.2f}')
ui.label().bind_text_from(self, 'velocity', lambda v: f'Velocity: lin.={v.linear:.2f} ang.={v.angular:.2f}')
2 changes: 2 additions & 0 deletions field_friend/log_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
from pathlib import Path

import coloredlogs
import icecream

project = 'field_friend'
Expand Down Expand Up @@ -35,6 +36,7 @@ def configure():
'disable_existing_loggers': True,
'formatters': {
'default': {
'()': coloredlogs.ColoredFormatter,
'format': '%(asctime)s.%(msecs)03d [%(levelname)s] %(relativepath)s:%(lineno)d: %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S',
},
Expand Down
4 changes: 3 additions & 1 deletion field_friend/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
)
from .interface.components.info import Info
from .kpi_generator import generate_kpis
from .localization import RobotLocator
from .localization.geo_point import GeoPoint
from .localization.gnss_hardware import GnssHardware
from .localization.gnss_simulation import GnssSimulation
Expand Down Expand Up @@ -96,8 +97,9 @@ def __init__(self) -> None:
else:
assert isinstance(self.field_friend.wheels, rosys.hardware.WheelsSimulation)
self.gnss = GnssSimulation(self.odometer, self.field_friend.wheels)
self.robot_locator = RobotLocator(self.field_friend.wheels, self.gnss)
self.gnss.ROBOT_POSE_LOCATED.register(self.odometer.handle_detection)
self.driver = rosys.driving.Driver(self.field_friend.wheels, self.odometer)
self.driver = rosys.driving.Driver(self.field_friend.wheels, self.robot_locator)
self.driver.parameters.linear_speed_limit = 0.3
self.driver.parameters.angular_speed_limit = 0.2
self.driver.parameters.can_drive_backwards = True
Expand Down
Loading