Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended Kalman Filter #96

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions mbodied/EKF/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .extended_kalman_filter import ExtendedKalmanFilter

__all__ = ["ExtendedKalmanFilter"]
205 changes: 205 additions & 0 deletions mbodied/EKF/extended_kalman_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import numpy as np

class ExtendedKalmanFilter:
def __init__(
self,
state_dim,
control_dim,
observation_dim,
initial_state=None,
initial_covariance=None,
process_noise_cov=None,
measurement_noise_cov=None,
uncertainty_percentage=0.1,
is_linear_f=False,
is_linear_h=False
):
"""
For reference, see: https://gaoyichao.com/Xiaotu/resource/refs/PR.MIT.en.pdf chapter 3.3

Initializes the EKF with dimensions for the state, control input, and observation vectors.

Args:
- state_dim (int): Dimension of the state vector.
- control_dim (int): Dimension of the control vector.
- observation_dim (int): Dimension of the observation vector.
- initial_state (np.ndarray or None): Optional initial state vector.
- initial_covariance (np.ndarray or None): Optional initial covariance matrix.
- uncertainty_percentage (float): Percentage for initial state uncertainty.
"""
self.state_dim = state_dim
self.control_dim = control_dim
self.observation_dim = observation_dim
self.is_linear_f = is_linear_f
self.is_linear_h = is_linear_h

if initial_state is not None:
self.x = self.to_column_vector(initial_state)
else:
self.x = np.zeros((state_dim, 1))

if initial_covariance is not None:
self.P = initial_covariance
else:
if np.all(self.x == 0):
self.P = np.eye(state_dim)
else:
variances = np.array([uncertainty_percentage * abs(value) for value in self.x.flatten()])
self.P = np.diag(variances)

self.Q = process_noise_cov if process_noise_cov is not None else np.eye(state_dim) * 0.5
self.R = measurement_noise_cov if measurement_noise_cov is not None else np.eye(observation_dim) * 0.8

self.F = np.eye(state_dim)
self.B = np.zeros((state_dim, control_dim))
self.H = np.eye(observation_dim, state_dim)

def predict(self, u):
"""
Prediction step of the EKF.

Args:
- u (np.array): Control input vector.
"""
u = self.to_column_vector(u)
self.x = self.f(self.x, u) # ¯μt = g(ut, μt−1)

F_jacobian = self.jacobian_f(self.x, u) # Gt

self.P = F_jacobian @ self.P @ F_jacobian.T + self.Q # ¯Σt = Gt Σt−1 GTt + Rt

def update(self, z):
"""
Update step of the EKF based on observation z.

Args:
- z (np.array): Observation vector.
"""
z = self.to_column_vector(z)
z_pred = self.h(self.x) # h(¯μt)

H_jacobian = self.jacobian_h(self.x) # Ht

y = z - z_pred # zt - h(¯μt)

S = H_jacobian @ self.P @ H_jacobian.T + self.R # Ht ¯Σt HTt + Qt

K = self.P @ H_jacobian.T @ np.linalg.inv(S) # Kt

self.x = self.x + K @ y # μt = ¯μt + Kt(zt − h(¯μt))

self.P = (np.eye(self.state_dim) - K @ H_jacobian) @ self.P # Σt = (I − Kt Ht) ¯Σt

def f(self, x, u):
"""
Non-linear state transition function (robot motion).

Args:
- x (np.array): Current state vector.
- u (np.array): Control input vector.

Returns:
- (np.array): Predicted next state.
"""
return self.F @ x + self.B @ u

def jacobian_f(self, x, u, epsilon=1e-5):
"""
Compute the Jacobian of the state transition function f with respect to the state x.

Args:
- x (np.array): Current state vector.
- u (np.array): Control input vector.

Returns:
- (np.array): Jacobian matrix of f with respect to x.
"""
if self.is_linear_f:
return self.F

state_dim = x.shape[0]
F_jacobian = np.zeros((state_dim, state_dim))

for i in range(state_dim):
x_perturbed = np.copy(x)
x_perturbed[i] += epsilon

F_jacobian[:, i] = (self.f(x_perturbed, u) - self.f(x, u)).flatten() / epsilon

return F_jacobian

def h(self, x):
"""
Non-linear observation function. To be defined by the specific system model.

Args:
- x (np.array): Current state vector.

Returns:
- (np.array): Predicted observation.
"""
return self.H @ x

def jacobian_h(self, x, epsilon=1e-5):
"""
Compute the Jacobian of the observation function h with respect to the state x.

Args:
- x (np.array): Current state vector.

Returns:
- (np.array): Jacobian matrix of h with respect to x.
"""
if self.is_linear_h:
return self.H

observation_dim = self.h(x).shape[0]
state_dim = x.shape[0]
H_jacobian = np.zeros((observation_dim, state_dim))

for i in range(state_dim):
x_perturbed = np.copy(x)
x_perturbed[i] += epsilon

H_jacobian[:, i] = (self.h(x_perturbed) - self.h(x)).flatten() / epsilon

return H_jacobian

def get_state(self):
"""
Returns the current state estimate.

Returns:
np.ndarray: The current state estimate vector.
"""
return self.x

def get_covariance(self):
"""
Returns the current covariance matrix.

Returns:
np.ndarray: The current state covariance matrix.
"""
return self.P

def get_measurement_prediction(self):
"""
Computes the predicted measurement based on the current state estimate.

Returns:
np.ndarray: The predicted measurement vector.
"""
return self.h(self.x)

def to_column_vector(self, v):
"""
Converts a vector to a column vector if it isn't already.

Args:
- v (np.array): Input vector.

Returns:
- (np.array): Column vector.
# """
return v.reshape(-1, 1) if v.ndim == 1 else v
85 changes: 85 additions & 0 deletions mbodied/EKF/tests/test_ekf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from mbodied.EKF.extended_kalman_filter import ExtendedKalmanFilter
from mbodied.EKF.world import world_to_vector
from mbodied.types.sense.world import World, WorldObject, BBox2D, BBox3D, Pose, PixelCoords
from mbodied.types.sense.vision import Image
from mbodied.types.motion.control import HandControl
from mbodied.EKF.trajectory import HandControlVector
import numpy as np

def test_ekf():
world = World(
image=Image(path="resources/color_image.png"),
objects=[
WorldObject(
name="box",
bbox_2d=BBox2D(10, 20, 50, 60),
bbox_3d=BBox3D(0, 0, 0, 1, 1, 1),
pose=Pose(x=1.0, y=2.0, z=3.0, roll=0.1, pitch=0.2, yaw=0.3),
pixel_coords=PixelCoords(100, 150)
),
WorldObject(
name="sphere",
bbox_2d=BBox2D(15, 25, 55, 65),
bbox_3d=BBox3D(1, 1, 1, 2, 2, 2),
pose=Pose(x=4.0, y=5.0, z=6.0, roll=0.4, pitch=0.5, yaw=0.6),
pixel_coords=PixelCoords(110, 160)
)
]
)

hand = HandControl.unflatten([20, 30, 10, 40, 50, 70, 1])

world_vector = world_to_vector(world)
motion_vector = HandControlVector(hand).to_vector()
state_vector = np.concatenate([world_vector, motion_vector])

state_dim = state_vector.size
control_dim = motion_vector.size
observation_dim = state_vector.size
Q = np.eye(state_dim) * 0.1
R = np.eye(observation_dim) * 0.5

ekf = ExtendedKalmanFilter(state_dim, control_dim, observation_dim, initial_state=state_vector, process_noise_cov=Q, measurement_noise_cov=R)

num_iterations = 10
innovations = []
state_estimates = []
robot_poses = []

for i in range(num_iterations):
print(f"\n--- Iteration {i + 1} ---")

control_input = np.random.randint(1, 100, size=7)

ekf.predict(control_input)

predicted_state = ekf.get_state().flatten()
print(f"Predicted State: {predicted_state}")

noise = np.random.normal(0, 0.1, observation_dim)
simulated_observation = predicted_state + noise
print(f"Simulated Observation: {simulated_observation}")

ekf.update(simulated_observation)

updated_state = ekf.get_state().flatten()
print(f"Updated State Estimate: {updated_state}")

predicted_measurement = ekf.get_measurement_prediction()
innovation = simulated_observation - predicted_measurement
innovations.append(innovation)
print(f"Innovation: {innovation}")

state_estimates.append(updated_state)

robot_pose = {
"position": updated_state[-7:-4],
"orientation": updated_state[-4:-1]
}
robot_poses.append(robot_pose)

print(f"Robot POSE: {robot_poses}")

return state_estimates, innovations

test_ekf()
60 changes: 60 additions & 0 deletions mbodied/EKF/tests/test_tree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import unittest
from unittest import mock
from typing import List
from mbodied.EKF.tree_of_thought import TreeOfThought
from mbodied.agents.language import LanguageAgent

@mock.patch(
"mbodied.agents.language.language_agent.LanguageAgent.act",
side_effect = [
'["move hand to the right", "close hand"]',
'["move hand forward", "open hand"]',
'["rotate wrist clockwise", "close hand"]',
'["move hand up", "open hand"]',
'["move hand down", "close hand"]',
'["slide hand left", "hold position"]',
'["move hand diagonally right-up", "open hand"]',
'["move hand diagonally left-down", "close hand"]',
'["shake hand", "open hand"]',
'["tap surface lightly", "open hand"]',
'["clench fist", "hold position"]',
'["move hand back", "close hand"]',
'["wave hand", "open hand"]',
'["stretch fingers", "hold position"]',
'["move hand in a circle", "close hand"]',
None
]
)
def test_tree_of_thought(mock_act):

language_agent = mock.Mock()

language_agent.act.side_effect = [
'["move hand to the right", "close hand"]',
'["move hand forward", "open hand"]',
]

def parse_json_output(response: str) -> List[str]:
return eval(response)

language_agent = LanguageAgent()

tot = TreeOfThought(language_agent, depth=2)

root = tot.generate_thoughts("What should the robot do next?", parse_function=parse_json_output)

assert root.state == "move hand to the right", f"Expected root state to be 'move hand to the right', got {root.state}"

assert len(root.children) == 1, f"Expected root to have 2 children, got {len(root.children)}"

first_child = root.children[0]
assert first_child.state == "close hand", f"Expected first child state to be 'close hand', got {first_child.state}"

nested_child = root.children[0].children[0]
assert nested_child.state == "move hand forward", f"Expected second child state to be 'move hand forward', got {nested_child.state}"

tot.traverse_tree(root)


if __name__ == "__main__":
unittest.main()
25 changes: 25 additions & 0 deletions mbodied/EKF/trajectory/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from .motion_vector import (
LocationAngleVector,
PoseVector,
JointControlVector,
FullJointControlVector,
HandControlVector,
HeadControlVector,
MobileSingleHandControlVector,
MobileSingleArmControlVector,
MobileBimanualArmControlVector,
HumanoidControlVector
)

__all__ = [
"LocationAngleVector",
"PoseVector",
"JointControlVector",
"FullJointControlVector",
"HandControlVector",
"HeadControlVector",
"MobileSingleHandControlVector",
"MobileSingleArmControlVector",
"MobileBimanualArmControlVector",
"HumanoidControlVector"
]
Loading
Loading