diff --git a/.github/workflows/ci_cd.yml b/.github/workflows/ci_cd.yml index cfc98c37..d00f5683 100644 --- a/.github/workflows/ci_cd.yml +++ b/.github/workflows/ci_cd.yml @@ -31,11 +31,14 @@ jobs: with: miniforge-variant: Mambaforge miniforge-version: latest + channels: conda-forge,robotology + channel-priority: true - name: Dependencies shell: bash -l {0} run: | - mamba install python=${{ matrix.python }} casadi pytest + mamba install python=${{ matrix.python }} casadi pytest liecasadi adam-robotics idyntree meshcat-python ffmpeg-python matplotlib + mamba list - name: Install shell: bash -l {0} diff --git a/.gitignore b/.gitignore index ea7da93b..3cec1127 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,6 @@ dmypy.json # Pycharm files .idea/ + +# VSCode files +.vscode/ diff --git a/setup.cfg b/setup.cfg index fb353f48..9f5c1adf 100644 --- a/setup.cfg +++ b/setup.cfg @@ -59,9 +59,22 @@ style = isort testing= pytest +robot_planning= + liecasadi + adam-robotics +turnkey_planners= + idyntree +visualization= + ffmpeg-python + idyntree + meshcat-python + matplotlib all = %(style)s %(testing)s + %(robot_planning)s + %(turnkey_planners)s + %(visualization)s [options.packages.find] where = src diff --git a/src/hippopt/__init__.py b/src/hippopt/__init__.py index 57281618..2c93d583 100644 --- a/src/hippopt/__init__.py +++ b/src/hippopt/__init__.py @@ -1,26 +1,30 @@ -from . import base, integrators +import hippopt.base.opti_callback as opti_callback + from .base.dynamics import Dynamics, TypedDynamics, dot from .base.multiple_shooting_solver import MultipleShootingSolver from .base.opti_solver import OptiFailure, OptiSolver -from .base.optimal_control_problem import OptimalControlProblem +from .base.optimal_control_problem import ( + OptimalControlProblem, + OptimalControlProblemInstance, +) from .base.optimization_object import ( + CompositeType, OptimizationObject, StorageType, TimeExpansion, TOptimizationObject, + default_composite_field, default_storage_field, default_storage_metadata, time_varying_metadata, ) -from .base.optimization_problem import OptimizationProblem +from .base.optimization_problem import OptimizationProblem, OptimizationProblemInstance from .base.optimization_solver import SolutionNotAvailableException -from .base.parameter import Parameter, TParameter -from .base.problem import ExpressionType, ProblemNotSolvedException +from .base.parameter import OverridableParameter, Parameter, TParameter +from .base.problem import ExpressionType, Output, ProblemNotSolvedException from .base.single_step_integrator import ( SingleStepIntegrator, TSingleStepIntegrator, step, ) -from .base.variable import TVariable, Variable -from .integrators.forward_euler import ForwardEuler -from .integrators.implicit_trapezoid import ImplicitTrapezoid +from .base.variable import OverridableVariable, TVariable, Variable diff --git a/src/hippopt/base/__init__.py b/src/hippopt/base/__init__.py index 42cf0b72..b8c8935c 100644 --- a/src/hippopt/base/__init__.py +++ b/src/hippopt/base/__init__.py @@ -1,6 +1,7 @@ from . 
import ( dynamics, multiple_shooting_solver, + opti_callback, opti_solver, optimal_control_problem, optimal_control_solver, diff --git a/src/hippopt/base/dynamics.py b/src/hippopt/base/dynamics.py index f2fed503..5af62621 100644 --- a/src/hippopt/base/dynamics.py +++ b/src/hippopt/base/dynamics.py @@ -10,26 +10,45 @@ @dataclasses.dataclass class DynamicsRHS: - _f: cs.Function = dataclasses.field(default=None) + _f: cs.Function | list[str] = dataclasses.field(default=None) _names_map: dict[str, str] = dataclasses.field(default=None) _names_map_inv: dict[str, str] = dataclasses.field(default=None) - f: dataclasses.InitVar[cs.Function] = None + f: dataclasses.InitVar[cs.Function | str | list[str] | cs.MX] = None names_map_in: dataclasses.InitVar[dict[str, str]] = None - def __post_init__(self, f: cs.Function, names_map_in: dict[str, str]): + def __post_init__( + self, f: cs.Function | str | list[str] | cs.MX, names_map_in: dict[str, str] + ): """ Create the DynamicsRHS object - :param f: The CasADi function describing the dynamics. The output order should match the list provided - in the dot function. - :param names_map_in: A dict describing how to switch from the input names to those used in the function. - The key is the name provided by the user, while the value is the input name expected by the function. - It is also possible to specify labels for nested variables using ".", e.g. "a.b" corresponds - to the variable "b" within "a". + :param f: The CasADi function describing the dynamics. The output order should + match the list provided in the dot function. As an alternative, if the + dynamics is trivial (e.g. dot(x) = y), it is possible to directly pass the name + of the variable on the right-hand side, or the list of variables in case the + left-hand side is a list. + :param names_map_in: A dict describing how to switch from the input names to + those used in the function. + The key is the name provided by the user, while the value is the input name + expected by the function. + Refer to the specific optimal control problem for a specification of the + label convention. This is valid only for the keys. If time is an input, its label needs to be provided using the "dot" function.
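+ Example (illustrative names): passing f="y" declares the trivial + dynamics whose right-hand side is the variable named "y", while + names_map_in={"a.b": "x"} states that the user key "a.b" corresponds + to the function input named "x".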
:return: Nothing """ - self._f = f + if isinstance(f, str): + self._f = [f] + elif isinstance(f, list) or isinstance(f, cs.Function): + self._f = f + elif isinstance(f, cs.MX): + inputs = cs.symvar(f) + input_names = [el.name() for el in inputs] + self._f = cs.Function( + "dynamics_rhs", inputs, [f], input_names, ["dynamics_rhs_output"] + ) + else: + raise ValueError("Unsupported input f") + self._names_map = names_map_in if names_map_in else {} self._names_map_inv = {v: k for k, v in self._names_map.items()} # inverse dict @@ -48,10 +67,19 @@ def evaluate( key = name if name not in self._names_map else self._names_map[name] input_dict[key] = variables[name] + if isinstance(self._f, list): + return input_dict + + assert isinstance(self._f, cs.Function) return self._f(**input_dict) def input_names(self) -> list[str]: - function_inputs = self._f.name_in() + if isinstance(self._f, list): + function_inputs = self._f + else: + assert isinstance(self._f, cs.Function) + function_inputs = self._f.name_in() + output = [] for el in function_inputs: output_name = self._names_map_inv[el] if el in self._names_map_inv else el @@ -60,6 +88,10 @@ return output def outputs(self) -> list[str]: + if isinstance(self._f, list): + return self._f + + assert isinstance(self._f, cs.Function) return self._f.name_out() @@ -67,36 +99,84 @@ class DynamicsLHS: _x: list[str] = dataclasses.field(default=None) x: dataclasses.InitVar[list[str] | str] = None - _t_label: str = "t" - t_label: dataclasses.InitVar[str] = None + _t_label: str | cs.MX = "t" + t_label: dataclasses.InitVar[str | cs.MX] = None - def __post_init__(self, x: list[str] | str, t_label: str): + def __post_init__( + self, x: str | list[str] | cs.MX | list[cs.MX], t_label: str | cs.MX = None + ): """ Constructs the DynamicsLHS object :param x: List of variable names on the left hand side of dot{x} = f(y). - The list can contain empty strings if some output of f needs to be discarded. If one output - needs to be mapped to a nested item, use "." as separator, e.g. "a.b" - :param t_label: The label of the time variable. Default "t" + The list can contain empty strings if some output of f needs to be discarded. + Refer to the specific optimal control problem for a specification of the + label convention. + The input can also be of type cs.MX. This allows using the symbolic + structure provided by the optimal control solver. The input cannot be an + expression. Also in this case, the input can be a list, and it can contain + None if some outputs of f have to be discarded. + :param t_label: The label of the time variable. Default "t". It can also be a + cs.MX.
In this case, its name will be used :return: Nothing """ - self._x = x if isinstance(x, list) else [x] - self._t_label = t_label if isinstance(t_label, str) else "t" - def equal(self, f: cs.Function, names_map: dict[str, str] = None) -> TDynamics: + def input_to_string( + input_value: str | cs.MX, default_string: str = None + ) -> str: + if isinstance(input_value, str): + return input_value + + if input_value is None: + return "" + + if not isinstance(input_value, cs.MX): + if default_string is not None: + return default_string + + raise ValueError("The input can be only a string, a cs.MX, or None.") + + if not input_value.is_symbolic(): + raise ValueError("The input MX has to be symbolic.") + return input_value.name() + + if isinstance(x, list): + self._x = [input_to_string(el) for el in x] + else: + self._x = [input_to_string(x)] + + self._t_label = input_to_string(input_value=t_label, default_string="t") + + def equal( + self, f: cs.Function | str | list[str] | cs.MX, names_map: dict[str, str] = None + ) -> TDynamics: rhs = DynamicsRHS(f=f, names_map_in=names_map) if len(rhs.outputs()) != len(self._x): raise ValueError( - "The number of outputs of the dynamics function does not match the specified number of state variables." + "The number of outputs of the dynamics function does not match" + " the specified number of state variables." ) return TypedDynamics(lhs=self, rhs=rhs) def __eq__( - self, other: cs.Function | tuple[cs.Function, dict[str, str]] + self, + other: cs.Function + | str + | list[str] + | cs.MX + | tuple[cs.Function, dict[str, str]] + | tuple[str, dict[str, str]] + | tuple[list[str], dict[str, str]] + | tuple[cs.MX, dict[str, str]], ) -> TDynamics: if isinstance(other, tuple): return self.equal(f=other[0], names_map=other[1]) - assert isinstance(other, cs.Function) + assert ( + isinstance(other, cs.Function) + or isinstance(other, str) + or isinstance(other, list) + or isinstance(other, cs.MX) + ) return self.equal(f=other) def state_variables(self) -> list[str]: @@ -106,7 +186,7 @@ def time_label(self) -> str: return self._t_label -def dot(x: str | list[str], t: str = "t") -> TDynamicsLHS: +def dot(x: str | list[str] | cs.MX | list[cs.MX], t: str | cs.MX = "t") -> TDynamicsLHS: return DynamicsLHS(x=x, t_label=t) diff --git a/src/hippopt/base/multiple_shooting_solver.py b/src/hippopt/base/multiple_shooting_solver.py index 2e6d70ee..a52f19d8 100644 --- a/src/hippopt/base/multiple_shooting_solver.py +++ b/src/hippopt/base/multiple_shooting_solver.py @@ -18,20 +18,29 @@ from .problem import ExpressionType, Problem from .single_step_integrator import SingleStepIntegrator, step +FlattenedVariableIterator = Callable[[], Iterator[cs.MX]] +FlattenedVariableTuple = tuple[int, FlattenedVariableIterator] +FlattenedVariableDict = dict[str, FlattenedVariableTuple] + @dataclasses.dataclass class MultipleShootingSolver(OptimalControlSolver): optimization_solver: dataclasses.InitVar[OptimizationSolver] = dataclasses.field( default=None ) + _optimization_solver: TOptimizationSolver = dataclasses.field(default=None) default_integrator: dataclasses.InitVar[SingleStepIntegrator] = dataclasses.field( default=None ) + _default_integrator: SingleStepIntegrator = dataclasses.field(default=None) - _flattened_variables: list[ - dict[str, tuple[int, Callable[[], Iterator[cs.MX]]]] + + _flattened_variables: FlattenedVariableDict = dataclasses.field(default=None) + + _symbolic_structure: TOptimizationObject | list[ + TOptimizationObject ] = dataclasses.field(default=None) def __post_init__( @@ 
-50,15 +59,14 @@ def __post_init__( if isinstance(default_integrator, SingleStepIntegrator) else ImplicitTrapezoid() ) - self._flattened_variables = [] + self._flattened_variables = {} + @staticmethod def _extend_structure_to_horizon( - self, input_structure: TOptimizationObject | list[TOptimizationObject], **kwargs + input_structure: TOptimizationObject | list[TOptimizationObject], **kwargs ) -> TOptimizationObject | list[TOptimizationObject]: if "horizon" not in kwargs and "horizons" not in kwargs: - return self._optimization_solver.generate_optimization_objects( - input_structure=input_structure, **kwargs - ) + return input_structure default_horizon_length = int(1) if "horizon" in kwargs: @@ -110,7 +118,8 @@ def _extend_structure_to_horizon( "Field " + field.name + "is not a Numpy array. Cannot expand it to the horizon." - ' Consider using "TimeExpansion.List" as time_expansion strategy.' + ' Consider using "TimeExpansion.List"' + " as time_expansion strategy." ) if field_value.ndim > 1 and field_value.shape[1] > 1: @@ -118,11 +127,15 @@ def _extend_structure_to_horizon( "Cannot expand " + field.name + " since it is already a matrix." - ' Consider using "TimeExpansion.List" as time_expansion strategy.' + ' Consider using "TimeExpansion.List"' + " as time_expansion strategy." ) + # Repeat the vector along the horizon + if field_value.ndim < 2: + field_value = np.expand_dims(field_value, axis=1) output.__setattr__( - field.name, np.zeros((field_value.shape[0], horizon_length)) - ) # This is only needed to get the structure for the optimization variables. + field.name, np.tile(field_value, (1, horizon_length)) + ) else: output_value = [] for _ in range(horizon_length): @@ -136,7 +149,8 @@ def _extend_structure_to_horizon( OptimizationObject.TimeDependentField not in field.metadata and not custom_horizon ): - continue # We expand nested variables (following two cases) only if time dependent + continue + # We expand nested variables (following two cases) only if time dependent if isinstance(field_value, OptimizationObject): output_value = [] @@ -148,7 +162,9 @@ def _extend_structure_to_horizon( if isinstance(field_value, list) and all( isinstance(elem, OptimizationObject) for elem in field_value - ): # Nested variables are extended only if it is set as time dependent or if it has custom horizon + ): + # Nested variables are extended only if it is set as time dependent + # or if it has custom horizon if not len(field_value): # skip empty lists continue @@ -169,6 +185,7 @@ def generate_optimization_objects( ) -> TOptimizationObject | list[TOptimizationObject]: if isinstance(input_structure, list): expanded_structure = [] + self._symbolic_structure = [] for element in input_structure: expanded_structure.append( self._extend_structure_to_horizon(input_structure=element, **kwargs) @@ -178,10 +195,12 @@ def generate_optimization_objects( input_structure=expanded_structure, **kwargs ) - for var in variables: - self._flattened_variables.append( - self._generate_flattened_optimization_objects(object_in=var) + for k in range(len(variables)): + flattened, symbolic = self._generate_flattened_and_symbolic_objects( + object_in=variables[k], base_string="[" + str(k) + "]." 
) + self._flattened_variables = self._flattened_variables | flattened + self._symbolic_structure.append(symbolic) else: expanded_structure = self._extend_structure_to_horizon( @@ -190,32 +209,37 @@ def generate_optimization_objects( variables = self._optimization_solver.generate_optimization_objects( input_structure=expanded_structure, **kwargs ) - self._flattened_variables.append( - self._generate_flattened_optimization_objects(object_in=variables) + flattened, symbolic = self._generate_flattened_and_symbolic_objects( + object_in=variables ) + self._flattened_variables = flattened + self._symbolic_structure = symbolic return variables - def _generate_flattened_optimization_objects( + def _generate_flattened_and_symbolic_objects( # TODO: remove some indentation self, - object_in: TOptimizationObject | list[TOptimizationObject], + object_in: TOptimizationObject, top_level: bool = True, base_string: str = "", base_iterator: tuple[ int, Callable[[], Iterator[TOptimizationObject | list[TOptimizationObject]]] ] = None, - ) -> dict[str, tuple[int, Callable[[], Iterator[cs.MX]]]]: + ) -> tuple[FlattenedVariableDict, TOptimizationObject]: assert (bool(top_level) != bool(base_iterator is not None)) or ( not top_level and base_iterator is None ) # Cannot be top level and have base iterator - output = {} + + output_dict = {} + output_symbolic = copy.deepcopy(object_in) + for field in dataclasses.fields(object_in): field_value = object_in.__getattribute__(field.name) time_dependent = ( OptimizationObject.TimeDependentField in field.metadata and field.metadata[OptimizationObject.TimeDependentField] - and top_level # only top level variables can be time dependent + and top_level # only top level variables can be time-dependent ) expand_storage = ( @@ -234,8 +258,9 @@ def _generate_flattened_optimization_objects( if OptimizationObject.StorageTypeField in field.metadata: # storage if not time_dependent: if base_iterator is not None: - # generators cannot be rewound, but we might need to reuse them. Hence, we store - # a lambda that can return a generator. Since in python it is not possible + # generators cannot be rewound, but we might need to reuse them. + # Hence, we store a lambda that can return a generator. 
+ # Since in python it is not possible # to have capture lists as in C++, we use partial from functools # to store the inputs of the lambda within itself # (inspired from https://stackoverflow.com/a/10101476) @@ -249,7 +274,7 @@ def _generate_flattened_optimization_objects( field.name, base_iterator[1], ) - output[base_string + field.name] = ( + output_dict[base_string + field.name] = ( base_iterator[0], new_generator, ) @@ -257,10 +282,20 @@ def _generate_flattened_optimization_objects( constant_generator = partial( (lambda value: itertools.repeat(value)), field_value ) - output[base_string + field.name] = ( + output_dict[base_string + field.name] = ( 1, constant_generator, ) + + output_symbolic.__setattr__( + field.name, + cs.MX.sym( + base_string + field.name, + field_value.rows(), + field_value.columns(), + ), + ) + continue if expand_storage: @@ -270,24 +305,42 @@ def _generate_flattened_optimization_objects( field_value, n, ) - output[base_string + field.name] = (n, new_generator) + output_dict[base_string + field.name] = (n, new_generator) + output_symbolic.__setattr__( + field.name, + cs.MX.sym( + base_string + field.name, + field_value.rows(), + 1, + ), + ) continue assert isinstance( field_value, list ) # time dependent and not expand_storage n = len(field_value) # list case + assert n > 0 new_generator = partial( (lambda value, dim: (value[i] for i in range(dim))), field_value, n, ) - output[base_string + field.name] = (n, new_generator) + output_dict[base_string + field.name] = (n, new_generator) + first_element = field_value[0] # Assume all elements have same dims + output_symbolic.__setattr__( + field.name, + cs.MX.sym( + base_string + field.name, + first_element.rows(), + first_element.columns(), + ), + ) continue if isinstance( field_value, OptimizationObject - ): # aggregate (cannot be time dependent) + ): # aggregate (cannot be time-dependent) generator = ( partial( ( @@ -303,7 +356,10 @@ def _generate_flattened_optimization_objects( else None ) - output = output | self._generate_flattened_optimization_objects( + ( + inner_dict, + inner_symbolic, + ) = self._generate_flattened_and_symbolic_objects( object_in=field_value, top_level=False, base_string=base_string + field.name + ".", @@ -311,12 +367,19 @@ def _generate_flattened_optimization_objects( if generator is not None else None, ) + + output_dict = output_dict | inner_dict + output_symbolic.__setattr__( + field.name, + inner_symbolic, + ) continue if isinstance(field_value, list) and all( isinstance(elem, OptimizationObject) for elem in field_value ): # list[aggregate] if not time_dependent: + symbolic_list = output_symbolic.__getattribute__(field.name) for k in range(len(field_value)): generator = ( partial( @@ -333,7 +396,11 @@ def _generate_flattened_optimization_objects( if base_iterator is not None else None ) - output = output | self._generate_flattened_optimization_objects( + + ( + inner_dict, + inner_symbolic, + ) = self._generate_flattened_and_symbolic_objects( object_in=field_value[k], top_level=False, base_string=base_string @@ -345,6 +412,10 @@ def _generate_flattened_optimization_objects( if generator is not None else None, ) + + output_dict = output_dict | inner_dict + symbolic_list[k] = inner_symbolic + continue if not len(field_value): @@ -352,26 +423,32 @@ def _generate_flattened_optimization_objects( iterable = iter(field_value) first = next(iterable) - all_same = all( + assert all( isinstance(el, type(first)) for el in iterable ) # check that each element has same type - if not all_same: - continue 
- - # If we are time dependent (and hence top_level has to be true), there is no base generator + # If we are time-dependent (and hence top_level has to be true), + # there is no base generator new_generator = partial( (lambda value: (val for val in value)), field_value ) - output = output | self._generate_flattened_optimization_objects( - object_in=first, # since they are al equal, expand only the first and exploit base_iterator + ( + inner_dict, + inner_symbolic, + ) = self._generate_flattened_and_symbolic_objects( + # since they are all equal, expand only the first + # and exploit the base_iterator + object_in=first, top_level=False, base_string=base_string + field.name + ".", # we don't flatten the list base_iterator=(len(field_value), new_generator), ) + + output_dict = output_dict | inner_dict + output_symbolic.__setattr__(field.name, inner_symbolic) continue if ( @@ -379,6 +456,8 @@ and time_dependent and all(isinstance(elem, list) for elem in field_value) ): # list[list[aggregate]], only time dependent + # The inner list is the time dependency + symbolic_list = output_symbolic.__getattribute__(field.name) for k in range(len(field_value)): inner_list = field_value[k] @@ -387,25 +466,29 @@ iterable = iter(inner_list) first = next(iterable) - all_same = all( + assert all( isinstance(el, type(first)) for el in iterable ) # check that each element has same type - if not all_same: - break - new_generator = partial( (lambda value: (val for val in value)), inner_list ) - output = output | self._generate_flattened_optimization_objects( + + ( + inner_dict, + inner_symbolic, + ) = self._generate_flattened_and_symbolic_objects( object_in=first, top_level=False, base_string=base_string + field.name + "[" + str(k) + "].", base_iterator=(len(inner_list), new_generator), ) + + output_dict = output_dict | inner_dict + symbolic_list[k] = inner_symbolic continue - return output + return output_dict, output_symbolic def get_optimization_objects( self, @@ -420,13 +503,16 @@ def get_problem(self) -> Problem: def get_flattened_optimization_objects( self, - ) -> list[dict[str, tuple[int, Callable[[], Iterator[cs.MX]]]]]: + ) -> FlattenedVariableDict: return self._flattened_variables + def get_symbolic_structure(self) -> TOptimizationObject | list[TOptimizationObject]: + return self._symbolic_structure + def add_dynamics( self, dynamics: TDynamics, - x0: dict[str, cs.MX] = None, + x0: dict[str, cs.MX] | dict[cs.MX, cs.MX] | cs.MX = None, t0: cs.MX = cs.MX(0.0), mode: ExpressionType = ExpressionType.subject_to, name: str = None, @@ -435,34 +521,51 @@ ) -> None: """ Add a dynamics to the optimal control problem - :param dynamics: The dynamics to add. The variables involved need to have a name corresponding to the name of - a flattened variable. If the variable is nested, you can use "." as separator (e.g. "a.b" will - look for variable b within a). If there is a list, you can use "[k]" with "k" the element - to pick. For example "a.b[k].c" will look for variable "c" defined in the k-th element of "b" - within "a". Only the top level variables can be time dependent. In this case, "a" could be time - dependent and being a list, but this is automatically detected, and there is no need to specify - the time-dependency. The "[k]" keyword is used only in case the list is not time-dependent. - :param x0: The initial state. It is a dictionary with the key equal to the state variable name.
If no dict - is provided, or a given variable is not found in the dictionary, the initial condition is not set. + :param dynamics: The dynamics to add. The variables involved need to have a + name corresponding to the name of a flattened variable. + If the variable is nested, you can use "." as separator + (e.g. "a.b" will look for variable b within a). + If there is a list, you can use "[k]" with "k" the element + to pick. For example "a.b[k].c" will look for variable "c" + defined in the k-th element of "b" within "a". + Only the top level variables can be time-dependent. + In this case, "a" could be time-dependent and be a list, + but this is automatically detected, and there is no need to + specify the time-dependency. The "[k]" keyword is used only + in case the list is not time-dependent. + In case we have a list of optimization objects, prepend "[k]." + to the name of the variable, with k the top level index. + :param x0: The initial state. It is a dictionary with the key equal to the state + variable name, or its corresponding symbolic variable. + It can also be a single MX in case there is only one state variable. + If no dict is provided, or a given variable is not + found in the dictionary, the initial condition is not set. :param t0: The initial time - :param mode: Optional argument to set the mode with which the dynamics is added to the problem. + :param mode: Optional argument to set the mode with which the + dynamics is added to the problem. Default: constraint :param name: The name used when adding the dynamics expression. :param x0_name: The name used when adding the initial condition expression. :param kwargs: Additional arguments. There are some required arguments: - - "dt": the integration time delta. It can either be a float in case it is - constant, or a string to indicate the (flattened) name of the - variable to use. + - "dt": the integration time delta. + It can be a float in case it + is constant, or a string to indicate + the (flattened) name of the + variable to use, or a cs.MX when + using the corresponding variable in + the symbolic structure. Optional arguments: - - "top_level_index": this defines the index to use in case we have a - list of optimization objects. This is used only if - the top level is a list. - - "max_steps": the number of integration steps. If not specified, the entire - horizon of the specific state variable is used - - "integrator": specify the `SingleStepIntegrator` to use. This needs to be - a type - - optional arguments of the `Problem.add_expression` method. + - "max_steps": the number of integration + steps. If not specified, the + entire horizon of the + specific state variable + is used + - "integrator": specify the + `SingleStepIntegrator` to + use. This needs to be a type + - optional arguments of the + `Problem.add_expression` method. :return: None """ if "dt" not in kwargs: raise ValueError( "MultipleShootingSolver needs dt to be specified when adding a dynamics" ) - top_level_index = 0 - if isinstance(self.get_optimization_objects(), list): - if "top_level_index" not in kwargs: - raise ValueError( - "The optimization objects are in a list, but top_level_index has not been specified."
- ) - top_level_index = kwargs["top_level_index"] - dt_in = kwargs["dt"] max_n = 0 @@ -487,21 +582,23 @@ def add_dynamics( if not isinstance(max_n, int) or max_n < 2: raise ValueError( - "max_steps is specified, but it needs to be an integer greater than 1" + "max_steps is specified, but it needs to be an integer" + " greater than 1" ) dt_size = 1 if isinstance(dt_in, float): dt_generator = itertools.repeat(cs.MX(dt_in)) - elif isinstance(dt_in, str): - if dt_in not in self._flattened_variables[top_level_index]: + elif isinstance(dt_in, str) or isinstance(dt_in, cs.MX): + dt_in = dt_in.name() if isinstance(dt_in, cs.MX) else dt_in + if dt_in not in self._flattened_variables: raise ValueError( "The specified dt name is not found in the optimization variables" ) - dt_var_tuple = self._flattened_variables[top_level_index][dt_in] + dt_var_tuple = self._flattened_variables[dt_in] dt_size = dt_var_tuple[0] - dt_generator = dt_var_tuple[1] + dt_generator = dt_var_tuple[1]() else: raise ValueError("Unsupported dt type") @@ -513,17 +610,18 @@ def add_dynamics( if not issubclass(integrator, SingleStepIntegrator): raise ValueError( - "The integrator has been defined, but it is not a subclass of SingleStepIntegrator" + "The integrator has been defined, but it is not " + "a subclass of SingleStepIntegrator" ) variables = {} n = max_n for var in dynamics.state_variables(): - if var not in self._flattened_variables[top_level_index]: + if var not in self._flattened_variables: raise ValueError( "Variable " + var + " not found in the optimization variables." ) - var_tuple = self._flattened_variables[top_level_index][var] + var_tuple = self._flattened_variables[var] var_n = var_tuple[0] if n == 0: if var_n < 2: @@ -545,13 +643,13 @@ def add_dynamics( additional_inputs = {} for inp in dynamics.input_names(): - if inp not in self._flattened_variables[top_level_index]: + if inp not in self._flattened_variables: raise ValueError( "Variable " + inp + " not found in the optimization variables." ) if inp not in variables: - inp_tuple = self._flattened_variables[top_level_index][inp] + inp_tuple = self._flattened_variables[inp] inp_n = inp_tuple[0] @@ -559,7 +657,8 @@ def add_dynamics( raise ValueError( "The input " + inp - + " is time dependent, but it has a too small prediction horizon." + + " is time dependent, but it has a too small " + "prediction horizon." ) additional_inputs[inp] = (inp_tuple[0], inp_tuple[1]()) @@ -575,17 +674,30 @@ def add_dynamics( initial_conditions = {} if x0 is not None: - for var in x_k: - if var in x0: - initial_conditions[var] = x0[var] + if not isinstance(x0, dict): + if len(x_k) > 1: + raise ValueError( + "The initial condition is a single MX, but the dynamics " + "has more than one state variable." + ) + x0 = {list(x_k.keys())[0]: x0} + for var in x0: + var_name = var if isinstance(var, str) else var.name() # noqa + if var_name in x_k: + initial_conditions[var_name] = x0[var] + + if x0_name is None and name is not None: + x0_name = name + "[0]" - # In the following, we add the initial condition expressions through the problem interface. + # In the following, we add the initial condition expressions + # through the problem interface. 
# In this way, we can exploit the machinery handling the generators, # and we can switch the dynamics from constraints to costs self.get_problem().add_expression( mode=mode, expression=( - cs.MX(x_k[name] == x0[name]) for name in initial_conditions + cs.MX(x_k[name] == initial_conditions[name]) + for name in initial_conditions ), name=x0_name, **kwargs @@ -606,11 +718,12 @@ t0=t0 + cs.MX(i) * dt, ) - name = base_name + "[" + str(i) + "]" + name = base_name + "[" + str(i + 1) + "]" - # In the following, we add the dynamics expressions through the problem interface, rather than the - # solver interface. In this way, we can exploit the machinery handling the generators, - # and we can switch the dynamics from constraints to costs + # In the following, we add the dynamics expressions through the problem + # interface, rather than the solver interface. In this way, we can exploit + # the machinery handling the generators, and we can switch the dynamics + # from constraints to costs self.get_problem().add_expression( mode=mode, expression=(cs.MX(x_next[name] == integrated[name]) for name in x_next), @@ -621,11 +734,151 @@ x_k = x_next u_k = u_next + def add_expression_to_horizon( + self, + expression: cs.MX, + mode: ExpressionType = ExpressionType.subject_to, + apply_to_first_elements: bool = False, + name: str = None, + **kwargs + ) -> None: + """ + Add an expression to the whole horizon of the optimal control problem + :param expression: The expression to add. Use the symbolic_structure to set up + the expression + :param mode: Optional argument to set the mode with which the + expression is added to the problem. + Default: constraint + :param apply_to_first_elements: Flag to define whether the expression needs to be + applied also to the first elements. If True, + the expression is applied also to the first + elements. Default: False + :param name: The name used when adding the expression. + :param kwargs: Optional arguments: + - "max_steps": the maximum number of + steps. If not specified, the + number of steps is determined from the + variables involved. + - optional arguments of the + `Problem.add_expression` method. + :return: None + """ + + input_variables = cs.symvar(expression) + input_variables_names = [var.name() for var in input_variables] + base_name = name if name is not None else str(expression) + + max_n = 1 + max_iterations_set = False + + if "max_steps" in kwargs: + max_n = kwargs["max_steps"] + max_iterations_set = True + if not isinstance(max_n, int) or max_n < 1: + raise ValueError( + "max_steps is specified, but it needs to be an integer" + " greater than 0" + ) + + variables_generators = [] + n = 1 + for var in input_variables_names: + if var not in self._flattened_variables: + raise ValueError( + "Variable " + var + " not found in the optimization variables." + ) + var_tuple = self._flattened_variables[var] + var_n = var_tuple[0] + n = var_n if n == 1 or 1 < var_n < n else n + + # With var_tuple[1]() we get a new generator for the specific variable + variables_generators.append(var_tuple[1]()) + + if max_iterations_set: + n = min(max_n, n) + + for i in range(n): + x_k = [next(var) for var in variables_generators] + + name = base_name + "[" + str(i) + "]" + + if i == 0 and not apply_to_first_elements and not n == 1: + continue + + # In the following, we add the expressions through the problem + # interface, rather than the solver interface.
In this way, we can exploit + # the machinery handling the generators, and we can switch the expression + # from constraints to costs + self.get_problem().add_expression( + mode=mode, + expression=cs.substitute([expression], input_variables, x_k)[0], + name=name, + **kwargs + ) + + def initial(self, variable: str | cs.MX) -> cs.MX: + """ + Get the initial value of a variable + :param variable: The name of the flattened variable. + If the variable is nested, you can use "." as separator + (e.g. "a.b" will look for variable b within a). + If there is a list, you can use "[k]" with "k" the element + to pick. For example "a.b[k].c" will look for variable "c" + defined in the k-th element of "b" within "a". + As an alternative it is possible to use the corresponding + variable from the symbolic structure. + :return: The first value of the variable + """ + if isinstance(variable, cs.MX): + variable = variable.name() + + if variable not in self._flattened_variables: + raise ValueError( + "Variable " + variable + " not found in the optimization variables." + ) + + return next(self._flattened_variables[variable][1]()) + + def final(self, variable: str | cs.MX) -> cs.MX: + """ + Get the final value of a variable + :param variable: The name of the flattened variable. + If the variable is nested, you can use "." as separator + (e.g. "a.b" will look for variable b within a). + If there is a list, you can use "[k]" with "k" the element + to pick. For example "a.b[k].c" will look for variable "c" + defined in the k-th element of "b" within "a". + As an alternative it is possible to use the corresponding + variable from the symbolic structure. + :return: The last value of the variable + """ + if isinstance(variable, cs.MX): + variable = variable.name() + + if variable not in self._flattened_variables: + raise ValueError( + "Variable " + variable + " not found in the optimization variables." 
+ ) + + flattened = self._flattened_variables[variable] + + if flattened[0] == 1: + return next(flattened[1]()) + else: + generator = flattened[1]() + final_value = None + for value in generator: + final_value = value + return final_value + def set_initial_guess( self, initial_guess: TOptimizationObject | list[TOptimizationObject] ): self._optimization_solver.set_initial_guess(initial_guess=initial_guess) + def get_initial_guess(self) -> TOptimizationObject | list[TOptimizationObject]: + return self._optimization_solver.get_initial_guess() + def solve(self) -> None: self._optimization_solver.solve() diff --git a/src/hippopt/base/opti_callback.py b/src/hippopt/base/opti_callback.py new file mode 100644 index 00000000..3cfc2576 --- /dev/null +++ b/src/hippopt/base/opti_callback.py @@ -0,0 +1,361 @@ +import abc +import logging +import weakref +from typing import final + +import casadi as cs + + +class Callback(cs.OptiCallback, abc.ABC): + """Abstract class of an Opti callback.""" + + def __init__(self) -> None: + cs.OptiCallback.__init__(self) + + @final + def __call__(self, i: int) -> None: + self.call(i) + + @abc.abstractmethod + def call(self, i) -> None: + pass + + +class CallbackCriterion(abc.ABC): + """""" + + def __init__(self) -> None: + """""" + self.opti = None + + @abc.abstractmethod + def satisfied(self) -> bool: + pass + + @abc.abstractmethod + def update(self) -> None: + pass + + @abc.abstractmethod + def reset(self) -> None: + pass + + def __or__( + self, stopping_criterion: "CallbackCriterion" + ) -> "CombinedCallbackCriterion": + if not isinstance(stopping_criterion, CallbackCriterion): + raise TypeError(stopping_criterion) + + return OrCombinedCallbackCriterion(lhs=self, rhs=stopping_criterion) + + def __ror__(self, other): + return self.__or__(other) + + def __and__( + self, stopping_criterion: "CallbackCriterion" + ) -> "CombinedCallbackCriterion": + if not isinstance(stopping_criterion, CallbackCriterion): + raise TypeError(stopping_criterion) + + return AndCombinedCallbackCriterion(lhs=self, rhs=stopping_criterion) + + def __rand__(self, other): + return self.__and__(other) + + def set_opti(self, opti: cs.Opti) -> None: + """""" + # In theory, the callback is included in opti, + # so the weakref is to avoid circular references + self.opti = weakref.proxy(opti) + self.reset() + + +class BestCost(CallbackCriterion): + """""" + + def __init__(self) -> None: + """""" + + CallbackCriterion.__init__(self) + + self.best_cost = None + self.reset() + + @final + def reset(self) -> None: + """""" + + self.best_cost = cs.inf + + @final + def satisfied(self) -> bool: + """""" + + return self._get_current_cost() < self.best_cost + + @final + def update(self) -> None: + """""" + + best_cost = self._get_current_cost() + + _logger = logging.getLogger(f"[hippopt::{self.__class__.__name__}]") + _logger.debug(f"New best cost: {best_cost} (old: {self.best_cost})") + + self.best_cost = self._get_current_cost() + + def _get_current_cost(self) -> float: + """""" + + return self.opti.debug.value(self.opti.f) + + +class AcceptableCost(CallbackCriterion): + """""" + + def __init__(self, acceptable_cost: float = cs.inf) -> None: + """""" + + CallbackCriterion.__init__(self) + + self.acceptable_cost = acceptable_cost + + self.best_acceptable_cost = None + self.reset() + + @final + def reset(self) -> None: + """""" + + self.best_acceptable_cost = cs.inf + + def satisfied(self) -> bool: + """""" + + return self._get_current_cost() < self.acceptable_cost + + def update(self) -> None: + """""" + + 
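+ # Track the lowest cost observed so far; it is only used below to + # log improvements in the debug output.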
current_cost = self._get_current_cost() + + if current_cost < self.best_acceptable_cost: + _logger = logging.getLogger(f"[hippopt::{self.__class__.__name__}]") + _logger.debug( + f"New acceptable cost: {current_cost}" + f" (old: {self.best_acceptable_cost})" + ) + + self.best_acceptable_cost = current_cost + + def _get_current_cost(self) -> float: + """""" + + return self.opti.debug.value(self.opti.f) + + +class AcceptablePrimalInfeasibility(CallbackCriterion): + """""" + + def __init__(self, acceptable_primal_infeasibility: float = cs.inf) -> None: + """""" + + CallbackCriterion.__init__(self) + + self.acceptable_primal_infeasibility = acceptable_primal_infeasibility + + self.best_acceptable_primal_infeasibility = None + self.reset() + + @final + def reset(self) -> None: + """""" + + self.best_acceptable_primal_infeasibility = cs.inf + + def satisfied(self) -> bool: + """""" + + return ( + self._get_current_primal_infeasibility() + < self.acceptable_primal_infeasibility + ) + + def update(self) -> None: + """""" + + current_primal_infeasibility = self._get_current_primal_infeasibility() + + if current_primal_infeasibility < self.best_acceptable_primal_infeasibility: + _logger = logging.getLogger(f"[hippopt::{self.__class__.__name__}]") + _logger.debug( + f"New acceptable primal infeasibility: " + f"{current_primal_infeasibility} " + f"(old: {self.best_acceptable_primal_infeasibility})" + ) + + self.best_acceptable_primal_infeasibility = current_primal_infeasibility + + def _get_current_primal_infeasibility(self) -> float: + """""" + + return self.opti.debug.stats()["iterations"]["inf_pr"][-1] + + +class BestPrimalInfeasibility(CallbackCriterion): + """""" + + def __init__(self) -> None: + """""" + + CallbackCriterion.__init__(self) + + self.best_primal_infeasibility = None + self.reset() + + @final + def reset(self) -> None: + """""" + + self.best_primal_infeasibility = cs.inf + + def satisfied(self) -> bool: + """""" + + return self._get_current_primal_infeasibility() < self.best_primal_infeasibility + + def update(self) -> None: + """""" + + best_primal_infeasibility = self._get_current_primal_infeasibility() + + _logger = logging.getLogger(f"[hippopt::{self.__class__.__name__}]") + _logger.debug( + f"New best primal infeasibility: {best_primal_infeasibility}" + f" (old: {self.best_primal_infeasibility})" + ) + + self.best_primal_infeasibility = best_primal_infeasibility + + def _get_current_primal_infeasibility(self) -> float: + """""" + + return self.opti.debug.stats()["iterations"]["inf_pr"][-1] + + +class CombinedCallbackCriterion(CallbackCriterion, abc.ABC): + """""" + + def __init__(self, lhs: CallbackCriterion, rhs: CallbackCriterion) -> None: + """""" + + CallbackCriterion.__init__(self) + self.lhs = lhs + self.rhs = rhs + + @final + def reset(self) -> None: + """""" + + self.lhs.reset() + self.rhs.reset() + + @final + def update(self) -> None: + """""" + + self.lhs.update() + self.rhs.update() + + @final + def set_opti(self, opti: cs.Opti) -> None: + """""" + + self.lhs.set_opti(opti) + self.rhs.set_opti(opti) + + +class OrCombinedCallbackCriterion(CombinedCallbackCriterion): + """""" + + @final + def satisfied(self) -> bool: + """""" + + return self.lhs.satisfied() or self.rhs.satisfied() + + +class AndCombinedCallbackCriterion(CombinedCallbackCriterion): + """""" + + @final + def satisfied(self) -> bool: + """""" + + return self.lhs.satisfied() and self.rhs.satisfied() + + +class SaveBestUnsolvedVariablesCallback(Callback): + """Class to save the best unsolved variables."""
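+ + # Summary of the behavior implemented below: Opti invokes this callback at + # every solver iteration. Whenever the criterion is satisfied, the current + # values of variables, parameters, costs, and constraint multipliers are + # saved, so that an intermediate solution is available even if the solver + # later fails.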
+ + def __init__( + self, + criterion: CallbackCriterion, + opti: cs.Opti, + variables: list[cs.MX], + parameters: list[cs.MX], + costs: list[cs.MX], + constraints: list[cs.MX], + ) -> None: + """""" + + Callback.__init__(self) + + self.criterion = criterion + # In theory, the callback is included in opti, + # so the weakref is to avoid circular references + self.opti = weakref.proxy(opti) + self.criterion.set_opti(opti) + self.variables = variables + self.parameters = parameters + self.cost = costs + self.constraints = constraints + + self.best_iteration = None + self.best_objects = {} + self.best_cost = None + self.best_cost_values = {} + self.best_constraint_multipliers = {} + self.ignore_map = {obj: False for obj in self.variables + self.parameters} + + def call(self, i: int) -> None: + """""" + + if self.criterion.satisfied(): + self.criterion.update() + + _logger = logging.getLogger(f"[hippopt::{self.__class__.__name__}]") + _logger.info(f"[i={i}] New best intermediate variables") + + self.best_iteration = i + self.best_cost = self.opti.debug.value(self.opti.f) + for variable in self.variables: + if self.ignore_map[variable]: + continue + try: + self.best_objects[variable] = self.opti.debug.value(variable) + except Exception as err: # noqa + self.ignore_map[variable] = True + for parameter in self.parameters: + if self.ignore_map[parameter]: + continue + self.best_objects[parameter] = self.opti.debug.value(parameter) + self.ignore_map[parameter] = True # Parameters are saved only once + + self.best_cost_values = { + cost: self.opti.debug.value(cost) for cost in self.cost + } + self.best_constraint_multipliers = { + constraint: self.opti.debug.value(self.opti.dual(constraint)) + for constraint in self.constraints + } diff --git a/src/hippopt/base/opti_solver.py b/src/hippopt/base/opti_solver.py index a73ae00b..715c974a 100644 --- a/src/hippopt/base/opti_solver.py +++ b/src/hippopt/base/opti_solver.py @@ -1,10 +1,15 @@ import copy import dataclasses +import logging from typing import Any, ClassVar import casadi as cs import numpy as np +from hippopt.base.opti_callback import ( + CallbackCriterion, + SaveBestUnsolvedVariablesCallback, +) from hippopt.base.optimization_object import ( OptimizationObject, StorageType, @@ -21,14 +26,31 @@ class OptiFailure(Exception): + def __init__(self, message: Exception, callback_used: bool): + callback_info = "" + if callback_used: + callback_info = ( + " and the callback did not manage to save an intermediate solution" + ) + super().__init__( + f"Opti failed to solve the problem{callback_info}. Message: {str(message)}" + ) + + +class InitialGuessFailure(Exception): def __init__(self, message: Exception): - super().__init__("Opti failed to solve the problem. Message: " + str(message)) + super().__init__( + f"Failed to set the initial guess. Message: {message}. " + "Use 'fill_initial_guess=False' to avoid filling the " + "initial guess automatically." 
+ ) @dataclasses.dataclass class OptiSolver(OptimizationSolver): DefaultSolverType: ClassVar[str] = "ipopt" _inner_solver: str = dataclasses.field(default=DefaultSolverType) + inner_solver: dataclasses.InitVar[str] = dataclasses.field(default=None) problem_type: dataclasses.InitVar[str] = dataclasses.field(default="nlp") _options_plugin: dict[str, Any] = dataclasses.field(default_factory=dict) @@ -39,12 +61,22 @@ class OptiSolver(OptimizationSolver): options_plugin: dataclasses.InitVar[dict[str, Any]] = dataclasses.field( default=None ) + _callback_criterion: CallbackCriterion = dataclasses.field(default=None) + callback_criterion: dataclasses.InitVar[CallbackCriterion] = dataclasses.field( + default=None + ) + _callback: SaveBestUnsolvedVariablesCallback = dataclasses.field(default=None) + _callback_save_costs: bool = dataclasses.field(default=True) + _callback_save_constraint_multipliers: bool = dataclasses.field(default=True) + callback_save_costs: dataclasses.InitVar[bool] = dataclasses.field(default=None) + callback_save_constraint_multipliers: dataclasses.InitVar[bool] = dataclasses.field( + default=None + ) _cost: cs.MX = dataclasses.field(default=None) _cost_expressions: dict[str, cs.MX] = dataclasses.field(default=None) _constraint_expressions: dict[str, cs.MX] = dataclasses.field(default=None) _solver: cs.Opti = dataclasses.field(default=None) - _opti_solution: cs.OptiSol = dataclasses.field(default=None) _output_solution: TOptimizationObject | list[ TOptimizationObject ] = dataclasses.field(default=None) @@ -55,14 +87,29 @@ class OptiSolver(OptimizationSolver): default=None ) _problem: Problem = dataclasses.field(default=None) + _guess: TOptimizationObject | list[TOptimizationObject] = dataclasses.field( + default=None + ) + _objects_type_map: dict[cs.MX, str] = dataclasses.field(default=None) + _free_parameters: list[str] = dataclasses.field(default=None) + _parameters_map: dict[cs.MX, str] = dataclasses.field(default=None) + _variables_map: dict[cs.MX, str] = dataclasses.field(default=None) + _logger: logging.Logger = dataclasses.field(default=None) def __post_init__( self, - problem_type: str, + inner_solver: str = DefaultSolverType, + problem_type: str = "nlp", options_solver: dict[str, Any] = None, options_plugin: dict[str, Any] = None, + callback_criterion: CallbackCriterion = None, + callback_save_costs: bool = True, + callback_save_constraint_multipliers: bool = True, ): self._solver = cs.Opti(problem_type) + self._inner_solver = ( + inner_solver if inner_solver is not None else self.DefaultSolverType + ) self._options_solver = ( options_solver if isinstance(options_solver, dict) else {} ) @@ -72,8 +119,18 @@ def __post_init__( self._solver.solver( self._inner_solver, self._options_plugin, self._options_solver ) + self._callback_criterion = callback_criterion + self._callback_save_costs = callback_save_costs + self._callback_save_constraint_multipliers = ( + callback_save_constraint_multipliers + ) self._cost_expressions = {} self._constraint_expressions = {} + self._objects_type_map = {} + self._free_parameters = [] + self._parameters_map = {} + self._variables_map = {} + self._logger = logging.getLogger("[hippopt::OptiSolver]") def _generate_opti_object( self, storage_type: str, name: str, value: StorageType @@ -86,19 +143,37 @@ def _generate_opti_object( raise ValueError( "Field " + name + " has number of dimensions greater than 2." 
) + if value.ndim == 0: + raise ValueError("Field " + name + " is a zero-dimensional vector.") + if value.ndim < 2: value = np.expand_dims(value, axis=1) + if isinstance(value, float): + value = value * np.ones((1, 1)) + if storage_type is Variable.StorageTypeValue: - return self._solver.variable(*value.shape) + self._logger.debug("Creating variable " + name) + opti_object = self._solver.variable(*value.shape) + self._objects_type_map[opti_object] = Variable.StorageTypeValue + self._variables_map[opti_object] = name + return opti_object if storage_type is Parameter.StorageTypeValue: - return self._solver.parameter(*value.shape) + self._logger.debug("Creating parameter " + name) + opti_object = self._solver.parameter(*value.shape) + self._objects_type_map[opti_object] = Parameter.StorageTypeValue + self._free_parameters.append(name) + self._parameters_map[opti_object] = name + return opti_object raise ValueError("Unsupported input storage type") def _generate_objects_from_instance( - self, input_structure: TOptimizationObject + self, + input_structure: TOptimizationObject, + parent_metadata: dict, + base_name: str, ) -> TOptimizationObject: output = copy.deepcopy(input_structure) @@ -110,30 +185,75 @@ def _generate_objects_from_instance( isinstance(elem, OptimizationObject) or isinstance(elem, list) for elem in composite_value ) + list_of_float = is_list and all( + isinstance(elem, float) for elem in composite_value + ) + if list_of_float: + composite_value = np.array(composite_value) + is_list = False if ( isinstance(composite_value, OptimizationObject) or list_of_optimization_objects ): + new_parent_metadata = parent_metadata + has_composite_metadata = ( + OptimizationObject.CompositeTypeField in field.metadata + and field.metadata[OptimizationObject.CompositeTypeField] + is not None + ) + if has_composite_metadata: + composite_metadata = field.metadata[ + OptimizationObject.CompositeTypeField + ] + use_old_metadata = ( + parent_metadata is not None + and OptimizationObject.OverrideIfCompositeField + in composite_metadata + and composite_metadata[ + OptimizationObject.OverrideIfCompositeField + ] + ) + + if not use_old_metadata: + new_parent_metadata = composite_metadata + output.__setattr__( - field.name, self.generate_optimization_objects(composite_value) + field.name, + self.generate_optimization_objects( + input_structure=composite_value, + fill_initial_guess=False, + _parent_metadata=new_parent_metadata, + _base_name=base_name + field.name + ".", + ), ) continue if OptimizationObject.StorageTypeField in field.metadata: - value_list = [] - value_field = dataclasses.asdict(output)[field.name] - value_list.append(value_field) - - value_list = value_field if is_list else value_list + value_list = composite_value if is_list else [composite_value] output_value = [] for value in value_list: + should_override = ( + OptimizationObject.OverrideIfCompositeField in field.metadata + and field.metadata[OptimizationObject.OverrideIfCompositeField] + ) + parent_can_override = ( + parent_metadata is not None + and OptimizationObject.StorageTypeField in parent_metadata + ) + if should_override and parent_can_override: + storage_type = parent_metadata[ + OptimizationObject.StorageTypeField + ] + else: + storage_type = field.metadata[ + OptimizationObject.StorageTypeField + ] + output_value.append( self._generate_opti_object( - storage_type=field.metadata[ - OptimizationObject.StorageTypeField - ], - name=field.name, + storage_type=storage_type, + name=base_name + field.name, value=value, ) ) @@ -147,28 
+267,53 @@ def _generate_objects_from_instance( return output def _generate_objects_from_list( - self, input_structure: list[TOptimizationObject] + self, + input_structure: list[TOptimizationObject], + parent_metadata: dict, + base_name: str, ) -> list[TOptimizationObject]: assert isinstance(input_structure, list) output = copy.deepcopy(input_structure) for i in range(len(output)): - output[i] = self.generate_optimization_objects(output[i]) + output[i] = self.generate_optimization_objects( + input_structure=output[i], + fill_initial_guess=False, + _parent_metadata=parent_metadata, + _base_name=base_name + "[" + str(i) + "].", + ) self._variables = output return output + def _get_opti_solution( + self, variable: cs.MX, input_solution: cs.OptiSol | dict + ) -> StorageType: + try: + if isinstance(input_solution, dict): + return input_solution[variable] + return input_solution.value(variable) + except Exception as err: # noqa + self._logger.debug( + "Failed to get the solution for variable " + + self._variables_map[variable] + + ". Message: " + + str(err) + ) + return None + def _generate_solution_output( self, variables: TOptimizationObject | list[TOptimizationObject] | list[list[TOptimizationObject]], + input_solution: cs.OptiSol | dict, ) -> TOptimizationObject | list[TOptimizationObject]: output = copy.deepcopy(variables) if isinstance(variables, list): for i in range(len(variables)): - output[i] = self._generate_solution_output(variables[i]) + output[i] = self._generate_solution_output(variables[i], input_solution) return output for field in dataclasses.fields(variables): @@ -188,9 +333,11 @@ def _generate_solution_output( if isinstance(var, list): output_val = [] for el in var: - output_val.append(np.array(self._opti_solution.value(el))) + output_val.append( + np.array(self._get_opti_solution(el, input_solution)) + ) else: - output_val = np.array(self._opti_solution.value(var)) + output_val = np.array(self._get_opti_solution(var, input_solution)) output.__setattr__(field.name, output_val) continue @@ -208,19 +355,29 @@ def _generate_solution_output( or list_of_optimization_objects ): output.__setattr__( - field.name, self._generate_solution_output(composite_variable) + field.name, + self._generate_solution_output(composite_variable, input_solution), ) return output - def _set_opti_guess( - self, storage_type: str, variable: cs.MX, value: np.ndarray - ) -> None: - match storage_type: + def _set_opti_guess(self, variable: cs.MX, value: np.ndarray) -> None: + match self._objects_type_map[variable]: case Variable.StorageTypeValue: + self._logger.debug( + "Setting initial value for variable " + + self._variables_map[variable] + ) self._solver.set_initial(variable, value) case Parameter.StorageTypeValue: + self._logger.debug( + "Setting initial value for parameter " + + self._parameters_map[variable] + ) self._solver.set_value(variable, value) + parameter_name = self._parameters_map[variable] + if parameter_name in self._free_parameters: + self._free_parameters.remove(parameter_name) return @@ -260,6 +417,16 @@ def _set_initial_guess_internal( ) return + # Check that the initial guess is an optimization object + if not isinstance(initial_guess, OptimizationObject): + raise ValueError( + "The initial guess for the variable " + + base_name + + " is not an optimization object." 
+ + " It is of type " + + str(type(initial_guess)) + ) + for field in dataclasses.fields(initial_guess): guess = initial_guess.__getattribute__(field.name) @@ -280,83 +447,43 @@ def _set_initial_guess_internal( ) if isinstance(corresponding_value, list): - if not isinstance(guess, list): - raise ValueError( - "The guess for the field " - + base_name - + field.name - + " is supposed to be a list." - ) - - if len(corresponding_value) == len(guess): - raise ValueError( - "The guess for the field " - + base_name - + field.name - + " is a list of the wrong size. Expected: " - + str(len(corresponding_value)) - + ". Guess: " - + str(len(guess)) - ) - - for i in range(len(corresponding_value)): - if not isinstance(guess[i], np.ndarray): - raise ValueError( - "The guess for the field " - + base_name - + field.name - + "[" - + str(i) - + "] is not an numpy array." - ) + self._set_list_object_guess_internal( + base_name, corresponding_value, field, guess + ) + continue - input_shape = ( - guess[i].shape - if len(guess[i].shape) > 1 - else (guess[i].shape[0], 1) - ) + if isinstance(guess, float): + guess = guess * np.ones((1, 1)) - if corresponding_value[i].shape != input_shape: - raise ValueError( - "The dimension of the guess for the field " - + base_name - + field.name - + "[" - + str(i) - + "] does not match with the corresponding optimization variable" - ) + if isinstance(guess, list) and all( + isinstance(elem, float) or isinstance(elem, int) for elem in guess + ): + guess = np.array(guess) - self._set_opti_guess( - storage_type=field.metadata[ - OptimizationObject.StorageTypeField - ], - variable=corresponding_value[i], - value=guess[i], - ) - continue - - if not isinstance(guess, np.ndarray): + if not isinstance(guess, np.ndarray) and not isinstance(guess, cs.DM): raise ValueError( "The guess for the field " + base_name + field.name - + " is not an numpy array." + + " is neither an numpy nor a DM array." ) + if len(guess.shape) == 0: + continue + input_shape = ( guess.shape if len(guess.shape) > 1 else (guess.shape[0], 1) ) if corresponding_value.shape != input_shape: raise ValueError( - "The guess has the field " - + base_name - + field.name - + " but its dimension does not match with the corresponding optimization variable" + f"The guess has the field {base_name}{field.name} " + f"but its dimension ({input_shape}) does not match with the" + f" corresponding optimization variable " + f"({corresponding_value.shape})." 
) self._set_opti_guess( - storage_type=field.metadata[OptimizationObject.StorageTypeField], variable=corresponding_value, value=guess, ) @@ -364,14 +491,18 @@ def _set_initial_guess_internal( composite_variable_guess = initial_guess.__getattribute__(field.name) - if isinstance(composite_variable_guess, OptimizationObject): - if not hasattr(corresponding_variable, field.name): - raise ValueError( - "The guess has the field " - + base_name - + field.name - + " but it is not present in the optimization structure" - ) + if not isinstance( + composite_variable_guess, OptimizationObject + ) and not isinstance(composite_variable_guess, list): + continue + + if not hasattr(corresponding_variable, field.name): + raise ValueError( + "The guess has the field " + + base_name + + field.name + + " but it is not present in the optimization structure" + ) self._set_initial_guess_internal( initial_guess=composite_variable_guess, @@ -382,12 +513,107 @@ def _set_initial_guess_internal( ) continue + def _set_list_object_guess_internal( + self, + base_name: str, + corresponding_value: list, + field: dataclasses.Field, + guess: list, + ) -> None: + if not isinstance(guess, list): + raise ValueError( + "The guess for the field " + + base_name + + field.name + + " is supposed to be a list. " + + "Received " + + str(type(guess)) + + " instead." + ) + if len(corresponding_value) != len(guess): + raise ValueError( + "The guess for the field " + + base_name + + field.name + + " is a list of the wrong size. Expected: " + + str(len(corresponding_value)) + + ". Guess: " + + str(len(guess)) + ) + for i in range(len(corresponding_value)): + value = guess[i] + if isinstance(value, float): + value = value * np.ones((1, 1)) + + if not isinstance(value, np.ndarray): + raise ValueError( + "The field " + + base_name + + field.name + + "[" + + str(i) + + "] is marked as a variable or a parameter. Its guess " + + "is supposed to be an array (or even a float if scalar)." + ) + + input_shape = value.shape if len(value.shape) > 1 else (value.shape[0], 1) + + if corresponding_value[i].shape != input_shape: + raise ValueError( + "The dimension of the guess for the field " + + base_name + + field.name + + "[" + + str(i) + + "] does not match with the corresponding" + + " optimization variable" + ) + + self._set_opti_guess( + variable=corresponding_value[i], + value=value, + ) + def generate_optimization_objects( self, input_structure: TOptimizationObject | list[TOptimizationObject], **kwargs ) -> TOptimizationObject | list[TOptimizationObject]: + if not isinstance(input_structure, OptimizationObject) and not isinstance( + input_structure, list + ): + raise ValueError( + "The input structure is neither an optimization object nor a list." 
+ ) + + parent_metadata = ( + kwargs["_parent_metadata"] if "_parent_metadata" in kwargs else None + ) + + base_name = kwargs["_base_name"] if "_base_name" in kwargs else "" + if isinstance(input_structure, OptimizationObject): - return self._generate_objects_from_instance(input_structure=input_structure) - return self._generate_objects_from_list(input_structure=input_structure) + output = self._generate_objects_from_instance( + input_structure=input_structure, + parent_metadata=parent_metadata, + base_name=base_name, + ) + else: + output = self._generate_objects_from_list( + input_structure=input_structure, + parent_metadata=parent_metadata, + base_name=base_name, + ) + + fill_initial_guess = ( + kwargs["fill_initial_guess"] if "fill_initial_guess" in kwargs else True + ) + + if fill_initial_guess: + try: + self.set_initial_guess(initial_guess=input_structure) + except Exception as err: + raise InitialGuessFailure(err) + + return output def get_optimization_objects( self, @@ -409,6 +635,11 @@ def set_initial_guess( initial_guess=initial_guess, corresponding_variable=self._variables ) + self._guess = initial_guess + + def get_initial_guess(self) -> TOptimizationObject | list[TOptimizationObject]: + return self._guess + def set_opti_options( self, inner_solver: str = None, @@ -429,20 +660,88 @@ def set_opti_options( def solve(self) -> None: self._cost = self._cost if self._cost is not None else cs.MX(0) self._solver.minimize(self._cost) + if len(self._free_parameters): + raise ValueError( + "The following parameters are not set: " + str(self._free_parameters) + ) + use_callback = self._callback_criterion is not None + if use_callback: + variables = [] + parameters = [] + for obj in self._objects_type_map: + if self._objects_type_map[obj] is Variable.StorageTypeValue: + variables.append(obj) + elif self._objects_type_map[obj] is Parameter.StorageTypeValue: + parameters.append(obj) + + self._callback = SaveBestUnsolvedVariablesCallback( + criterion=self._callback_criterion, + opti=self._solver, + variables=variables, + parameters=parameters, + costs=list(self._cost_expressions.values()) + if self._callback_save_costs + else [], + constraints=list(self._constraint_expressions.values()) + if self._callback_save_constraint_multipliers + else [], + ) + self._solver.callback(self._callback) try: - self._opti_solution = self._solver.solve() + opti_solution = self._solver.solve() except Exception as err: # noqa - raise OptiFailure(message=err) + if use_callback and self._callback.best_iteration is not None: + self._logger.warning( + "Opti failed to solve the problem, but the callback managed to save" + " an intermediate solution at " + f"iteration {self._callback.best_iteration}." 
+ ) + self._output_cost = self._callback.best_cost + self._output_solution = self._generate_solution_output( + variables=self._variables, + input_solution=self._callback.best_objects, + ) + self._cost_values = ( + { + name: float( + self._callback.best_cost_values[ + self._cost_expressions[name] + ] + ) + for name in self._cost_expressions + } + if self._callback_save_costs + else {} + ) + self._constraint_values = ( + { + name: np.array( + ( + self._callback.best_constraint_multipliers[ + self._constraint_expressions[name] + ] + ) + ) + for name in self._constraint_expressions + } + if self._callback_save_constraint_multipliers + else {} + ) + return + + raise OptiFailure(message=err, callback_used=use_callback) - self._output_cost = self._opti_solution.value(self._cost) - self._output_solution = self._generate_solution_output(self._variables) + self._output_cost = opti_solution.value(self._cost) + self._output_solution = self._generate_solution_output( + variables=self._variables, input_solution=opti_solution ) self._cost_values = { - name: float(self._opti_solution.value(self._cost_expressions[name])) + name: float(opti_solution.value(self._cost_expressions[name])) for name in self._cost_expressions } self._constraint_values = { name: np.array( - self._opti_solution.value( + opti_solution.value( self._solver.dual(self._constraint_expressions[name]) ) ) @@ -498,3 +797,11 @@ def get_cost_values(self) -> dict[str, float]: def get_constraint_multipliers(self) -> dict[str, np.ndarray]: return self._constraint_values + + def get_object_type(self, obj: cs.MX) -> str: + if obj not in self._objects_type_map: + raise ValueError("The object is not an optimization object.") + return self._objects_type_map[obj] + + def get_free_parameters_names(self) -> list[str]: + return self._free_parameters diff --git a/src/hippopt/base/optimal_control_problem.py b/src/hippopt/base/optimal_control_problem.py index 615ae3f3..610ad26d 100644 --- a/src/hippopt/base/optimal_control_problem.py +++ b/src/hippopt/base/optimal_control_problem.py @@ -16,6 +36 @@ ) +@dataclasses.dataclass +class OptimalControlProblemInstance: + problem: TOptimalControlProblem = dataclasses.field(default=None) + all_variables: TInputObjects = dataclasses.field(default=None) + symbolic_structure: TInputObjects = dataclasses.field(default=None) + + _problem: dataclasses.InitVar[TOptimalControlProblem] = dataclasses.field( + default=None + ) + _all_variables: dataclasses.InitVar[TInputObjects] = dataclasses.field(default=None) + _symbolic_structure: dataclasses.InitVar[TInputObjects] = dataclasses.field( + default=None + ) + + def __post_init__( + self, + _problem: TOptimalControlProblem, + _all_variables: TInputObjects, + _symbolic_structure: TInputObjects, + ): + self.problem = _problem + self.all_variables = _all_variables + self.symbolic_structure = _symbolic_structure + + def __iter__(self): + return iter([self.problem, self.all_variables, self.symbolic_structure]) + # Cannot convert to tuple here since it would perform a deepcopy + # and would include InitVars too + + @dataclasses.dataclass class OptimalControlProblem(Problem[TOptimalControlSolver, TInputObjects]): optimal_control_solver: dataclasses.InitVar[ @@ -39,19 +69,23 @@ def create( input_structure: TInputObjects, optimal_control_solver: TOptimalControlSolver = None, **kwargs - ) -> tuple[TOptimalControlProblem, TInputObjects]: + ) -> OptimalControlProblemInstance: new_problem = cls( optimal_control_solver=optimal_control_solver, )
new_problem._solver.generate_optimization_objects( input_structure=input_structure, **kwargs ) - return new_problem, new_problem._solver.get_optimization_objects() + return OptimalControlProblemInstance( + _problem=new_problem, + _all_variables=new_problem._solver.get_optimization_objects(), + _symbolic_structure=new_problem._solver.get_symbolic_structure(), + ) def add_dynamics( self, dynamics: TDynamics, - x0: dict[str, cs.MX] = None, + x0: dict[str, cs.MX] | dict[cs.MX, cs.MX] | cs.MX = None, t0: cs.MX = cs.MX(0.0), mode: ExpressionType = ExpressionType.subject_to, name: str = None, @@ -67,3 +101,25 @@ def add_dynamics( x0_name=x0_name, **kwargs ) + + def add_expression_to_horizon( + self, + expression: cs.MX, + mode: ExpressionType = ExpressionType.subject_to, + apply_to_first_elements: bool = False, + name: str = None, + **kwargs + ) -> None: + self.solver().add_expression_to_horizon( + expression=expression, + mode=mode, + apply_to_first_elements=apply_to_first_elements, + name=name, + **kwargs + ) + + def initial(self, variable: str | cs.MX) -> cs.MX: + return self.solver().initial(variable=variable) + + def final(self, variable: str | cs.MX) -> cs.MX: + return self.solver().final(variable=variable) diff --git a/src/hippopt/base/optimal_control_solver.py b/src/hippopt/base/optimal_control_solver.py index 713d3072..83abf79e 100644 --- a/src/hippopt/base/optimal_control_solver.py +++ b/src/hippopt/base/optimal_control_solver.py @@ -5,6 +5,7 @@ import casadi as cs from .dynamics import TDynamics +from .optimization_object import TOptimizationObject from .optimization_solver import OptimizationSolver from .problem import ExpressionType @@ -17,7 +18,7 @@ class OptimalControlSolver(OptimizationSolver): def add_dynamics( self, dynamics: TDynamics, - x0: dict[str, cs.MX] = None, + x0: dict[str, cs.MX] | dict[cs.MX, cs.MX] | cs.MX = None, t0: cs.MX = cs.MX(0.0), mode: ExpressionType = ExpressionType.subject_to, name: str = None, @@ -25,3 +26,26 @@ def add_dynamics( **kwargs ) -> None: pass + + @abc.abstractmethod + def get_symbolic_structure(self) -> TOptimizationObject | list[TOptimizationObject]: + pass + + @abc.abstractmethod + def add_expression_to_horizon( + self, + expression: cs.MX, + mode: ExpressionType = ExpressionType.subject_to, + apply_to_first_elements: bool = False, + name: str = None, + **kwargs + ) -> None: + pass + + @abc.abstractmethod + def initial(self, variable: str | cs.MX) -> cs.MX: + pass + + @abc.abstractmethod + def final(self, variable: str | cs.MX) -> cs.MX: + pass diff --git a/src/hippopt/base/optimization_object.py b/src/hippopt/base/optimization_object.py index d852e227..17cefd14 100644 --- a/src/hippopt/base/optimization_object.py +++ b/src/hippopt/base/optimization_object.py @@ -7,7 +7,9 @@ import numpy as np TOptimizationObject = TypeVar("TOptimizationObject", bound="OptimizationObject") -StorageType = cs.MX | np.ndarray | list[cs.MX] | list[np.ndarray] +TGenericCompositeObject = TypeVar("TGenericCompositeObject") +CompositeType = TGenericCompositeObject | list[TGenericCompositeObject] +StorageType = CompositeType[cs.MX] | CompositeType[np.ndarray] | CompositeType[float] class TimeExpansion(Enum): @@ -21,10 +23,51 @@ class OptimizationObject(abc.ABC): StorageTypeField: ClassVar[str] = "StorageType" TimeDependentField: ClassVar[str] = "TimeDependent" TimeExpansionField: ClassVar[str] = "TimeExpansion" + OverrideIfCompositeField: ClassVar[str] = "OverrideIfComposite" + CompositeTypeField: ClassVar[str] = "CompositeType" StorageTypeMetadata: ClassVar[dict[str, 
Any]] = dict( - StorageType=StorageType, TimeDependent=False, TimeExpansion=TimeExpansion.List + StorageType=StorageTypeValue, + TimeDependent=False, + TimeExpansion=TimeExpansion.List, + OverrideIfComposite=False, ) + def to_list(self) -> list: + output_list = [] + for field in dataclasses.fields(self): + composite_value = self.__getattribute__(field.name) + + is_list = isinstance(composite_value, list) + list_of_optimization_objects = is_list and all( + isinstance(elem, OptimizationObject) or isinstance(elem, list) + for elem in composite_value + ) + list_of_float = is_list and all( + isinstance(elem, float) for elem in composite_value + ) + if list_of_float: + is_list = False + + if list_of_optimization_objects: + for elem in composite_value: + output_list += elem.to_list() + continue + + if isinstance(composite_value, OptimizationObject): + output_list += composite_value.to_list() + continue + + if OptimizationObject.StorageTypeField in field.metadata: + value_list = composite_value if is_list else [composite_value] + for value in value_list: + output_list.append(value) + continue + + return output_list + + def to_mx(self) -> cs.MX: + return cs.vertcat(*self.to_list()) + @classmethod def default_storage_metadata(cls, **kwargs) -> dict: pass @@ -42,3 +85,17 @@ def default_storage_field(cls: Type[OptimizationObject], **kwargs): def time_varying_metadata(time_varying: bool = True): return {OptimizationObject.TimeDependentField: time_varying} + + +def default_composite_field( + cls: Type[OptimizationObject] = None, factory=None, time_varying: bool = True +): + cls_dict = time_varying_metadata(time_varying) + cls_dict[OptimizationObject.CompositeTypeField] = ( + cls.StorageTypeMetadata if cls is not None else None + ) + + return dataclasses.field( + default_factory=factory, + metadata=cls_dict, + ) diff --git a/src/hippopt/base/optimization_problem.py b/src/hippopt/base/optimization_problem.py index 218fe214..4426eecc 100644 --- a/src/hippopt/base/optimization_problem.py +++ b/src/hippopt/base/optimization_problem.py @@ -8,6 +8,30 @@ TOptimizationProblem = TypeVar("TOptimizationProblem", bound="OptimizationProblem") +@dataclasses.dataclass +class OptimizationProblemInstance: + problem: TOptimizationProblem = dataclasses.field(default=None) + variables: TInputObjects = dataclasses.field(default=None) + + _problem: dataclasses.InitVar[TOptimizationProblem] = dataclasses.field( + default=None + ) + _variables: dataclasses.InitVar[TInputObjects] = dataclasses.field(default=None) + + def __post_init__( + self, + _problem: TOptimizationProblem, + _variables: TInputObjects, + ): + self.problem = _problem + self.variables = _variables + + def __iter__(self): + return iter([self.problem, self.variables]) + # Cannot convert to tuple here since it would perform a deepcopy + # and would include InitVars too + + @dataclasses.dataclass class OptimizationProblem(Problem[TOptimizationSolver, TInputObjects]): optimization_solver: dataclasses.InitVar[OptimizationSolver] = dataclasses.field( @@ -31,9 +55,12 @@ def create( input_structure: TInputObjects, optimization_solver: TOptimizationSolver = None, **kwargs - ) -> tuple[TOptimizationProblem, TInputObjects]: + ) -> OptimizationProblemInstance: new_problem = cls(optimization_solver=optimization_solver) new_problem._solver.generate_optimization_objects( input_structure=input_structure, **kwargs ) - return new_problem, new_problem._solver.get_optimization_objects() + return OptimizationProblemInstance( + _problem=new_problem, + 
_variables=new_problem._solver.get_optimization_objects(), + ) diff --git a/src/hippopt/base/optimization_solver.py b/src/hippopt/base/optimization_solver.py index 03f4c1c6..a4d742dc 100644 --- a/src/hippopt/base/optimization_solver.py +++ b/src/hippopt/base/optimization_solver.py @@ -49,6 +49,10 @@ def set_initial_guess( ) -> None: pass + @abc.abstractmethod + def get_initial_guess(self) -> TOptimizationObject | list[TOptimizationObject]: + pass + @abc.abstractmethod def solve(self) -> None: pass diff --git a/src/hippopt/base/parameter.py b/src/hippopt/base/parameter.py index 0cc3b26e..933791b3 100644 --- a/src/hippopt/base/parameter.py +++ b/src/hippopt/base/parameter.py @@ -16,6 +16,7 @@ class Parameter(OptimizationObject): StorageType=StorageTypeValue, TimeDependent=False, TimeExpansion=TimeExpansion.List, + OverrideIfComposite=False, ) @classmethod @@ -29,3 +30,15 @@ def default_storage_metadata( cls_dict[OptimizationObject.TimeExpansionField] = time_expansion return cls_dict + + +@dataclasses.dataclass +class OverridableParameter(Parameter): + """Parameter whose storage type can be overridden by the enclosing composite field.""" + + StorageTypeMetadata: ClassVar[dict[str, Any]] = dict( + StorageType=Parameter.StorageTypeValue, + TimeDependent=False, + TimeExpansion=TimeExpansion.List, + OverrideIfComposite=True, + ) diff --git a/src/hippopt/base/problem.py b/src/hippopt/base/problem.py index 6ef990ba..be6a6653 100644 --- a/src/hippopt/base/problem.py +++ b/src/hippopt/base/problem.py @@ -66,6 +66,9 @@ def set_initial_guess( ) -> None: self.solver().set_initial_guess(initial_guess) + def get_initial_guess(self) -> TOptimizationObject | list[TOptimizationObject]: + return self.solver().get_initial_guess() + def add_cost( self, expression: cs.MX | Generator[cs.MX, None, None], @@ -156,7 +159,7 @@ def get_constraint_expressions(self) -> dict[str, cs.MX]: def solver(self) -> TGenericSolver: return self._solver - def solve(self) -> Output: + def solve(self) -> Output[TInputObjects]: self.solver().solve() self._output = Output( _cost_value=self.solver().get_cost_value(), @@ -166,7 +169,7 @@ ) return self._output - def get_output(self) -> Output: + def get_output(self) -> Output[TInputObjects]: if self._output is None: raise ProblemNotSolvedException diff --git a/src/hippopt/base/variable.py b/src/hippopt/base/variable.py index d51b3655..7353bb04 100644 --- a/src/hippopt/base/variable.py +++ b/src/hippopt/base/variable.py @@ -23,6 +23,7 @@ class Variable(OptimizationObject): TimeDependent=True, TimeExpansion=TimeExpansion.List, VariableType=VariableType.continuous, + OverrideIfComposite=False, ) @classmethod @@ -38,3 +39,15 @@ def default_storage_metadata( cls_dict[cls.VariableTypeField] = variable_type return cls_dict + + +@dataclasses.dataclass +class OverridableVariable(Variable): + """Variable whose storage type can be overridden by the enclosing composite field.""" + + StorageTypeMetadata: ClassVar[dict[str, Any]] = dict( + StorageType=Variable.StorageTypeValue, + TimeDependent=True, + TimeExpansion=TimeExpansion.List, + OverrideIfComposite=True, + ) diff --git a/src/hippopt/integrators/__init__.py b/src/hippopt/integrators/__init__.py index efef41b4..0d50829e 100644 --- a/src/hippopt/integrators/__init__.py +++ b/src/hippopt/integrators/__init__.py @@ -1 +1,3 @@ from .
import forward_euler, implicit_trapezoid +from .forward_euler import ForwardEuler +from .implicit_trapezoid import ImplicitTrapezoid diff --git a/test/test_integrators.py b/test/test_integrators.py index abbfbb6f..963e01e7 100644 --- a/test/test_integrators.py +++ b/test/test_integrators.py @@ -4,7 +4,8 @@ import casadi as cs import pytest -from hippopt import ForwardEuler, ImplicitTrapezoid, dot, step +from hippopt import dot, step +from hippopt.integrators import ForwardEuler, ImplicitTrapezoid def get_test_function() -> cs.Function: @@ -53,6 +54,18 @@ def test_dynamics_creation(): assert dynamics.time_name() == "time" +def test_dynamics_creation_with_mx(): + _x = cs.MX.sym("x", 1) + _lambda = cs.MX.sym("lambda", 1) + _time = cs.MX.sym("time", 1) + + dynamics = dot(_x, _time).equal(_lambda * _x, {"lam": "lambda"}) + + assert dynamics.state_variables() == ["x"] + assert dynamics.input_names() == ["lam", "x"] + assert dynamics.time_name() == "time" + + def test_forward_euler(): f = get_test_function() diff --git a/test/test_multiple_shooting.py b/test/test_multiple_shooting.py index 7d74a9a6..772147b9 100644 --- a/test/test_multiple_shooting.py +++ b/test/test_multiple_shooting.py @@ -5,8 +5,8 @@ import pytest from hippopt import ( + CompositeType, ExpressionType, - ForwardEuler, MultipleShootingSolver, OptimalControlProblem, OptimizationObject, @@ -14,8 +14,10 @@ StorageType, TimeExpansion, Variable, + default_composite_field, default_storage_field, dot, + integrators, time_varying_metadata, ) @@ -33,18 +35,16 @@ def __post_init__(self): @dataclasses.dataclass class MyCompositeTestVar(OptimizationObject): - composite: MyTestVarMS | list[MyTestVarMS] = dataclasses.field( - default_factory=MyTestVarMS, metadata=time_varying_metadata() - ) - fixed: MyTestVarMS | list[MyTestVarMS] = dataclasses.field( - default_factory=MyTestVarMS + composite: CompositeType[MyTestVarMS] = default_composite_field(factory=MyTestVarMS) + fixed: CompositeType[MyTestVarMS] = default_composite_field( + factory=MyTestVarMS, time_varying=False ) extended: StorageType = default_storage_field( cls=Variable, time_expansion=TimeExpansion.Matrix ) - composite_list: list[MyTestVarMS] | list[list[MyTestVarMS]] = dataclasses.field( - default=None, metadata=time_varying_metadata() + composite_list: CompositeType[list[MyTestVarMS]] = default_composite_field( + factory=list[MyTestVarMS] ) fixed_list: list[MyTestVarMS] = dataclasses.field(default=None) @@ -109,21 +109,21 @@ def test_composite_variables_custom_horizon(): def test_flattened_variables_simple(): horizon_len = 10 - problem, var = OptimalControlProblem.create( + problem, var, _ = OptimalControlProblem.create( input_structure=MyTestVarMS(), horizon=horizon_len ) var_flat = problem.solver().get_flattened_optimization_objects() assert "string" not in var_flat - assert var_flat[0]["variable"][0] == horizon_len - assert var_flat[0]["parameter"][0] == 1 - assert next(var_flat[0]["parameter"][1]()) is var.parameter + assert var_flat["variable"][0] == horizon_len + assert var_flat["parameter"][0] == 1 + assert next(var_flat["parameter"][1]()) is var.parameter assert ( - next(var_flat[0]["parameter"][1]()) is var.parameter + next(var_flat["parameter"][1]()) is var.parameter ) # check that we can use the generator twice - variable_gen = var_flat[0]["variable"][1]() + variable_gen = var_flat["variable"][1]() assert all(next(variable_gen) is v for v in var.variable) - variable_gen = var_flat[0]["variable"][1]() + variable_gen = var_flat["variable"][1]() assert all( 
next(variable_gen) is v for v in var.variable ) # check that we can use the generator twice @@ -136,52 +136,75 @@ def test_flattened_variables_composite(): for _ in range(3): structure.append(MyCompositeTestVar()) - problem, var = OptimalControlProblem.create( + problem, var, _ = OptimalControlProblem.create( input_structure=structure, horizon=horizon_len ) var_flat = problem.solver().get_flattened_optimization_objects() - assert len(var_flat) == 3 assert len(var) == 3 for j in range(3): - assert var_flat[j]["composite.variable"][0] == horizon_len - assert var_flat[j]["composite.parameter"][0] == horizon_len - par_gen = var_flat[j]["composite.parameter"][1]() + assert var_flat["[" + str(j) + "].composite.variable"][0] == horizon_len + assert var_flat["[" + str(j) + "].composite.parameter"][0] == horizon_len + par_gen = var_flat["[" + str(j) + "].composite.parameter"][1]() assert all(next(par_gen) is c.parameter for c in var[j].composite) - variable_gen = var_flat[j]["composite.variable"][1]() + variable_gen = var_flat["[" + str(j) + "].composite.variable"][1]() assert all(next(variable_gen) is c.variable for c in var[j].composite) - assert next(var_flat[j]["fixed.variable"][1]()) is var[j].fixed.variable - assert next(var_flat[j]["fixed.parameter"][1]()) is var[j].fixed.parameter + assert ( + next(var_flat["[" + str(j) + "].fixed.variable"][1]()) + is var[j].fixed.variable + ) + assert ( + next(var_flat["[" + str(j) + "].fixed.parameter"][1]()) + is var[j].fixed.parameter + ) for i in range(3): assert all(isinstance(c.variable, cs.MX) for c in var[j].composite_list[i]) - variable_gen = var_flat[j]["composite_list[" + str(i) + "].variable"][1]() + variable_gen = var_flat[ + "[" + str(j) + "].composite_list[" + str(i) + "].variable" + ][1]() assert ( - var_flat[j]["composite_list[" + str(i) + "].variable"][0] == horizon_len + var_flat["[" + str(j) + "].composite_list[" + str(i) + "].variable"][0] + == horizon_len ) assert all( next(variable_gen) is c.variable for c in var[j].composite_list[i] ) assert all(isinstance(c.parameter, cs.MX) for c in var[j].composite_list[i]) - parameter_gen = var_flat[j]["composite_list[" + str(i) + "].parameter"][1]() + parameter_gen = var_flat[ + "[" + str(j) + "].composite_list[" + str(i) + "].parameter" + ][1]() assert all( next(parameter_gen) is c.parameter for c in var[j].composite_list[i] ) assert ( - var_flat[j]["composite_list[" + str(i) + "].parameter"][0] + var_flat["[" + str(j) + "].composite_list[" + str(i) + "].parameter"][0] == horizon_len ) assert ( - next(var_flat[j]["fixed_list[" + str(i) + "].variable"][1]()) + next( + var_flat["[" + str(j) + "].fixed_list[" + str(i) + "].variable"][ + 1 + ]() + ) is var[j].fixed_list[i].variable ) - assert var_flat[j]["fixed_list[" + str(i) + "].variable"][0] == 1 assert ( - next(var_flat[j]["fixed_list[" + str(i) + "].parameter"][1]()) + var_flat["[" + str(j) + "].fixed_list[" + str(i) + "].variable"][0] == 1 + ) + assert ( + next( + var_flat["[" + str(j) + "].fixed_list[" + str(i) + "].parameter"][ + 1 + ]() + ) is var[j].fixed_list[i].parameter ) - assert var_flat[j]["fixed_list[" + str(i) + "].parameter"][0] == 1 + assert ( + var_flat["[" + str(j) + "].fixed_list[" + str(i) + "].parameter"][0] + == 1 + ) @dataclasses.dataclass @@ -213,37 +236,42 @@ def get_dynamics(): @dataclasses.dataclass class MassFallingTestVariables(OptimizationObject): - masses: list[MassFallingState] = dataclasses.field( + masses: CompositeType[list[MassFallingState]] = dataclasses.field( metadata=time_varying_metadata(), 
default=None ) g: StorageType = default_storage_field(Parameter) + foo: StorageType = default_storage_field(Variable) def __post_init__(self): - self.g = -9.81 * np.ones(1) + self.g = -9.81 self.masses = [] - for _ in range(2): + for _ in range(3): self.masses.append(MassFallingState()) + self.foo = np.zeros((3, 1)) def test_multiple_shooting(): guess = MassFallingTestVariables() guess.masses = None + guess.foo = None horizon = 100 dt = 0.01 initial_position = 1.0 initial_velocity = 0 - problem, var = OptimalControlProblem.create( + problem, var, symbolic = OptimalControlProblem.create( input_structure=MassFallingTestVariables(), horizon=horizon, ) + assert problem.initial(symbolic.g) is problem.final(symbolic.g) + problem.add_dynamics( dot(["masses[0].x", "masses[0].v"]) == (MassFallingState.get_dynamics(), {"masses[0].x": "x", "masses[0].v": "v"}), dt=dt, - integrator=ForwardEuler, + integrator=integrators.ForwardEuler, ) initial_position_constraint = var.masses[0][0].x == initial_position @@ -256,11 +284,40 @@ def test_multiple_shooting(): == (MassFallingState.get_dynamics(), {"masses[1].x": "x", "masses[1].v": "v"}), dt=dt, x0={"masses[1].x": initial_position, "masses[1].v": initial_velocity}, - integrator=ForwardEuler, + integrator=integrators.ForwardEuler, mode=ExpressionType.minimize, x0_name="initial_condition", ) + problem.add_dynamics( + dot(symbolic.masses[2].x) == symbolic.masses[2].v, + dt=dt, + x0=initial_position, + integrator=integrators.ForwardEuler, + x0_name="initial_condition_simple_x", + ) + + problem.add_dynamics( + dot(symbolic.masses[2].v) == ["g"], + dt=dt, + x0={symbolic.masses[2].v: initial_velocity}, + integrator=integrators.ForwardEuler, + x0_name="initial_condition_simple_v", + ) + + problem.add_expression_to_horizon( + expression=(symbolic.foo >= 5), apply_to_first_elements=False + ) + + problem.add_constraint(expression=problem.initial(symbolic.foo) == 0) + problem.add_constraint(expression=problem.final(symbolic.foo) == 6.0) + + problem.add_expression_to_horizon( + expression=cs.sumsqr(symbolic.foo), + apply_to_first_elements=True, + mode=ExpressionType.minimize, + ) + problem.set_initial_guess(guess) sol = problem.solve() @@ -273,6 +330,8 @@ def test_multiple_shooting(): assert "initial_condition{0}" in problem.get_cost_expressions() assert "initial_condition{1}" in problem.get_cost_expressions() assert "initial_position" in sol.constraint_multipliers + assert "initial_condition_simple_x{0}" in sol.constraint_multipliers + assert "initial_condition_simple_v{0}" in sol.constraint_multipliers expected_position = initial_position expected_velocity = initial_velocity @@ -282,5 +341,13 @@ def test_multiple_shooting(): assert float(sol.values.masses[0][i].v) == pytest.approx(expected_velocity) assert float(sol.values.masses[1][i].x) == pytest.approx(expected_position) assert float(sol.values.masses[1][i].v) == pytest.approx(expected_velocity) + assert float(sol.values.masses[2][i].x) == pytest.approx(expected_position) + assert float(sol.values.masses[2][i].v) == pytest.approx(expected_velocity) + if i == 0: + assert sol.values.foo[i] == pytest.approx(0) + elif i == horizon - 1: + assert sol.values.foo[i] == pytest.approx(6) + else: + assert sol.values.foo[i] == pytest.approx(5) expected_position += dt * expected_velocity expected_velocity += dt * guess.g diff --git a/test/test_opti_generate_objects.py b/test/test_opti_generate_objects.py index 15aa9d18..bd9fa3f3 100644 --- a/test/test_opti_generate_objects.py +++ b/test/test_opti_generate_objects.py @@ -4,11 
+4,15 @@ import numpy as np from hippopt import ( + CompositeType, OptimizationObject, OptiSolver, + OverridableParameter, + OverridableVariable, Parameter, StorageType, Variable, + default_composite_field, default_storage_field, ) @@ -17,25 +21,38 @@ class CustomVariable(OptimizationObject): variable: StorageType = default_storage_field(cls=Variable) parameter: StorageType = default_storage_field(cls=Parameter) + scalar: StorageType = default_storage_field(cls=Variable) def __post_init__(self): self.variable = np.ones(shape=3) self.parameter = np.ones(shape=3) + self.scalar = 1.0 @dataclasses.dataclass class AggregateClass(OptimizationObject): - aggregated: CustomVariable = dataclasses.field(default_factory=CustomVariable) + aggregated: CompositeType[CustomVariable] = default_composite_field( + factory=CustomVariable + ) + aggregated_list: CompositeType[list[CustomVariable]] = default_composite_field( + factory=list + ) other_parameter: StorageType = default_storage_field(cls=Parameter) other: str = "" def __post_init__(self): self.other_parameter = np.ones(3) self.other = "untouched" + for _ in range(3): + self.aggregated_list.append(CustomVariable()) def test_generate_objects(): test_var = AggregateClass() + test_var_as_list = test_var.to_list() + assert ( + len(test_var_as_list) == 3 + 3 * 3 + 1 + ) # 3 for aggregated, 3*3 for aggregated_list, 1 for other_parameter solver = OptiSolver() opti_var = solver.generate_optimization_objects(test_var) assert isinstance(opti_var.aggregated.parameter, cs.MX) @@ -44,8 +61,22 @@ def test_generate_objects(): assert opti_var.aggregated.variable.shape == (3, 1) assert isinstance(opti_var.other_parameter, cs.MX) assert opti_var.other_parameter.shape == (3, 1) + assert isinstance(opti_var.aggregated.scalar, cs.MX) + assert opti_var.aggregated.scalar.shape == (1, 1) + assert isinstance(opti_var.aggregated_list, list) + for opti_var_list in opti_var.aggregated_list: + assert isinstance(opti_var_list.parameter, cs.MX) + assert opti_var_list.parameter.shape == (3, 1) + assert isinstance(opti_var_list.variable, cs.MX) + assert opti_var_list.variable.shape == (3, 1) + assert isinstance(opti_var_list.scalar, cs.MX) + assert opti_var_list.scalar.shape == (1, 1) assert opti_var.other == "untouched" assert solver.get_optimization_objects() is opti_var + assert len(solver.get_free_parameters_names()) == 0 + assert (len(opti_var.to_list())) == len(test_var_as_list) + expected_len = 7 + 3 * 7 + 3 + assert opti_var.to_mx().shape == (expected_len, 1) def test_generate_objects_list(): @@ -60,7 +91,113 @@ def test_generate_objects_list(): assert opti_var.aggregated.parameter.shape == (3, 1) assert isinstance(opti_var.aggregated.variable, cs.MX) assert opti_var.aggregated.variable.shape == (3, 1) + assert isinstance(opti_var.aggregated.scalar, cs.MX) + assert opti_var.aggregated.scalar.shape == (1, 1) assert isinstance(opti_var.other_parameter, cs.MX) assert opti_var.other_parameter.shape == (3, 1) assert opti_var.other == "untouched" assert solver.get_optimization_objects() is opti_var_list + + +@dataclasses.dataclass +class CustomOverridableVariable(OptimizationObject): + overridable: StorageType = default_storage_field(cls=OverridableVariable) + not_overridable: StorageType = default_storage_field(cls=Variable) + + def __post_init__(self): + self.overridable = 0.0 + self.not_overridable = 0.0 + + +@dataclasses.dataclass +class CustomCompositeOverridableVariable(OptimizationObject): + composite: CompositeType[CustomOverridableVariable] = default_composite_field( + 
cls=Parameter, factory=CustomOverridableVariable + ) + + +def test_generate_variables_overridable(): + test_var = CustomCompositeOverridableVariable() + solver = OptiSolver() + opti_var = solver.generate_optimization_objects(test_var) + assert isinstance(opti_var.composite.overridable, cs.MX) + assert opti_var.composite.overridable.shape == (1, 1) + assert ( + solver.get_object_type(opti_var.composite.overridable) + == Parameter.StorageTypeValue + ) + assert isinstance(opti_var.composite.not_overridable, cs.MX) + assert opti_var.composite.not_overridable.shape == (1, 1) + assert ( + solver.get_object_type(opti_var.composite.not_overridable) + == Variable.StorageTypeValue + ) + + +@dataclasses.dataclass +class CustomOverridableParameter(OptimizationObject): + overridable: StorageType = default_storage_field(cls=OverridableParameter) + not_overridable: StorageType = default_storage_field(cls=Parameter) + + def __post_init__(self): + self.overridable = 0.0 + self.not_overridable = 0.0 + + +@dataclasses.dataclass +class CustomCompositeOverridableParameter(OptimizationObject): + composite: CompositeType[CustomOverridableParameter] = default_composite_field( + cls=Variable, factory=CustomOverridableParameter + ) + + +def test_generate_parameters_overridable(): + test_var = CustomCompositeOverridableParameter() + solver = OptiSolver() + opti_var = solver.generate_optimization_objects(test_var) + assert isinstance(opti_var.composite.overridable, cs.MX) + assert opti_var.composite.overridable.shape == (1, 1) + assert ( + solver.get_object_type(opti_var.composite.overridable) + == Variable.StorageTypeValue + ) + assert isinstance(opti_var.composite.not_overridable, cs.MX) + assert opti_var.composite.not_overridable.shape == (1, 1) + assert ( + solver.get_object_type(opti_var.composite.not_overridable) + == Parameter.StorageTypeValue + ) + + +@dataclasses.dataclass +class CustomCustomOverridableVariableInner(OptimizationObject): + composite: CompositeType[CustomOverridableVariable] = default_composite_field( + cls=OverridableVariable, factory=CustomOverridableVariable + ) + + +@dataclasses.dataclass +class CustomCustomOverridableVariableNested(OptimizationObject): + composite: CompositeType[ + CustomCustomOverridableVariableInner + ] = default_composite_field( + cls=OverridableParameter, factory=CustomCustomOverridableVariableInner + ) + + +def test_generate_nested_overridable_class(): + test_var = CustomCustomOverridableVariableNested() + solver = OptiSolver() + opti_var = solver.generate_optimization_objects(test_var) + assert isinstance(opti_var.composite.composite.overridable, cs.MX) + assert opti_var.composite.composite.overridable.shape == (1, 1) + assert ( + solver.get_object_type(opti_var.composite.composite.overridable) + == Parameter.StorageTypeValue + ) + assert isinstance(opti_var.composite.composite.not_overridable, cs.MX) + assert opti_var.composite.composite.not_overridable.shape == (1, 1) + assert ( + solver.get_object_type(opti_var.composite.composite.not_overridable) + == Variable.StorageTypeValue + ) diff --git a/test/test_optimization_problem.py b/test/test_optimization_problem.py index 3644ab9d..1c060020 100644 --- a/test/test_optimization_problem.py +++ b/test/test_optimization_problem.py @@ -9,10 +9,12 @@ OptiFailure, OptimizationObject, OptimizationProblem, + OptiSolver, Parameter, StorageType, Variable, default_storage_field, + opti_callback, ) @@ -263,3 +265,19 @@ def test_opti_failure(): print("Received error: ", err) else: assert False + + +def test_opti_callback(): + 
opti_solver = OptiSolver( + callback_criterion=opti_callback.BestCost() + | opti_callback.BestPrimalInfeasibility() + ) + problem, variables = OptimizationProblem.create( + input_structure=SwitchVar(), optimization_solver=opti_solver + ) + + problem.add_constraint(variables.x <= 1) + problem.add_constraint(variables.x >= 0) + problem.add_constraint(variables.x**2 == 10) + + problem.solve()
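
For reviewers, the snippet below is a minimal usage sketch of how the pieces changed by this patch fit together, mirroring the tests above: create() now returns an instance object that still unpacks like the old tuple, scalar floats are accepted as initial guesses, and opti_callback criteria can be combined with "|" so that a failed solve can still recover the best intermediate iterate. The DemoVariables class, its field names, and the constraint/cost values are hypothetical, for illustration only; they are not part of the patch.

import dataclasses

import casadi as cs
import numpy as np

from hippopt import (
    OptimizationObject,
    OptimizationProblem,
    OptiSolver,
    Parameter,
    StorageType,
    Variable,
    default_storage_field,
    opti_callback,
)


@dataclasses.dataclass
class DemoVariables(OptimizationObject):
    # Hypothetical structure, not part of the patch
    x: StorageType = default_storage_field(cls=Variable)
    p: StorageType = default_storage_field(cls=Parameter)

    def __post_init__(self):
        self.x = np.zeros(3)  # used as the default initial guess for x
        self.p = 1.0  # scalar floats are now valid guesses


# Criteria can be combined with "|", as in test_opti_callback above
solver = OptiSolver(
    callback_criterion=opti_callback.BestCost()
    | opti_callback.BestPrimalInfeasibility()
)

# create() returns an OptimizationProblemInstance; it still unpacks like a tuple
problem, variables = OptimizationProblem.create(
    input_structure=DemoVariables(), optimization_solver=solver
)

problem.add_constraint(variables.x >= variables.p)
problem.add_cost(cs.sumsqr(variables.x - 2.0))

output = problem.solve()
print(output.values.x)  # solution values come back as numpy arrays

Note that the parameter p is set through the initial guess: generate_optimization_objects() now fills the guess from the input structure by default, and solve() raises if any parameter is left unset.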