diff --git a/.github/workflows/ci_cd.yml b/.github/workflows/ci_cd.yml
index 06b81e92..cfc98c37 100644
--- a/.github/workflows/ci_cd.yml
+++ b/.github/workflows/ci_cd.yml
@@ -25,7 +25,7 @@ jobs:
     steps:
 
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v3
 
       - uses: conda-incubator/setup-miniconda@v2
        with:
diff --git a/src/hippopt/__init__.py b/src/hippopt/__init__.py
index 927b4b4f..e1632b3d 100644
--- a/src/hippopt/__init__.py
+++ b/src/hippopt/__init__.py
@@ -1,9 +1,11 @@
 from . import base
+from .base.continuous_variable import ContinuousVariable, TContinuousVariable
+from .base.opti_solver import OptiSolver
 from .base.optimization_object import (
     OptimizationObject,
     StorageType,
     TOptimizationObject,
     default_storage_field,
 )
+from .base.optimization_problem import ExpressionType, OptimizationProblem
 from .base.parameter import Parameter, TParameter
-from .base.variable import TVariable, Variable
diff --git a/src/hippopt/base/__init__.py b/src/hippopt/base/__init__.py
index f2a23186..89b9c61d 100644
--- a/src/hippopt/base/__init__.py
+++ b/src/hippopt/base/__init__.py
@@ -1 +1,7 @@
-from . import optimization_object, parameter, variable
+from . import (
+    continuous_variable,
+    opti_solver,
+    optimization_object,
+    optimization_problem,
+    parameter,
+)
diff --git a/src/hippopt/base/variable.py b/src/hippopt/base/continuous_variable.py
similarity index 57%
rename from src/hippopt/base/variable.py
rename to src/hippopt/base/continuous_variable.py
index 64912985..c1f85fb6 100644
--- a/src/hippopt/base/variable.py
+++ b/src/hippopt/base/continuous_variable.py
@@ -3,12 +3,12 @@
 from hippopt.base.optimization_object import OptimizationObject
 
-TVariable = TypeVar("TVariable", bound="Variable")
+TContinuousVariable = TypeVar("TContinuousVariable", bound="ContinuousVariable")
 
 
 @dataclasses.dataclass
-class Variable(OptimizationObject):
+class ContinuousVariable(OptimizationObject):
     """"""
 
-    StorageType: ClassVar[str] = "variable"
+    StorageType: ClassVar[str] = "continuous_variable"
     StorageTypeMetadata: ClassVar[dict[str, Any]] = dict(StorageType=StorageType)
 
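Editor's note: the rename from `Variable` to `ContinuousVariable` (and `TVariable` to `TContinuousVariable`) is mechanical: the storage tag changes from "variable" to "continuous_variable", and every declaration site updates accordingly. A minimal sketch of the new declaration pattern, mirroring the usage in test/test_base.py; the `PointVariable` class and `position` field names are hypothetical:

```python
import dataclasses

import numpy as np

from hippopt import (
    ContinuousVariable,
    OptimizationObject,
    StorageType,
    default_storage_field,
)


@dataclasses.dataclass
class PointVariable(OptimizationObject):
    # "position" is declared as a continuous decision variable.
    position: StorageType = default_storage_field(cls=ContinuousVariable)

    def __post_init__(self):
        # The numpy value fixes the shape and the default initialization.
        self.position = np.zeros(3)
```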
diff --git a/src/hippopt/base/opti_solver.py b/src/hippopt/base/opti_solver.py
new file mode 100644
index 00000000..388cccaf
--- /dev/null
+++ b/src/hippopt/base/opti_solver.py
@@ -0,0 +1,428 @@
+import copy
+import dataclasses
+from typing import Any, ClassVar, List
+
+import casadi as cs
+import numpy as np
+
+from hippopt.base.continuous_variable import ContinuousVariable
+from hippopt.base.optimization_object import OptimizationObject, TOptimizationObject
+from hippopt.base.optimization_solver import OptimizationSolver, SolverOutput
+from hippopt.base.parameter import Parameter
+
+
+class ProblemNotSolvedException(Exception):
+    def __init__(self):
+        super().__init__("The problem has not been solved yet.")
+
+
+@dataclasses.dataclass
+class OptiSolver(OptimizationSolver):
+    DefaultSolverType: ClassVar[str] = "ipopt"
+    _inner_solver: str = dataclasses.field(default=DefaultSolverType)
+    problem_type: dataclasses.InitVar[str] = dataclasses.field(default="nlp")
+
+    _options_plugin: dict[str, Any] = dataclasses.field(default_factory=dict)
+    _options_solver: dict[str, Any] = dataclasses.field(default_factory=dict)
+    options_solver: dataclasses.InitVar[dict[str, Any]] = dataclasses.field(
+        default=None
+    )
+    options_plugin: dataclasses.InitVar[dict[str, Any]] = dataclasses.field(
+        default=None
+    )
+
+    _cost: cs.MX = dataclasses.field(default=None)
+    _solver: cs.Opti = dataclasses.field(default=None)
+    _opti_solution: cs.OptiSol = dataclasses.field(default=None)
+    _output_solution: TOptimizationObject | List[
+        TOptimizationObject
+    ] = dataclasses.field(default=None)
+    _output_cost: float = dataclasses.field(default=None)
+    _variables: TOptimizationObject | List[TOptimizationObject] = dataclasses.field(
+        default=None
+    )
+
+    def __post_init__(
+        self,
+        problem_type: str,
+        options_solver: dict[str, Any] = None,
+        options_plugin: dict[str, Any] = None,
+    ):
+        self._solver = cs.Opti(problem_type)
+        self._options_solver = (
+            options_solver if isinstance(options_solver, dict) else {}
+        )
+        self._options_plugin = (
+            options_plugin if isinstance(options_plugin, dict) else {}
+        )
+        self._solver.solver(
+            self._inner_solver, self._options_plugin, self._options_solver
+        )
+
+    def _generate_objects_from_instance(
+        self, input_structure: TOptimizationObject
+    ) -> TOptimizationObject:
+        output = copy.deepcopy(input_structure)
+
+        for field in dataclasses.fields(output):
+            has_storage_field = OptimizationObject.StorageTypeField in field.metadata
+
+            if (
+                has_storage_field
+                and field.metadata[OptimizationObject.StorageTypeField]
+                is ContinuousVariable.StorageType
+            ):
+                value = dataclasses.asdict(output)[field.name]
+
+                if isinstance(value, np.ndarray):
+                    if value.ndim > 2:
+                        raise ValueError(
+                            "Field "
+                            + field.name
+                            + " has number of dimensions greater than 2."
+                        )
+                    if value.ndim < 2:
+                        value = np.expand_dims(value, axis=1)
+
+                output.__setattr__(field.name, self._solver.variable(*value.shape))
+                continue
+
+            if (
+                has_storage_field
+                and field.metadata[OptimizationObject.StorageTypeField]
+                is Parameter.StorageType
+            ):
+                value = dataclasses.asdict(output)[field.name]
+
+                if isinstance(value, np.ndarray):
+                    if value.ndim > 2:
+                        raise ValueError(
+                            "Field "
+                            + field.name
+                            + " has number of dimensions greater than 2."
+                        )
+                    if value.ndim < 2:
+                        value = np.expand_dims(value, axis=1)
+
+                output.__setattr__(field.name, self._solver.parameter(*value.shape))
+                continue
+
+            composite_value = output.__getattribute__(field.name)
+
+            is_list = isinstance(composite_value, list)
+            list_of_optimization_objects = is_list and all(
+                isinstance(elem, OptimizationObject) for elem in composite_value
+            )
+
+            if (
+                isinstance(composite_value, OptimizationObject)
+                or list_of_optimization_objects
+            ):
+                output.__setattr__(
+                    field.name, self.generate_optimization_objects(composite_value)
+                )
+
+        self._variables = output
+        return output
+
+    def _generate_objects_from_list(
+        self, input_structure: List[TOptimizationObject]
+    ) -> List[TOptimizationObject]:
+        list_of_optimization_objects = isinstance(input_structure, list) and all(
+            isinstance(elem, OptimizationObject) for elem in input_structure
+        )
+
+        assert (
+            isinstance(input_structure, OptimizationObject)
+            or list_of_optimization_objects
+        )
+
+        output = copy.deepcopy(input_structure)
+        for i in range(len(output)):
+            output[i] = self.generate_optimization_objects(output[i])
+
+        self._variables = output
+        return output
+
+    def _generate_solution_output(
+        self, variables: TOptimizationObject | List[TOptimizationObject]
+    ) -> TOptimizationObject | List[TOptimizationObject]:
+        output = copy.deepcopy(variables)
+
+        if isinstance(variables, list):
+            i = 0
+            for element in variables:
+                output[i] = self._generate_solution_output(element)
+                i += 1
+
+            return output
+
+        for field in dataclasses.fields(variables):
+            has_storage_field = OptimizationObject.StorageTypeField in field.metadata
+
+            if has_storage_field and (
+                (
+                    field.metadata[OptimizationObject.StorageTypeField]
+                    is ContinuousVariable.StorageType
+                )
+                or (
+                    field.metadata[OptimizationObject.StorageTypeField]
+                    is Parameter.StorageType
+                )
+            ):
+                var = dataclasses.asdict(variables)[field.name]
+                output.__setattr__(field.name, np.array(self._opti_solution.value(var)))
+                continue
+
+            composite_variable = variables.__getattribute__(field.name)
+
+            is_list = isinstance(composite_variable, list)
+            list_of_optimization_objects = is_list and all(
+                isinstance(elem, OptimizationObject) for elem in composite_variable
+            )
+
+            if (
+                isinstance(composite_variable, OptimizationObject)
+                or list_of_optimization_objects
+            ):
+                output.__setattr__(
+                    field.name, self._generate_solution_output(composite_variable)
+                )
+
+        return output
+
+    def _set_initial_guess_internal(
+        self,
+        initial_guess: TOptimizationObject,
+        corresponding_variable: TOptimizationObject,
+    ) -> None:
+        for field in dataclasses.fields(initial_guess):
+            has_storage_field = OptimizationObject.StorageTypeField in field.metadata
+
+            if (
+                has_storage_field
+                and field.metadata[OptimizationObject.StorageTypeField]
+                is ContinuousVariable.StorageType
+            ):
+                guess = dataclasses.asdict(initial_guess)[field.name]
+
+                if guess is None:
+                    continue
+
+                if not isinstance(guess, np.ndarray):
+                    raise ValueError(
+                        "The guess for the field "
+                        + field.name
+                        + " is not a numpy array."
+                    )
+
+                if not hasattr(corresponding_variable, field.name):
+                    raise ValueError(
+                        "The guess has the field "
+                        + field.name
+                        + " but it is not present in the optimization variables"
+                    )
+
+                corresponding_variable_value = corresponding_variable.__getattribute__(
+                    field.name
+                )
+
+                input_shape = (
+                    guess.shape if len(guess.shape) > 1 else (guess.shape[0], 1)
+                )
+
+                if corresponding_variable_value.shape != input_shape:
+                    raise ValueError(
+                        "The guess has the field "
+                        + field.name
+                        + " but its dimension does not match that of the corresponding optimization variable"
+                    )
+
+                self._solver.set_initial(corresponding_variable_value, guess)
+                continue
+
+            if (
+                has_storage_field
+                and field.metadata[OptimizationObject.StorageTypeField]
+                is Parameter.StorageType
+            ):
+                guess = dataclasses.asdict(initial_guess)[field.name]
+
+                if guess is None:
+                    continue
+
+                if not isinstance(guess, np.ndarray):
+                    raise ValueError(
+                        "The guess for the field "
+                        + field.name
+                        + " is not a numpy array."
+                    )
+
+                if not hasattr(corresponding_variable, field.name):
+                    raise ValueError(
+                        "The guess has the field "
+                        + field.name
+                        + " but it is not present in the optimization parameters"
+                    )
+
+                corresponding_parameter_value = corresponding_variable.__getattribute__(
+                    field.name
+                )
+
+                input_shape = (
+                    guess.shape if len(guess.shape) > 1 else (guess.shape[0], 1)
+                )
+
+                if corresponding_parameter_value.shape != input_shape:
+                    raise ValueError(
+                        "The guess has the field "
+                        + field.name
+                        + " but its dimension does not match that of the corresponding parameter"
+                    )
+
+                self._solver.set_value(corresponding_parameter_value, guess)
+                continue
+
+            composite_variable_guess = initial_guess.__getattribute__(field.name)
+
+            if isinstance(composite_variable_guess, OptimizationObject):
+                if not hasattr(corresponding_variable, field.name):
+                    raise ValueError(
+                        "The guess has the field "
+                        + field.name
+                        + " but it is not present in the optimization structure"
+                    )
+
+                self._set_initial_guess_internal(
+                    initial_guess=composite_variable_guess,
+                    corresponding_variable=corresponding_variable.__getattribute__(
+                        field.name
+                    ),
+                )
+                continue
+
+            is_list = isinstance(composite_variable_guess, list)
+            list_of_optimization_objects = is_list and all(
+                isinstance(elem, OptimizationObject)
+                for elem in composite_variable_guess
+            )
+
+            if list_of_optimization_objects:
+                if not hasattr(corresponding_variable, field.name):
+                    raise ValueError(
+                        "The guess has the field "
+                        + field.name
+                        + " but it is not present in the optimization structure"
+                    )
+                corresponding_nested_variable = corresponding_variable.__getattribute__(
+                    field.name
+                )
+
+                if not isinstance(corresponding_nested_variable, list):
+                    raise ValueError(
+                        "The guess has the field "
+                        + field.name
+                        + " as list, but the corresponding structure is not a list"
+                    )
+
+                i = 0
+                for element in composite_variable_guess:
+                    if i >= len(corresponding_nested_variable):
+                        raise ValueError(
+                            "The guess list "
+                            + field.name
+                            + " has more elements than the corresponding variable structure"
+                        )
+
+                    self._set_initial_guess_internal(
+                        initial_guess=element,
+                        corresponding_variable=corresponding_nested_variable[i],
+                    )
+                    i += 1
+
+    def generate_optimization_objects(
+        self, input_structure: TOptimizationObject | List[TOptimizationObject], **kwargs
+    ) -> TOptimizationObject | List[TOptimizationObject]:
+        if isinstance(input_structure, OptimizationObject):
+            return self._generate_objects_from_instance(input_structure=input_structure)
+        return self._generate_objects_from_list(input_structure=input_structure)
+
+    def get_optimization_objects(
+        self,
+    ) -> TOptimizationObject | List[TOptimizationObject]:
+        return self._variables
+
+    def set_initial_guess(
+        self, initial_guess: TOptimizationObject | List[TOptimizationObject]
+    ) -> None:
+        if isinstance(initial_guess, list):
+            if not isinstance(self._variables, list):
+                raise ValueError(
+                    "The input guess is a list, but the specified variables structure is not"
+                )
+
+            i = 0
+            for element in initial_guess:
+                if i >= len(self._variables):
+                    raise ValueError(
+                        "The input guess list is longer than the specified variables structure"
+                    )
+
+                self._set_initial_guess_internal(
+                    initial_guess=element, corresponding_variable=self._variables[i]
+                )
+                i += 1
+            return
+
+        self._set_initial_guess_internal(
+            initial_guess=initial_guess, corresponding_variable=self._variables
+        )
+
+    def set_opti_options(
+        self,
+        inner_solver: str = None,
+        options_solver: dict[str, Any] = None,
+        options_plugin: dict[str, Any] = None,
+    ) -> None:
+        if inner_solver is not None:
+            self._inner_solver = inner_solver
+        if options_plugin is not None:
+            self._options_plugin = options_plugin
+        if options_solver is not None:
+            self._options_solver = options_solver
+
+        self._solver.solver(
+            self._inner_solver, self._options_plugin, self._options_solver
+        )
+
+    def solve(self) -> SolverOutput:
+        self._solver.minimize(self._cost)
+        self._opti_solution = self._solver.solve()
+        self._output_cost = self._opti_solution.value(self._cost)
+        self._output_solution = self._generate_solution_output(self._variables)
+        return SolverOutput(
+            _values=self._output_solution, _cost_value=self._output_cost
+        )
+
+    def get_solution(self) -> TOptimizationObject | List[TOptimizationObject]:
+        if self._output_solution is None:
+            raise ProblemNotSolvedException
+        return self._output_solution
+
+    def get_cost_value(self) -> float:
+        if self._output_cost is None:
+            raise ProblemNotSolvedException
+        return self._output_cost
+
+    def add_cost(self, input_cost: cs.MX) -> None:
+        if self._cost is None:
+            self._cost = input_cost
+            return
+
+        self._cost += input_cost
+
+    def add_constraint(self, input_constraint: cs.MX) -> None:
+        self._solver.subject_to(input_constraint)
+
+    def cost_function(self) -> cs.MX:
+        return self._cost
diff --git a/src/hippopt/base/optimization_object.py b/src/hippopt/base/optimization_object.py
index 4d75da95..91d7b4f9 100644
--- a/src/hippopt/base/optimization_object.py
+++ b/src/hippopt/base/optimization_object.py
@@ -13,6 +13,7 @@
 @dataclasses.dataclass
 class OptimizationObject(abc.ABC):
     StorageType: ClassVar[str] = "generic"
+    StorageTypeField: ClassVar[str] = "StorageType"
     StorageTypeMetadata: ClassVar[dict[str, Any]] = dict(StorageType=StorageType)
 
     def get_default_initialization(
@@ -32,10 +33,9 @@ def get_default_initialized_object(
         """
 
         output = copy.deepcopy(self)
-        output_dict = dataclasses.asdict(output)
 
         for field in dataclasses.fields(output):
-            if "StorageType" in field.metadata:
+            if self.StorageTypeField in field.metadata:
                 output.__setattr__(
                     field.name, output.get_default_initialization(field.name)
                 )
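Editor's note: with `StorageTypeField` exposed as a class-level constant, the metadata lookup in `get_default_initialized_object` no longer hinges on a magic string. A short sketch of the behavior this supports, patterned after `test_zero_variable`; the `Gains` class and field name are hypothetical:

```python
import dataclasses

import numpy as np

from hippopt import OptimizationObject, Parameter, StorageType, default_storage_field


@dataclasses.dataclass
class Gains(OptimizationObject):
    gains: StorageType = default_storage_field(cls=Parameter)

    def __post_init__(self):
        self.gains = np.ones(2)


g = Gains()
# Fields whose metadata carries OptimizationObject.StorageTypeField are reset to
# their default initialization (zeros of the same shape); other fields are kept.
zeroed = g.get_default_initialized_object()
assert np.all(zeroed.gains == 0)
```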
diff --git a/src/hippopt/base/optimization_problem.py b/src/hippopt/base/optimization_problem.py
new file mode 100644
index 00000000..d07cd548
--- /dev/null
+++ b/src/hippopt/base/optimization_problem.py
@@ -0,0 +1,103 @@
+import abc
+import dataclasses
+import types
+from enum import Enum
+from typing import Generator, List
+
+import casadi as cs
+
+from hippopt.base.opti_solver import OptiSolver
+from hippopt.base.optimization_object import OptimizationObject, TOptimizationObject
+from hippopt.base.optimization_solver import OptimizationSolver, TOptimizationSolver
+
+
+class ExpressionType(Enum):
+    skip = 0
+    subject_to = 1
+    minimize = 2
+
+
+@dataclasses.dataclass
+class OptimizationProblem(abc.ABC):
+    optimization_solver: dataclasses.InitVar[OptimizationSolver] = dataclasses.field(
+        default=None
+    )
+    _solver: TOptimizationSolver = dataclasses.field(default=None)
+
+    def __post_init__(self, optimization_solver: TOptimizationSolver = None):
+        self._solver = (
+            optimization_solver
+            if isinstance(optimization_solver, OptimizationSolver)
+            else OptiSolver()
+        )
+
+    def generate_optimization_objects(
+        self, input_structure: OptimizationObject | List[TOptimizationObject]
+    ) -> TOptimizationObject | List[TOptimizationObject]:
+        return self._solver.generate_optimization_objects(
+            input_structure=input_structure
+        )
+
+    def add_cost(
+        self,
+        expression: cs.MX | Generator[cs.MX, None, None],
+        scaling: float | cs.MX = 1.0,
+    ) -> None:
+        if isinstance(expression, types.GeneratorType):
+            for expr in expression:
+                self.add_cost(expr, scaling)
+        else:
+            assert isinstance(expression, cs.MX)
+            if expression.is_op(cs.OP_LE) or expression.is_op(cs.OP_LT):
+                raise ValueError(
+                    "The conversion from an inequality to a cost is not yet supported"
+                )
+            if expression.is_op(cs.OP_EQ):
+                error_expr = expression.dep(0) - expression.dep(1)
+                self._solver.add_cost(scaling * cs.sumsqr(error_expr))
+            else:
+                self._solver.add_cost(scaling * expression)  # noqa
+
+    def add_constraint(
+        self,
+        expression: cs.MX | Generator[cs.MX, None, None],
+        expected_value: float | cs.MX = 0.0,
+    ) -> None:
+        if isinstance(expression, types.GeneratorType):
+            for expr in expression:
+                self.add_constraint(expr, expected_value)
+        else:
+            assert isinstance(expression, cs.MX)
+            if (
+                expression.is_op(cs.OP_LE)
+                or expression.is_op(cs.OP_LT)
+                or expression.is_op(cs.OP_EQ)
+            ):
+                self._solver.add_constraint(expression)
+            else:
+                if not expression.is_scalar():
+                    raise ValueError("The input expression is not supported.")
+                self._solver.add_constraint(expression == expected_value)  # noqa
+
+    def add_expression(
+        self,
+        mode: ExpressionType,
+        expression: cs.MX | Generator[cs.MX, None, None],
+        **kwargs,
+    ) -> None:
+        if isinstance(expression, types.GeneratorType):
+            for expr in expression:
+                self.add_expression(mode, expr, **kwargs)
+        else:
+            assert isinstance(expression, cs.MX)
+            match mode:
+                case ExpressionType.subject_to:
+                    self.add_constraint(expression, **kwargs)
+
+                case ExpressionType.minimize:
+                    self.add_cost(expression, **kwargs)
+                case _:
+                    pass
+
+    def solver(self) -> TOptimizationSolver:
+        return self._solver
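Editor's note: `OptimizationProblem` accepts either a single `cs.MX` or a generator of them, and `add_expression` dispatches on `ExpressionType`: equalities passed as costs become squared residuals, and plain scalar expressions passed as constraints are pinned to `expected_value`. A condensed usage sketch based on the tests in this diff; the `DemoVar` dataclass is hypothetical:

```python
import dataclasses

import casadi as cs
import numpy as np

from hippopt import (
    ContinuousVariable,
    ExpressionType,
    OptimizationObject,
    OptimizationProblem,
    StorageType,
    default_storage_field,
)


@dataclasses.dataclass
class DemoVar(OptimizationObject):
    variable: StorageType = default_storage_field(ContinuousVariable)

    def __post_init__(self):
        self.variable = np.zeros(3)


problem = OptimizationProblem()
var = problem.generate_optimization_objects(input_structure=DemoVar())

# A generator of cost terms: each is added separately through add_cost.
problem.add_expression(
    mode=ExpressionType.minimize,
    expression=(cs.power(var.variable[k] - k, 2) for k in range(3)),
)

# A generator of inequality constraints, forwarded to the solver as-is.
problem.add_expression(
    mode=ExpressionType.subject_to,
    expression=(var.variable[k] >= 0 for k in range(3)),
)

output = problem.solver().solve()
print(output.values.variable, output.cost_value)
```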
diff --git a/src/hippopt/base/optimization_solver.py b/src/hippopt/base/optimization_solver.py
new file mode 100644
index 00000000..9d8e00e4
--- /dev/null
+++ b/src/hippopt/base/optimization_solver.py
@@ -0,0 +1,60 @@
+import abc
+import dataclasses
+from typing import Generic, List, TypeVar
+
+import casadi as cs
+
+from hippopt.base.optimization_object import TOptimizationObject
+
+TOptimizationSolver = TypeVar("TOptimizationSolver", bound="OptimizationSolver")
+TGenericOptimizationObject = TypeVar("TGenericOptimizationObject")
+
+
+@dataclasses.dataclass
+class SolverOutput(Generic[TGenericOptimizationObject]):
+    values: TGenericOptimizationObject = dataclasses.field(default=None)
+    cost_value: float = None
+
+    _values: dataclasses.InitVar[TGenericOptimizationObject] = dataclasses.field(
+        default=None
+    )
+    _cost_value: dataclasses.InitVar[float] = dataclasses.field(default=None)
+
+    def __post_init__(self, _values: TGenericOptimizationObject, _cost_value: float):
+        self.values = _values
+        self.cost_value = _cost_value
+
+
+@dataclasses.dataclass
+class OptimizationSolver(abc.ABC):
+    @abc.abstractmethod
+    def generate_optimization_objects(
+        self, input_structure: TOptimizationObject | List[TOptimizationObject], **kwargs
+    ) -> TOptimizationObject | List[TOptimizationObject]:
+        pass
+
+    @abc.abstractmethod
+    def set_initial_guess(
+        self, initial_guess: TOptimizationObject | List[TOptimizationObject]
+    ) -> None:
+        pass
+
+    @abc.abstractmethod
+    def solve(self) -> SolverOutput:
+        pass
+
+    @abc.abstractmethod
+    def get_solution(self) -> TOptimizationObject | List[TOptimizationObject]:
+        pass
+
+    @abc.abstractmethod
+    def get_cost_value(self) -> float:
+        pass
+
+    @abc.abstractmethod
+    def add_cost(self, input_cost: cs.MX) -> None:
+        pass
+
+    @abc.abstractmethod
+    def add_constraint(self, input_constraint: cs.MX) -> None:
+        pass
diff --git a/src/hippopt/base/problem.py b/src/hippopt/base/problem.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/src/hippopt/base/solver.py b/src/hippopt/base/solver.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/test/test_base.py b/test/test_base.py
index 8c7dc394..1759555f 100644
--- a/test/test_base.py
+++ b/test/test_base.py
@@ -1,27 +1,29 @@
 import dataclasses
 
+import casadi as cs
 import numpy as np
 
 from hippopt import (
+    ContinuousVariable,
     OptimizationObject,
+    OptiSolver,
     Parameter,
     StorageType,
     TOptimizationObject,
-    Variable,
     default_storage_field,
 )
 
 
 @dataclasses.dataclass
-class TestVariable(OptimizationObject):
-    storage: StorageType = default_storage_field(cls=Variable)
+class MyTestVariable(OptimizationObject):
+    storage: StorageType = default_storage_field(cls=ContinuousVariable)
 
     def __post_init__(self):
         self.storage = np.ones(shape=3)
 
 
 @dataclasses.dataclass
-class TestParameter(OptimizationObject):
+class MyTestParameter(OptimizationObject):
     storage: StorageType = default_storage_field(cls=Parameter)
 
     def __post_init__(self):
@@ -29,14 +31,14 @@ def __post_init__(self):
 
 
 def test_zero_variable():
-    test_var = TestVariable()
+    test_var = MyTestVariable()
     test_var_zero = test_var.get_default_initialized_object()
     assert test_var_zero.storage.shape == (3,)
     assert np.all(test_var_zero.storage == 0)
 
 
 def test_zero_parameter():
-    test_par = TestParameter()
+    test_par = MyTestParameter()
     test_par_zero = test_par.get_default_initialized_object()
     assert test_par_zero.storage.shape == (3,)
     assert np.all(test_par_zero.storage == 0)
@@ -44,7 +46,7 @@ def test_zero_parameter():
 
 @dataclasses.dataclass
 class CustomInitializationVariable(OptimizationObject):
-    variable: StorageType = default_storage_field(cls=Variable)
+    variable: StorageType = default_storage_field(cls=ContinuousVariable)
     parameter: StorageType = default_storage_field(cls=Parameter)
 
     def __post_init__(self):
@@ -71,18 +73,19 @@ def test_custom_initialization():
 
 @dataclasses.dataclass
 class AggregateClass(OptimizationObject):
-    aggregated: CustomInitializationVariable
+    aggregated: CustomInitializationVariable = dataclasses.field(
+        default_factory=CustomInitializationVariable
+    )
     other_parameter: StorageType = default_storage_field(cls=Parameter)
     other: str = ""
 
     def __post_init__(self):
-        self.aggregated = CustomInitializationVariable()
         self.other_parameter = np.ones(3)
         self.other = "untouched"
 
 
 def test_aggregated():
-    test_var = AggregateClass(aggregated=CustomInitializationVariable())
+    test_var = AggregateClass()
     test_var_init = test_var.get_default_initialized_object()
     assert test_var_init.aggregated.parameter.shape == (3,)
     assert np.all(test_var_init.aggregated.parameter == 0)
@@ -91,3 +94,35 @@ def test_aggregated():
     assert test_var_init.other_parameter.shape == (3,)
     assert np.all(test_var_init.other_parameter == 0)
     assert test_var_init.other == "untouched"
+
+
+def test_generate_objects():
+    test_var = AggregateClass()
+    solver = OptiSolver()
+    opti_var = solver.generate_optimization_objects(test_var)
+    assert isinstance(opti_var.aggregated.parameter, cs.MX)
+    assert opti_var.aggregated.parameter.shape == (3, 1)
+    assert isinstance(opti_var.aggregated.variable, cs.MX)
+    assert opti_var.aggregated.variable.shape == (3, 1)
+    assert isinstance(opti_var.other_parameter, cs.MX)
+    assert opti_var.other_parameter.shape == (3, 1)
+    assert opti_var.other == "untouched"
+    assert solver.get_optimization_objects() is opti_var
+
+
+def test_generate_objects_list():
+    test_var_list = []
+    for _ in range(2):
+        test_var_list.append(AggregateClass())
+    solver = OptiSolver()
+    opti_var_list = solver.generate_optimization_objects(test_var_list)
+    assert len(opti_var_list) == 2
+    for opti_var in opti_var_list:
+        assert isinstance(opti_var.aggregated.parameter, cs.MX)
+        assert opti_var.aggregated.parameter.shape == (3, 1)
+        assert isinstance(opti_var.aggregated.variable, cs.MX)
+        assert opti_var.aggregated.variable.shape == (3, 1)
+        assert isinstance(opti_var.other_parameter, cs.MX)
+        assert opti_var.other_parameter.shape == (3, 1)
+        assert opti_var.other == "untouched"
+    assert solver.get_optimization_objects() is opti_var_list
diff --git a/test/test_optimization_problem.py b/test/test_optimization_problem.py
new file mode 100644
index 00000000..06e218da
--- /dev/null
+++ b/test/test_optimization_problem.py
@@ -0,0 +1,258 @@
+import dataclasses
+
+import casadi as cs
+import numpy as np
+import pytest
+
+from hippopt import (
+    ContinuousVariable,
+    ExpressionType,
+    OptimizationObject,
+    OptimizationProblem,
+    Parameter,
+    StorageType,
+    default_storage_field,
+)
+
+
+@dataclasses.dataclass
+class MyTestVar(OptimizationObject):
+    variable: StorageType = default_storage_field(ContinuousVariable)
+    size: dataclasses.InitVar[int] = dataclasses.field(default=3)
+
+    def __post_init__(self, size: int = 3):
+        self.variable = np.zeros(size)
+
+
+def test_opti_solver():
+    size = 4
+    problem = OptimizationProblem()
+    var = problem.generate_optimization_objects(input_structure=MyTestVar(size=size))
+    np.random.seed(123)
+    a = 10.0 * np.random.rand(size) + 0.01
+    b = 20.0 * np.random.rand(size) - 10.0
+    c = 20.0 * np.random.rand(size) - 10.0
+
+    problem.add_expression(
+        mode=ExpressionType.minimize,
+        expression=(
+            a[k] * cs.power(var.variable[k], 2) + b[k] * var.variable[k]
+            for k in range(size)
+        ),
+    )
+
+    problem.add_expression(
+        mode=ExpressionType.subject_to,
+        expression=(var.variable[k] >= c[k] for k in range(size)),  # noqa
+    )
+
+    output = problem.solver().solve()
+
+    expected_x = np.zeros(size)
+    expected_cost = 0
+    for i in range(size):
+        expected = -b[i] / (2 * a[i])
+        expected_x[i] = expected if expected >= c[i] else c[i]
+        expected_cost += (
+            -b[i] ** 2 / (4 * a[i])
+            if expected >= c[i]
+            else a[i] * (c[i] ** 2) + b[i] * c[i]
+        )
+
+    assert output.values.variable == pytest.approx(expected_x)
+    assert output.cost_value == pytest.approx(expected_cost)
+
+    assert problem.solver().get_solution().variable == pytest.approx(expected_x)
+    assert problem.solver().get_cost_value() == pytest.approx(expected_cost)
+
+
+@dataclasses.dataclass
+class MyTestVarAndPar(OptimizationObject):
+    composite: MyTestVar = dataclasses.field(default_factory=MyTestVar)
+    parameter: StorageType = default_storage_field(Parameter)
+
+    def __post_init__(self):
+        self.parameter = np.zeros(3)
+
+
+def test_opti_solver_with_parameters():
+    problem = OptimizationProblem()
+    initial_guess = MyTestVarAndPar()
+    var = problem.generate_optimization_objects(input_structure=MyTestVarAndPar())
+    np.random.seed(123)
+    a = 10.0 * np.random.rand(3) + 0.01
+    b = 20.0 * np.random.rand(3) - 10.0
+    c = 20.0 * np.random.rand(3) - 10.0
+
+    initial_guess.parameter = c
+
+    problem.add_expression(
+        mode=ExpressionType.minimize,
+        expression=(
+            a[k] * cs.power(var.composite.variable[k], 2)
+            + b[k] * var.composite.variable[k]
+            for k in range(0, 3)
+        ),
+    )
+
+    problem.add_expression(
+        mode=ExpressionType.subject_to,
+        expression=(  # noqa
+            var.composite.variable[k] >= var.parameter[k] for k in range(3)
+        ),
+    )
+
+    problem.solver().set_initial_guess(initial_guess=initial_guess)
+
+    output = problem.solver().solve()
+
+    expected_x = np.zeros(3)
+    expected_cost = 0
+    for i in range(3):
+        expected = -b[i] / (2 * a[i])
+        expected_x[i] = expected if expected >= c[i] else c[i]
+        expected_cost += (
+            -b[i] ** 2 / (4 * a[i])
+            if expected >= c[i]
+            else a[i] * (c[i] ** 2) + b[i] * c[i]
+        )
+
+    assert output.values.composite.variable == pytest.approx(expected_x)
+    assert output.cost_value == pytest.approx(expected_cost)
+    assert output.values.parameter == pytest.approx(c)
+
+    assert problem.solver().get_solution().composite.variable == pytest.approx(
+        expected_x
+    )
+    assert problem.solver().get_cost_value() == pytest.approx(expected_cost)
+
+
+def test_opti_solver_with_parameters_and_lists():
+    problem = OptimizationProblem()
+    initial_guess = []
+    for _ in range(3):
+        initial_guess.append(MyTestVarAndPar())
+
+    var = problem.generate_optimization_objects(input_structure=initial_guess)
+    np.random.seed(123)
+
+    a = []
+    b = []
+    c = []
+
+    for j in range(len(initial_guess)):
+        a.append(10.0 * np.random.rand(3) + 0.01)
+        b.append(20.0 * np.random.rand(3) - 10.0)
+        c.append(20.0 * np.random.rand(3) - 10.0)
+        initial_guess[j].parameter = c[j]
+
+    problem.add_cost(
+        a[j][k] * cs.power(var[j].composite.variable[k], 2)
+        + b[j][k] * var[j].composite.variable[k]
+        for j in range(len(initial_guess))
+        for k in range(0, 3)
+    )
+
+    problem.add_constraint(
+        var[j].composite.variable[k] >= c[j][k]  # noqa
+        for j in range(len(initial_guess))
+        for k in range(3)
+    )
+
+    problem.solver().set_initial_guess(initial_guess=initial_guess)
+
+    output = problem.solver().solve()
+
+    expected_x = np.zeros(3)
+    expected_cost = 0
+    for i in range(len(initial_guess)):
+        for j in range(3):
+            expected = -b[i][j] / (2 * a[i][j])
+            expected_x[j] = expected if expected >= c[i][j] else c[i][j]
+            expected_cost += (
+                -b[i][j] ** 2 / (4 * a[i][j])
+                if expected >= c[i][j]
+                else a[i][j] * (c[i][j] ** 2) + b[i][j] * c[i][j]
+            )
+
+        assert output.values[i].composite.variable == pytest.approx(expected_x)
+        assert output.values[i].parameter == pytest.approx(c[i])
+
+    assert output.cost_value == pytest.approx(expected_cost)
+    assert problem.solver().get_cost_value() == pytest.approx(expected_cost)
+
+
+@dataclasses.dataclass
+class SwitchVar(OptimizationObject):
+    x: StorageType = default_storage_field(ContinuousVariable)
+    y: StorageType = default_storage_field(ContinuousVariable)
+
+    def __post_init__(self):
+        self.x = np.zeros(1)
+        self.y = np.zeros(1)
+
+
+def test_switch_costs():
+    initial_problem = OptimizationProblem()
+    variables = initial_problem.generate_optimization_objects(SwitchVar())
+    a = 10
+    initial_problem.add_expression(ExpressionType.minimize, variables.x * variables.x)
+    initial_problem.add_expression(
+        ExpressionType.minimize, a * variables.y * variables.y
+    )
+    initial_problem.add_expression(
+        ExpressionType.subject_to, variables.x + variables.y == a - 1
+    )  # noqa
+    output = initial_problem.solver().solve()
+    expected_cost = a + (a - 2) ** 2
+    assert output.cost_value == pytest.approx(expected=expected_cost, rel=0.1)
+    assert output.values.x == pytest.approx(a - 2, rel=0.1)
+
+    new_problem = OptimizationProblem()
+    new_variables = new_problem.generate_optimization_objects(SwitchVar())
+    new_problem.add_expression(
+        ExpressionType.minimize, a * new_variables.y * new_variables.y
+    )
+    new_problem.add_expression(
+        ExpressionType.subject_to, new_variables.x + new_variables.y == a - 1
+    )  # noqa
+    new_problem.add_expression(
+        ExpressionType.subject_to,
+        new_variables.x * new_variables.x + 1,
+        expected_value=1,
+    )
+    output = new_problem.solver().solve()
+    expected_cost = a * (a - 1) ** 2
+    assert output.cost_value == pytest.approx(expected=expected_cost, rel=0.1)
+    assert output.values.x == pytest.approx(0, abs=1e-4)
+
+
+def test_switch_constraints():
+    initial_problem = OptimizationProblem()
+    variables = initial_problem.generate_optimization_objects(SwitchVar())
+    a = 10
+    initial_problem.add_expression(ExpressionType.minimize, (variables.x - 5) ** 2)
+    initial_problem.add_expression(
+        ExpressionType.minimize, a * variables.y * variables.y
+    )
+    initial_problem.add_expression(
+        ExpressionType.subject_to, variables.x + variables.y == a - 1
+    )  # noqa
+    initial_output = initial_problem.solver().solve()
+
+    new_problem = OptimizationProblem()
+    new_variables = new_problem.generate_optimization_objects(SwitchVar())
+    new_problem.add_expression(
+        ExpressionType.minimize, a * new_variables.y * new_variables.y
+    )
+    new_problem.add_expression(
+        ExpressionType.subject_to, new_variables.x + new_variables.y == a - 1
+    )  # noqa
+    new_problem.add_expression(
+        ExpressionType.minimize, new_variables.x == 5, scaling=1.0
+    )
+    output = new_problem.solver().solve()
+    assert output.cost_value == pytest.approx(
+        expected=initial_output.cost_value, rel=0.1
+    )
+    assert output.values.x == pytest.approx(initial_output.values.x)
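Editor's note: taken together, the new tests exercise the full round trip: declare a structure, generate CasADi objects, feed parameter values through an initial guess, solve, and read back numpy results. A trimmed end-to-end sketch in the spirit of `test_opti_solver_with_parameters`; the `VarAndPar` class, its field names, and the bound values are hypothetical:

```python
import dataclasses

import casadi as cs
import numpy as np

from hippopt import (
    ContinuousVariable,
    ExpressionType,
    OptimizationObject,
    OptimizationProblem,
    Parameter,
    StorageType,
    default_storage_field,
)


@dataclasses.dataclass
class VarAndPar(OptimizationObject):
    variable: StorageType = default_storage_field(ContinuousVariable)
    lower_bound: StorageType = default_storage_field(Parameter)

    def __post_init__(self):
        self.variable = np.zeros(3)
        self.lower_bound = np.zeros(3)


problem = OptimizationProblem()
var = problem.generate_optimization_objects(input_structure=VarAndPar())

# Quadratic cost with a parametric lower bound on each component.
problem.add_expression(
    mode=ExpressionType.minimize,
    expression=(cs.power(var.variable[k] + 1, 2) for k in range(3)),
)
problem.add_expression(
    mode=ExpressionType.subject_to,
    expression=(var.variable[k] >= var.lower_bound[k] for k in range(3)),
)

# Parameters receive their values through the initial guess.
guess = VarAndPar()
guess.lower_bound = np.zeros(3)
problem.solver().set_initial_guess(initial_guess=guess)

output = problem.solver().solve()
# The unconstrained minimum is -1, so the bound at 0 is active: expect zeros.
assert np.allclose(np.asarray(output.values.variable).flatten(), 0.0, atol=1e-6)
```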