diff --git a/.github/workflows/ci_cd.yml b/.github/workflows/ci_cd.yml index 0c5ac5d..fc78b6e 100644 --- a/.github/workflows/ci_cd.yml +++ b/.github/workflows/ci_cd.yml @@ -37,7 +37,7 @@ jobs: - name: Dependencies shell: bash -l {0} run: | - mamba install python=${{ matrix.python }} casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py git + mamba install python=${{ matrix.python }} casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py git hdf5storage mamba list - name: Install diff --git a/README.md b/README.md index 5cf54c9..727dff9 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ hippopt is an open-source framework for generating whole-body trajectories for l ## Installation It is suggested to use [``mamba``](https://github.com/conda-forge/miniforge). ```bash -conda install -c conda-forge -c robotology python=3.11 casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py +conda install -c conda-forge -c robotology python=3.11 casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib resolve-robotics-uri-py hdf5storage pip install --no-deps -e .[all] ``` diff --git a/setup.cfg b/setup.cfg index 094d3d3..7ec8e3f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -65,6 +65,7 @@ robot_planning= turnkey_planners= idyntree resolve-robotics-uri-py + hdf5storage visualization= ffmpeg-python idyntree diff --git a/src/hippopt/base/optimization_object.py b/src/hippopt/base/optimization_object.py index f7b07df..1cbff8f 100644 --- a/src/hippopt/base/optimization_object.py +++ b/src/hippopt/base/optimization_object.py @@ -68,25 +68,44 @@ def _scan( input_dict: dict | None = None, output_filter: Callable[[str, Any, dict], bool] | None = None, input_conversion: Callable[[str, Any], Any] | None = None, - ) -> (dict, dict): + output_conversion: Callable[[str, Any], Any] | None = None, + output_flat: bool = True, + ) -> tuple[dict, dict] | tuple[list, list]: output_dict = {} metadata_dict = {} if isinstance(input_object, list): - assert all( + if not all( isinstance(elem, OptimizationObject) or isinstance(elem, list) for elem in input_object - ) + ): + raise ValueError( + "The input object is a list, but not all elements are" + " OptimizationObject instances." + ) + output_list = [] + output_metadata_list = [] + if not output_flat and name_prefix != "": + output_dict[name_prefix] = output_list + metadata_dict[name_prefix] = output_metadata_list + for i, elem in enumerate(input_object): inner_dict, inner_metadata = OptimizationObject._scan( input_object=elem, - name_prefix=name_prefix + f"[{str(i)}].", + name_prefix=name_prefix + f"[{str(i)}]." if output_flat else "", parent_metadata=parent_metadata, input_dict=input_dict, output_filter=output_filter, input_conversion=input_conversion, + output_conversion=output_conversion, + output_flat=output_flat, ) output_dict.update(inner_dict) + output_list.append(inner_dict) metadata_dict.update(inner_metadata) + output_metadata_list.append(inner_metadata) + + if not output_flat and name_prefix == "": + return output_list, output_metadata_list return output_dict, metadata_dict assert isinstance(input_object, OptimizationObject) @@ -131,14 +150,24 @@ def _scan( separator = "" if list_of_optimization_objects else "." inner_dict, inner_metadata = OptimizationObject._scan( input_object=composite_value, - name_prefix=name_prefix + field.name + separator, + name_prefix=( + name_prefix + field.name + separator if output_flat else "" + ), parent_metadata=new_parent_metadata, input_dict=input_dict, output_filter=output_filter, input_conversion=input_conversion, + output_conversion=output_conversion, + output_flat=output_flat, ) - output_dict.update(inner_dict) - metadata_dict.update(inner_metadata) + + if output_flat: + output_dict.update(inner_dict) + metadata_dict.update(inner_metadata) + else: + output_dict[field.name] = inner_dict + metadata_dict[field.name] = inner_metadata + continue if OptimizationObject.StorageTypeField in field.metadata: @@ -157,15 +186,20 @@ def _scan( parent_metadata[OptimizationObject.StorageTypeField] ) - composite_value = OptimizationObject._convert_to_np_array( + composite_value_edited = OptimizationObject._convert_to_np_array( composite_value ) - value_is_list = isinstance(composite_value, list) + value_is_list = isinstance(composite_value_edited, list) value_list = composite_value if value_is_list else [composite_value] - name_radix = name_prefix + field.name + name_radix = name_prefix + field.name if output_flat else field.name value_from_dict = [] + + if not output_flat and value_is_list: + output_dict[field.name] = [] + metadata_dict[field.name] = [] + for i, val in enumerate(value_list): - postfix = f"[{i}]" if value_is_list else "" + postfix = f"[{i}]" if value_is_list and output_flat else "" full_name = name_radix + postfix if input_dict is not None and full_name in input_dict: @@ -177,17 +211,27 @@ def _scan( value_from_dict.append(converted_input) output_value = ( - OptimizationObject._convert_to_np_array(composite_value[i]) - if value_is_list - else composite_value + composite_value[i] if value_is_list else composite_value + ) + + output_value = ( + output_conversion(full_name, output_value) + if output_conversion is not None + else output_value ) + output_value = OptimizationObject._convert_to_np_array(output_value) + if output_filter is not None: if not output_filter(full_name, output_value, value_metadata): continue - metadata_dict[full_name] = value_metadata - output_dict[full_name] = output_value + if not output_flat and value_is_list: + output_dict[full_name].append(output_value) + metadata_dict[full_name].append(value_metadata) + else: + output_dict[full_name] = output_value + metadata_dict[full_name] = value_metadata if len(value_from_dict) > 0: input_object.__setattr__( @@ -197,15 +241,26 @@ def _scan( continue + if not output_flat and name_prefix != "": + nested_output = {name_prefix: output_dict} + nested_metadata = {name_prefix: metadata_dict} + return nested_output, nested_metadata + return output_dict, metadata_dict def to_dict( self, prefix: str = "", output_filter: Callable[[str, Any, dict], bool] | None = None, + output_conversion: Callable[[str, Any], Any] | None = None, + flatten: bool = True, ) -> dict: output_dict, _ = OptimizationObject._scan( - input_object=self, name_prefix=prefix, output_filter=output_filter + input_object=self, + name_prefix=prefix, + output_filter=output_filter, + output_conversion=output_conversion, + output_flat=flatten, ) return output_dict @@ -213,9 +268,15 @@ def to_dicts( self, prefix: str = "", output_filter: Callable[[str, Any, dict], bool] | None = None, + output_conversion: Callable[[str, Any], Any] | None = None, + flatten: bool = True, ) -> (dict, dict): output_dict, metadata_dict = OptimizationObject._scan( - input_object=self, name_prefix=prefix, output_filter=output_filter + input_object=self, + name_prefix=prefix, + output_filter=output_filter, + output_conversion=output_conversion, + output_flat=flatten, ) return output_dict, metadata_dict @@ -232,16 +293,30 @@ def from_dict( input_conversion=input_conversion, ) - def to_list(self) -> list: + def to_list( + self, + output_filter: Callable[[str, Any, dict], bool] | None = None, + output_conversion: Callable[[str, Any], Any] | None = None, + ) -> list: output_list = [] - as_dict = self.to_dict() + as_dict = self.to_dict( + output_filter=output_filter, output_conversion=output_conversion + ) for key in sorted(as_dict.keys()): output_list.append(as_dict[key]) return output_list - def to_mx(self) -> cs.MX: - return cs.vertcat(*self.to_list()) + def to_mx( + self, + output_filter: Callable[[str, Any, dict], bool] | None = None, + output_conversion: Callable[[str, Any], Any] | None = None, + ) -> cs.MX: + return cs.vertcat( + *self.to_list( + output_filter=output_filter, output_conversion=output_conversion + ) + ) @classmethod def default_storage_metadata(cls, **kwargs) -> dict: diff --git a/src/hippopt/base/problem.py b/src/hippopt/base/problem.py index b4d024b..7678c88 100644 --- a/src/hippopt/base/problem.py +++ b/src/hippopt/base/problem.py @@ -55,6 +55,29 @@ def __post_init__( self.cost_values = _cost_values self.constraint_multipliers = _constraint_multipliers + def to_dict(self) -> dict: + def set_nested_value(d, input_key, value): + keys = input_key.split(".") + assert all(isinstance(k, str) and len(k) > 0 for k in keys) + for key in keys[:-1]: + d = d.setdefault(key, {}) + d[keys[-1]] = value + + def flatten_to_nested_dict(flat_dict): + nested_dict = {} + for key, value in flat_dict.items(): + set_nested_value(nested_dict, key, value) + return nested_dict + + return { + "values": self.values.to_dict(flatten=False), + "cost_value": self.cost_value, + "cost_values": flatten_to_nested_dict(self.cost_values), + "constraint_multipliers": flatten_to_nested_dict( + self.constraint_multipliers + ), + } + @dataclasses.dataclass class Problem(abc.ABC, Generic[TGenericSolver, TInputObjects]): diff --git a/src/hippopt/turnkey_planners/humanoid_kinodynamic/.gitignore b/src/hippopt/turnkey_planners/humanoid_kinodynamic/.gitignore index 7100787..7850009 100644 --- a/src/hippopt/turnkey_planners/humanoid_kinodynamic/.gitignore +++ b/src/hippopt/turnkey_planners/humanoid_kinodynamic/.gitignore @@ -3,3 +3,4 @@ frames/* *.png *.mp4 +*.mat diff --git a/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_periodic_step.py b/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_periodic_step.py index 19f14c0..bdefc70 100644 --- a/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_periodic_step.py +++ b/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_periodic_step.py @@ -1,6 +1,7 @@ import logging import casadi as cs +import hdf5storage import idyntree.bindings as idyntree import liecasadi import numpy as np @@ -496,3 +497,17 @@ def get_references( save=True, file_name_stem="humanoid_walking_periodic", ) + + print("Saving data to humanoid_walking_periodic.mat") + + humanoid_walking_periodic = { + "output": output.to_dict(), + "guess": planner_guess.to_dict( + flatten=False, output_conversion=hippopt.OptimizationObject.DMConversion + ), + } + hdf5storage.savemat( + file_name="humanoid_walking_periodic.mat", + mdict=humanoid_walking_periodic, + truncate_existing=True, + ) diff --git a/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_single_jump_on_flat_ground.py b/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_single_jump_on_flat_ground.py index 2ad06da..128bf9e 100644 --- a/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_single_jump_on_flat_ground.py +++ b/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_single_jump_on_flat_ground.py @@ -3,6 +3,7 @@ import math import casadi as cs +import hdf5storage import idyntree.bindings as idyntree import liecasadi import numpy as np @@ -603,6 +604,20 @@ def get_references( file_name_stem="humanoid_single_jump_flat", ) + print("Saving data to humanoid_single_jump_flat.mat") + + humanoid_single_jump_flat = { + "output": output.to_dict(), + "guess": planner_guess.to_dict( + flatten=False, output_conversion=hippopt.OptimizationObject.DMConversion + ), + } + hdf5storage.savemat( + file_name="humanoid_single_jump_flat.mat", + mdict=humanoid_single_jump_flat, + truncate_existing=True, + ) + plotter_settings = hp_rp.FootContactStatePlotterSettings() plotter_settings.terrain = planner_settings.terrain left_foot_plotter = hp_rp.FootContactStatePlotter(plotter_settings) diff --git a/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_walking_on_ramp.py b/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_walking_on_ramp.py index 0dcd5ce..93a588a 100644 --- a/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_walking_on_ramp.py +++ b/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_walking_on_ramp.py @@ -1,6 +1,7 @@ import logging import casadi as cs +import hdf5storage import idyntree.bindings as idyntree import liecasadi import numpy as np @@ -592,6 +593,20 @@ def get_references( file_name_stem="humanoid_walking_ramp", ) + print("Saving data to humanoid_walking_ramp.mat") + + humanoid_walking_ramp = { + "output": output.to_dict(), + "guess": planner_guess.to_dict( + flatten=False, output_conversion=hippopt.OptimizationObject.DMConversion + ), + } + hdf5storage.savemat( + file_name="humanoid_walking_ramp.mat", + mdict=humanoid_walking_ramp, + truncate_existing=True, + ) + plotter_settings = hp_rp.FootContactStatePlotterSettings() plotter_settings.terrain = planner_settings.terrain left_foot_plotter = hp_rp.FootContactStatePlotter(plotter_settings) diff --git a/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_walking_on_stairs.py b/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_walking_on_stairs.py index bfeb476..1b32c55 100644 --- a/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_walking_on_stairs.py +++ b/src/hippopt/turnkey_planners/humanoid_kinodynamic/main_walking_on_stairs.py @@ -1,6 +1,7 @@ import logging import casadi as cs +import hdf5storage import idyntree.bindings as idyntree import liecasadi import numpy as np @@ -584,6 +585,21 @@ def get_references( file_name_stem="humanoid_walking_step", ) + print("Saving data to humanoid_walking_step.mat") + + humanoid_walking_step = { + "output": output.to_dict(), + "guess": planner_guess.to_dict( + flatten=False, + output_conversion=hippopt.OptimizationObject.DMConversion, + ), + } + hdf5storage.savemat( + file_name="humanoid_walking_step.mat", + mdict=humanoid_walking_step, + truncate_existing=True, + ) + plotter_settings = hp_rp.FootContactStatePlotterSettings() plotter_settings.terrain = planner_settings.terrain left_foot_plotter = hp_rp.FootContactStatePlotter(plotter_settings) diff --git a/test/test_opti_generate_objects.py b/test/test_opti_generate_objects.py index 5ab4817..9fce35a 100644 --- a/test/test_opti_generate_objects.py +++ b/test/test_opti_generate_objects.py @@ -1,4 +1,3 @@ -import copy import dataclasses import casadi as cs @@ -78,30 +77,6 @@ def test_generate_objects(): assert (len(opti_var.to_list())) == len(test_var_as_list) expected_len = 7 + 3 * 7 + 3 assert opti_var.to_mx().shape == (expected_len, 1) - as_dict = opti_var.to_dict() - assert all( - expected in as_dict - for expected in [ - "aggregated.variable", - "aggregated.parameter", - "aggregated.scalar", - "other_parameter", - "aggregated_list[0].variable", - "aggregated_list[0].parameter", - "aggregated_list[0].scalar", - "aggregated_list[1].variable", - "aggregated_list[1].parameter", - "aggregated_list[1].scalar", - "aggregated_list[2].variable", - "aggregated_list[2].parameter", - "aggregated_list[2].scalar", - ] - ) - assert "other" not in as_dict - dict_copy = copy.deepcopy(as_dict) - dict_copy["aggregated.scalar"] = 7.0 - opti_var.from_dict(dict_copy) - assert opti_var.aggregated.scalar == 7.0 def test_generate_objects_list(): diff --git a/test/test_optimization_object.py b/test/test_optimization_object.py new file mode 100644 index 0000000..ed1d6a9 --- /dev/null +++ b/test/test_optimization_object.py @@ -0,0 +1,230 @@ +import copy +import dataclasses + +import numpy as np + +from hippopt import ( + CompositeType, + OptimizationObject, + OverridableVariable, + Parameter, + StorageType, + Variable, + default_composite_field, + default_storage_field, +) + + +@dataclasses.dataclass +class CustomVariable(OptimizationObject): + variable: StorageType = default_storage_field(cls=Variable) + parameter: StorageType = default_storage_field(cls=Parameter) + scalar: StorageType = default_storage_field(cls=Variable) + + def __post_init__(self): + self.variable = np.ones(shape=3) + self.parameter = np.ones(shape=3) + self.scalar = 1.0 + + +@dataclasses.dataclass +class AggregateClass(OptimizationObject): + aggregated: CompositeType[CustomVariable] = default_composite_field( + factory=CustomVariable + ) + aggregated_list: CompositeType[list[CustomVariable]] = default_composite_field( + factory=list + ) + other_parameter: StorageType = default_storage_field(cls=Parameter) + other: str = "" + + def __post_init__(self): + self.other_parameter = np.ones(3) + self.other = "untouched" + for _ in range(3): + self.aggregated_list.append(CustomVariable()) + + +@dataclasses.dataclass +class CustomOverridableVariable(OptimizationObject): + overridable: StorageType = default_storage_field(cls=OverridableVariable) + not_overridable: StorageType = default_storage_field(cls=Variable) + + def __post_init__(self): + self.overridable = 0.0 + self.not_overridable = 0.0 + + +@dataclasses.dataclass +class CustomCompositeOverridableVariable(OptimizationObject): + composite: CompositeType[CustomOverridableVariable] = default_composite_field( + cls=Parameter, factory=CustomOverridableVariable + ) + + +def test_to_dict_flat(): + test_var = AggregateClass() + test_var_as_dict = test_var.to_dict() + assert ( + len(test_var_as_dict) == 3 + 3 * 3 + 1 + ) # 3 for aggregated, 3*3 for aggregated_list, 1 for other_parameter + + assert all( + expected in test_var_as_dict + for expected in [ + "aggregated.variable", + "aggregated.parameter", + "aggregated.scalar", + "other_parameter", + "aggregated_list[0].variable", + "aggregated_list[0].parameter", + "aggregated_list[0].scalar", + "aggregated_list[1].variable", + "aggregated_list[1].parameter", + "aggregated_list[1].scalar", + "aggregated_list[2].variable", + "aggregated_list[2].parameter", + "aggregated_list[2].scalar", + ] + ) + assert "other" not in test_var_as_dict + dict_copy = copy.deepcopy(test_var_as_dict) + dict_copy["aggregated.scalar"] = 7.0 + + +def test_to_dict_metadata(): + test_var = CustomCompositeOverridableVariable() + _, metadata_dict = test_var.to_dicts() + assert ( + len(metadata_dict) == 2 + ) # 1 for composite.overridable, 1 for composite.not_overridable + assert all( + expected in metadata_dict + for expected in [ + "composite.overridable", + "composite.not_overridable", + ] + ) + assert ( + metadata_dict["composite.overridable"][OptimizationObject.StorageTypeField] + == Parameter.StorageTypeValue + ) + assert ( + metadata_dict["composite.not_overridable"][OptimizationObject.StorageTypeField] + == Variable.StorageTypeValue + ) + + +def test_to_dict_not_flat(): + test_var = AggregateClass() + test_var_as_dict = test_var.to_dict(flatten=False) + assert ( + len(test_var_as_dict) == 3 + ) # 1 for aggregated, 1 for aggregated_list, 1 for other_parameter + assert all( + expected in test_var_as_dict + for expected in [ + "aggregated", + "aggregated_list", + "other_parameter", + ] + ) + assert all( + expected in test_var_as_dict["aggregated"] + for expected in [ + "variable", + "parameter", + "scalar", + ] + ) + + assert test_var_as_dict["aggregated"]["scalar"] == 1.0 + assert len(test_var_as_dict["aggregated_list"]) == 3 + + test_var_as_dict = test_var.to_dict(flatten=False, prefix="test") + assert len(test_var_as_dict) == 1 + assert "test" in test_var_as_dict + assert all( + expected in test_var_as_dict["test"] + for expected in [ + "aggregated", + "aggregated_list", + "other_parameter", + ] + ) + + +def test_to_dict_metadata_not_flat(): + test_var = CustomCompositeOverridableVariable() + _, metadata_dict = test_var.to_dicts(flatten=False) + assert ( + len(metadata_dict) == 1 + ) # 1 for composite.overridable, 1 for composite.not_overridable + assert "composite" in metadata_dict + assert len(metadata_dict["composite"]) == 2 + assert all( + expected in metadata_dict["composite"] + for expected in [ + "overridable", + "not_overridable", + ] + ) + assert ( + metadata_dict["composite"]["overridable"][OptimizationObject.StorageTypeField] + == Parameter.StorageTypeValue + ) + assert ( + metadata_dict["composite"]["not_overridable"][ + OptimizationObject.StorageTypeField + ] + == Variable.StorageTypeValue + ) + + +def test_to_list(): + test_var = AggregateClass() + test_var_as_list = test_var.to_list() + assert (len(test_var_as_list)) == 3 + 3 * 3 + 1 + + test_var_as_dict = test_var.to_dict() + key_to_index = {} + for i, key in enumerate(sorted(test_var_as_dict.keys())): + key_to_index[key] = i + + assert test_var_as_list[key_to_index["aggregated.variable"]].shape == (3, 1) + assert test_var_as_list[key_to_index["aggregated.parameter"]].shape == (3, 1) + assert test_var_as_list[key_to_index["aggregated.scalar"]].shape == (1, 1) + + +def test_from_dict(): + test_var = AggregateClass() + test_var_as_dict = test_var.to_dict() + test_var_as_dict["aggregated.scalar"] = 7.0 + test_var.from_dict(test_var_as_dict) + assert test_var.aggregated.scalar == 7.0 + + +def test_to_dict_filtered(): + test_var = AggregateClass() + test_var.aggregated.scalar = None + test_var_as_dict = test_var.to_dict(output_filter=OptimizationObject.IsValueFilter) + assert "aggregated.scalar" not in test_var_as_dict + + +def test_to_dict_converted(): + test_var = AggregateClass() + test_var_as_dict = test_var.to_dict( + output_conversion=lambda _, value: (42 if isinstance(value, float) else value), + ) + assert test_var_as_dict["aggregated.scalar"] == 42 + + +def test_from_dict_converted(): + test_var = AggregateClass() + test_var_as_dict = test_var.to_dict() + test_var_as_dict["aggregated.scalar"] = 7.0 + test_var.from_dict( + test_var_as_dict, + input_conversion=lambda _, value: (42 if isinstance(value, float) else value), + ) + assert test_var.aggregated.scalar == 42