Add possibility to save optimization output to file #18

Merged on Jul 31, 2024 (7 commits).
2 changes: 1 addition & 1 deletion .github/workflows/ci_cd.yml
@@ -37,7 +37,7 @@ jobs:
- name: Dependencies
shell: bash -l {0}
run: |
-mamba install python=${{ matrix.python }} casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py git
+mamba install python=${{ matrix.python }} casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py git hdf5storage
mamba list

- name: Install
2 changes: 1 addition & 1 deletion README.md
@@ -18,7 +18,7 @@ hippopt is an open-source framework for generating whole-body trajectories for l
## Installation
It is suggested to use [``mamba``](https://github.com/conda-forge/miniforge).
```bash
-conda install -c conda-forge -c robotology python=3.11 casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py
+conda install -c conda-forge -c robotology python=3.11 casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py hdf5storage
pip install --no-deps -e .[all]
```

1 change: 1 addition & 0 deletions setup.cfg
@@ -65,6 +65,7 @@ robot_planning=
turnkey_planners=
idyntree
resolve-robotics-uri-py
hdf5storage
visualization=
ffmpeg-python
idyntree
119 changes: 97 additions & 22 deletions src/hippopt/base/optimization_object.py
@@ -68,25 +68,44 @@ def _scan(
input_dict: dict | None = None,
output_filter: Callable[[str, Any, dict], bool] | None = None,
input_conversion: Callable[[str, Any], Any] | None = None,
) -> (dict, dict):
output_conversion: Callable[[str, Any], Any] | None = None,
output_flat: bool = True,
) -> tuple[dict, dict] | tuple[list, list]:
output_dict = {}
metadata_dict = {}
if isinstance(input_object, list):
assert all(
if not all(
isinstance(elem, OptimizationObject) or isinstance(elem, list)
for elem in input_object
)
):
raise ValueError(
"The input object is a list, but not all elements are"
" OptimizationObject instances."
)
output_list = []
output_metadata_list = []
if not output_flat and name_prefix != "":
output_dict[name_prefix] = output_list
metadata_dict[name_prefix] = output_metadata_list

for i, elem in enumerate(input_object):
inner_dict, inner_metadata = OptimizationObject._scan(
input_object=elem,
name_prefix=name_prefix + f"[{str(i)}].",
name_prefix=name_prefix + f"[{str(i)}]." if output_flat else "",
parent_metadata=parent_metadata,
input_dict=input_dict,
output_filter=output_filter,
input_conversion=input_conversion,
output_conversion=output_conversion,
output_flat=output_flat,
)
output_dict.update(inner_dict)
output_list.append(inner_dict)
metadata_dict.update(inner_metadata)
output_metadata_list.append(inner_metadata)

if not output_flat and name_prefix == "":
return output_list, output_metadata_list
return output_dict, metadata_dict

assert isinstance(input_object, OptimizationObject)
@@ -131,14 +150,24 @@ def _scan(
separator = "" if list_of_optimization_objects else "."
inner_dict, inner_metadata = OptimizationObject._scan(
input_object=composite_value,
name_prefix=name_prefix + field.name + separator,
name_prefix=(
name_prefix + field.name + separator if output_flat else ""
),
parent_metadata=new_parent_metadata,
input_dict=input_dict,
output_filter=output_filter,
input_conversion=input_conversion,
output_conversion=output_conversion,
output_flat=output_flat,
)
output_dict.update(inner_dict)
metadata_dict.update(inner_metadata)

if output_flat:
output_dict.update(inner_dict)
metadata_dict.update(inner_metadata)
else:
output_dict[field.name] = inner_dict
metadata_dict[field.name] = inner_metadata

continue

if OptimizationObject.StorageTypeField in field.metadata:
@@ -157,15 +186,20 @@
parent_metadata[OptimizationObject.StorageTypeField]
)

composite_value = OptimizationObject._convert_to_np_array(
composite_value_edited = OptimizationObject._convert_to_np_array(
composite_value
)
value_is_list = isinstance(composite_value, list)
value_is_list = isinstance(composite_value_edited, list)
value_list = composite_value if value_is_list else [composite_value]
name_radix = name_prefix + field.name
name_radix = name_prefix + field.name if output_flat else field.name
value_from_dict = []

if not output_flat and value_is_list:
output_dict[field.name] = []
metadata_dict[field.name] = []

for i, val in enumerate(value_list):
postfix = f"[{i}]" if value_is_list else ""
postfix = f"[{i}]" if value_is_list and output_flat else ""
full_name = name_radix + postfix

if input_dict is not None and full_name in input_dict:
@@ -177,17 +211,27 @@
value_from_dict.append(converted_input)

output_value = (
OptimizationObject._convert_to_np_array(composite_value[i])
if value_is_list
else composite_value
composite_value[i] if value_is_list else composite_value
)

output_value = (
output_conversion(full_name, output_value)
if output_conversion is not None
else output_value
)

output_value = OptimizationObject._convert_to_np_array(output_value)

if output_filter is not None:
if not output_filter(full_name, output_value, value_metadata):
continue

metadata_dict[full_name] = value_metadata
output_dict[full_name] = output_value
if not output_flat and value_is_list:
output_dict[full_name].append(output_value)
metadata_dict[full_name].append(value_metadata)
else:
output_dict[full_name] = output_value
metadata_dict[full_name] = value_metadata

if len(value_from_dict) > 0:
input_object.__setattr__(
@@ -197,25 +241,42 @@

continue

if not output_flat and name_prefix != "":
nested_output = {name_prefix: output_dict}
nested_metadata = {name_prefix: metadata_dict}
return nested_output, nested_metadata

return output_dict, metadata_dict

def to_dict(
self,
prefix: str = "",
output_filter: Callable[[str, Any, dict], bool] | None = None,
output_conversion: Callable[[str, Any], Any] | None = None,
flatten: bool = True,
) -> dict:
output_dict, _ = OptimizationObject._scan(
input_object=self, name_prefix=prefix, output_filter=output_filter
input_object=self,
name_prefix=prefix,
output_filter=output_filter,
output_conversion=output_conversion,
output_flat=flatten,
)
return output_dict

def to_dicts(
self,
prefix: str = "",
output_filter: Callable[[str, Any, dict], bool] | None = None,
output_conversion: Callable[[str, Any], Any] | None = None,
flatten: bool = True,
) -> (dict, dict):
output_dict, metadata_dict = OptimizationObject._scan(
input_object=self, name_prefix=prefix, output_filter=output_filter
input_object=self,
name_prefix=prefix,
output_filter=output_filter,
output_conversion=output_conversion,
output_flat=flatten,
)
return output_dict, metadata_dict

@@ -232,16 +293,30 @@ def from_dict(
input_conversion=input_conversion,
)

def to_list(self) -> list:
def to_list(
self,
output_filter: Callable[[str, Any, dict], bool] | None = None,
output_conversion: Callable[[str, Any], Any] | None = None,
) -> list:
output_list = []
as_dict = self.to_dict()
as_dict = self.to_dict(
output_filter=output_filter, output_conversion=output_conversion
)
for key in sorted(as_dict.keys()):
output_list.append(as_dict[key])

return output_list

def to_mx(self) -> cs.MX:
return cs.vertcat(*self.to_list())
def to_mx(
self,
output_filter: Callable[[str, Any, dict], bool] | None = None,
output_conversion: Callable[[str, Any], Any] | None = None,
) -> cs.MX:
return cs.vertcat(
*self.to_list(
output_filter=output_filter, output_conversion=output_conversion
)
)

@classmethod
def default_storage_metadata(cls, **kwargs) -> dict:
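The changes above introduce a `flatten` flag for `to_dict`/`to_dicts` (wired through `_scan` as `output_flat`) and pass `output_filter`/`output_conversion` down to `to_list` and `to_mx`. As a rough sketch of the two layouts, here is a hand-written comparison using hypothetical field names ("com", "feet", "mass", "force") rather than objects built through the hippopt API:

```python
# Illustration only: the dictionaries and the helper below are hand-written
# stand-ins for the naming scheme in _scan, not objects built through hippopt.

nested = {  # shape of to_dict(flatten=False): one sub-dictionary per composite field
    "com": {"mass": 52.0},
    "feet": [{"force": 100.0}, {"force": 120.0}],
}

flat = {  # shape of to_dict(flatten=True, the default): one dotted/indexed key per leaf
    "com.mass": 52.0,
    "feet[0].force": 100.0,
    "feet[1].force": 120.0,
}


def flatten(value, prefix=""):
    """Recursively flatten nested dicts/lists into 'a.b' / 'a[i].b' keys,
    mirroring the names produced when output_flat=True."""
    if isinstance(value, dict):
        out = {}
        for key, inner in value.items():
            separator = "" if prefix == "" else "."
            out.update(flatten(inner, prefix + separator + key))
        return out
    if isinstance(value, list):
        out = {}
        for i, inner in enumerate(value):
            out.update(flatten(inner, f"{prefix}[{i}]"))
        return out
    return {prefix: value}


assert flatten(nested) == flat
```

With `flatten=True` (the default) every leaf gets a single dotted or indexed key; with `flatten=False` the field and list hierarchy is preserved, which is the layout used for the initial guess in the example scripts below.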
23 changes: 23 additions & 0 deletions src/hippopt/base/problem.py
@@ -55,6 +55,29 @@ def __post_init__(
self.cost_values = _cost_values
self.constraint_multipliers = _constraint_multipliers

def to_dict(self) -> dict:
def set_nested_value(d, input_key, value):
keys = input_key.split(".")
assert all(isinstance(k, str) and len(k) > 0 for k in keys)
for key in keys[:-1]:
d = d.setdefault(key, {})
d[keys[-1]] = value

def flatten_to_nested_dict(flat_dict):
nested_dict = {}
for key, value in flat_dict.items():
set_nested_value(nested_dict, key, value)
return nested_dict

return {
"values": self.values.to_dict(flatten=False),
"cost_value": self.cost_value,
"cost_values": flatten_to_nested_dict(self.cost_values),
"constraint_multipliers": flatten_to_nested_dict(
self.constraint_multipliers
),
}


@dataclasses.dataclass
class Problem(abc.ABC, Generic[TGenericSolver, TInputObjects]):
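The new `to_dict` above uses these two helpers to turn the flat, dot-separated names of `cost_values` and `constraint_multipliers` into nested dictionaries, so they can be stored as nested structures when the result is written with hdf5storage. A standalone sketch of that conversion, with hypothetical cost names:

```python
# Standalone copy of the helpers above, exercised on hypothetical cost names
# (the actual names depend on the costs and constraints added to the problem).

def set_nested_value(d, input_key, value):
    keys = input_key.split(".")
    for key in keys[:-1]:
        d = d.setdefault(key, {})  # walk/create the intermediate dictionaries
    d[keys[-1]] = value


def flatten_to_nested_dict(flat_dict):
    nested_dict = {}
    for key, value in flat_dict.items():
        set_nested_value(nested_dict, key, value)
    return nested_dict


costs = {"com.tracking": 0.12, "com.regularization": 0.03, "torques": 1.5}
print(flatten_to_nested_dict(costs))
# {'com': {'tracking': 0.12, 'regularization': 0.03}, 'torques': 1.5}
```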
@@ -3,3 +3,4 @@
frames/*
*.png
*.mp4
*.mat
@@ -1,6 +1,7 @@
import logging

import casadi as cs
import hdf5storage
import idyntree.bindings as idyntree
import liecasadi
import numpy as np
@@ -496,3 +497,17 @@ def get_references(
save=True,
file_name_stem="humanoid_walking_periodic",
)

print("Saving data to humanoid_walking_periodic.mat")

humanoid_walking_periodic = {
"output": output.to_dict(),
"guess": planner_guess.to_dict(
flatten=False, output_conversion=hippopt.OptimizationObject.DMConversion
),
}
hdf5storage.savemat(
file_name="humanoid_walking_periodic.mat",
mdict=humanoid_walking_periodic,
truncate_existing=True,
)
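Each example script now builds a plain dictionary from the planner output and the initial guess and writes it with `hdf5storage.savemat`, which by default produces a MATLAB v7.3 (HDF5) file. As a rough check, the file written above can be read back in Python; the two top-level entries match the dictionary built above, while the exact representation of nested values on load depends on hdf5storage's options:

```python
import hdf5storage

# Read back the file written by the script above (assumes it has been run).
data = hdf5storage.loadmat("humanoid_walking_periodic.mat")
print("output" in data and "guess" in data)  # the two entries saved above
```

The same file should also open directly in MATLAB with `load('humanoid_walking_periodic.mat')`.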
@@ -3,6 +3,7 @@
import math

import casadi as cs
import hdf5storage
import idyntree.bindings as idyntree
import liecasadi
import numpy as np
@@ -603,6 +604,20 @@ def get_references(
file_name_stem="humanoid_single_jump_flat",
)

print("Saving data to humanoid_single_jump_flat.mat")

humanoid_single_jump_flat = {
"output": output.to_dict(),
"guess": planner_guess.to_dict(
flatten=False, output_conversion=hippopt.OptimizationObject.DMConversion
),
}
hdf5storage.savemat(
file_name="humanoid_single_jump_flat.mat",
mdict=humanoid_single_jump_flat,
truncate_existing=True,
)

plotter_settings = hp_rp.FootContactStatePlotterSettings()
plotter_settings.terrain = planner_settings.terrain
left_foot_plotter = hp_rp.FootContactStatePlotter(plotter_settings)
@@ -1,6 +1,7 @@
import logging

import casadi as cs
import hdf5storage
import idyntree.bindings as idyntree
import liecasadi
import numpy as np
@@ -592,6 +593,20 @@ def get_references(
file_name_stem="humanoid_walking_ramp",
)

print("Saving data to humanoid_walking_ramp.mat")

humanoid_walking_ramp = {
"output": output.to_dict(),
"guess": planner_guess.to_dict(
flatten=False, output_conversion=hippopt.OptimizationObject.DMConversion
),
}
hdf5storage.savemat(
file_name="humanoid_walking_ramp.mat",
mdict=humanoid_walking_ramp,
truncate_existing=True,
)

plotter_settings = hp_rp.FootContactStatePlotterSettings()
plotter_settings.terrain = planner_settings.terrain
left_foot_plotter = hp_rp.FootContactStatePlotter(plotter_settings)
@@ -1,6 +1,7 @@
import logging

import casadi as cs
import hdf5storage
import idyntree.bindings as idyntree
import liecasadi
import numpy as np
@@ -584,6 +585,21 @@ def get_references(
file_name_stem="humanoid_walking_step",
)

print("Saving data to humanoid_walking_step.mat")

humanoid_walking_step = {
"output": output.to_dict(),
"guess": planner_guess.to_dict(
flatten=False,
output_conversion=hippopt.OptimizationObject.DMConversion,
),
}
hdf5storage.savemat(
file_name="humanoid_walking_step.mat",
mdict=humanoid_walking_step,
truncate_existing=True,
)

plotter_settings = hp_rp.FootContactStatePlotterSettings()
plotter_settings.terrain = planner_settings.terrain
left_foot_plotter = hp_rp.FootContactStatePlotter(plotter_settings)