
Resource manager workers #17

Open · wants to merge 1 commit into base: main
780 changes: 780 additions & 0 deletions resource-manager/python/frinx_worker/resource_manager/__init__.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions resource-manager/python/frinx_worker/resource_manager/env.py
@@ -0,0 +1,4 @@
import os

RESOURCE_MANAGER_URL_BASE = os.getenv('RESOURCE_MANAGER_URL_BASE', 'http://localhost/api/resource')
HELPDESK_URL = os.getenv('HELPDESK_URL', 'https://frinxhelpdesk.atlassian.net/servicedesk/customer/portals')
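
For context (not part of this PR): both values are read once at import time via os.getenv, so deployments typically override them with environment variables before the workers start. A minimal sketch, using an invented URL:

    # Hypothetical override; the URL below is an example value, not a documented default.
    import os
    os.environ['RESOURCE_MANAGER_URL_BASE'] = 'http://resource-manager.example/api/resource'

    from frinx_worker.resource_manager.env import RESOURCE_MANAGER_URL_BASE
    print(RESOURCE_MANAGER_URL_BASE)  # http://resource-manager.example/api/resource
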
29 changes: 29 additions & 0 deletions resource-manager/python/frinx_worker/resource_manager/errors.py
@@ -0,0 +1,29 @@
from frinx_api.resource_manager import ID

from frinx_worker.resource_manager.env import HELPDESK_URL


class ResourceManagerWorkerError(Exception):
"""Exception class for resource-manager workers."""


class PoolNotFoundError(ResourceManagerWorkerError, KeyError):
    """Raised when a pool with the given ID cannot be found."""
    def __init__(self, pool_id: ID, *args):
        super().__init__(f'Pool with ID "{pool_id}" was not found.', *args)


class ResourceNotFoundError(ResourceManagerWorkerError, KeyError):
    """Raised when the requested resource cannot be found."""
    def __init__(self, resource: str, *args):
        super().__init__(f'Resource "{resource}" was not found.', *args)


class ResourceManagerWorkerRuntimeError(ResourceManagerWorkerError, RuntimeError):
    """Base class for unexpected runtime errors in resource-manager workers."""
def __init__(self):
super().__init__(f'{self.__class__.__name__}, please report this issue on {HELPDESK_URL}. Thank you!')


class InvalidQueryError(ResourceManagerWorkerRuntimeError):
    """Raised when the endpoint rejects a query with 422 Client Error: Unprocessable Entity."""
51 changes: 51 additions & 0 deletions resource-manager/python/frinx_worker/resource_manager/models_and_enums.py
@@ -0,0 +1,51 @@
from enum import Enum
from typing import Any
from typing import Optional

from frinx.common.worker.task_def import TaskInput
from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from pydantic import IPvAnyAddress
from pydantic import NonNegativeInt
from pydantic import RootModel
from pydantic.alias_generators import to_camel

from frinx_worker.resource_manager.type_aliases import IPv4PrefixTypeAlias

Data = RootModel[Optional[Any]]


class Result(BaseModel):
data: Optional[Any]


class CustomConfiguredTaskInput(TaskInput):
model_config = ConfigDict(alias_generator=to_camel)


# FIXME: refactor; each pool type has its own properties, and this model probably only fits IP pools
class PoolProperties(BaseModel):
address: IPvAnyAddress
prefix: int
subnet: Optional[bool] = False


class IPv4NetMaskSize(RootModel):
root: NonNegativeInt = Field(le=2**32)


class Report(RootModel):
root: dict[IPv4PrefixTypeAlias, IPv4NetMaskSize]


class ResourceTypeEnum(Enum):
RANDOM_SIGNED_INT32 = 'random_signed_int32'
ROUTE_DISTINGUISHER = 'route_distinguisher'
IPV6_PREFIX = 'ipv6_prefix'
IPV4_PREFIX = 'ipv4_prefix'
VLAN_RANGE = 'vlan_range'
UNIQUE_ID = 'unique_id'
IPV6 = 'ipv6'
IPV4 = 'ipv4'
VLAN = 'vlan'
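
A quick sketch (not part of this PR) of how the Report root model validates the prefix-to-count mapping built elsewhere in this change; the numbers are invented:

    from frinx_worker.resource_manager.models_and_enums import Report

    report = Report.model_validate({'/24': 1, '/25': 2, '/26': 4})
    print(report.model_dump())  # {'/24': 1, '/25': 2, '/26': 4}
    # Report.model_validate({'/33': 1}) would raise a ValidationError: '/33' is not a valid IPv4 prefix.
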
35 changes: 35 additions & 0 deletions resource-manager/python/frinx_worker/resource_manager/type_aliases.py
@@ -0,0 +1,35 @@
from ipaddress import IPv4Address
from ipaddress import IPv6Address
from typing import Literal
from typing import Optional
from typing import TypeAlias
from typing import Union

from graphql_pydantic_converter.graphql_types import Mutation
from graphql_pydantic_converter.graphql_types import Query
from pydantic.v1.errors import IPv4AddressError
from pydantic.v1.errors import IPv6AddressError

GraphQueryRenderer: TypeAlias = Union[Query, Mutation]
IPvAddress: TypeAlias = Union[IPv4Address, IPv6Address]
IPAddressError: TypeAlias = Union[IPv4AddressError, IPv6AddressError]
IPAddressDict: TypeAlias = dict[str, IPvAddress]
OptionalIPAddressDict: TypeAlias = dict[str, Optional[IPvAddress]]
ISODateTimeString: TypeAlias = str  # datetime string, not validated; expected format YYYY-MM-DD-hh
CursorID: TypeAlias = str  # cursor ID, not validated; probably a 16-character string
# TODO: define prefixes the "smarter way"
IPv4PrefixTypeAlias: TypeAlias = Literal[
'/0', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9', '/10', '/11', '/12', '/13', '/14', '/15', '/16',
    '/17', '/18', '/19', '/20', '/21', '/22', '/23', '/24', '/25', '/26', '/27', '/28', '/29', '/30', '/31', '/32'
]
IPv6PrefixTypeAlias: TypeAlias = Literal[
'/0', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9', '/10', '/11', '/12', '/13', '/14', '/15', '/16',
'/17', '/18', '/19', '/20', '/21', '/22', '/23', '/24', '/25', '/26', '/27', '/28', '/29', '/30', '/31',
'/32', '/33', '/34', '/35', '/36', '/37', '/38', '/39', '/40', '/41', '/42', '/43', '/44', '/45', '/46',
'/47', '/48', '/49', '/50', '/51', '/52', '/53', '/54', '/55', '/56', '/57', '/58', '/59', '/60', '/61',
'/62', '/63', '/64', '/65', '/66', '/67', '/68', '/69', '/70', '/71', '/72', '/73', '/74', '/75', '/76',
'/77', '/78', '/79', '/80', '/81', '/82', '/83', '/84', '/85', '/86', '/87', '/88', '/89', '/90', '/91',
'/92', '/93', '/94', '/95', '/96', '/97', '/98', '/99', '/100', '/101', '/102', '/103', '/104', '/105',
'/106', '/107', '/108', '/109', '/110', '/111', '/112', '/113', '/114', '/115', '/116', '/117', '/118',
'/119', '/120', '/121', '/122', '/123', '/124', '/125', '/126', '/127', '/128'
]
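
One possible direction for the "smarter way" mentioned in the TODO above (a sketch only, not part of this PR; it trades the static checking of Literal for runtime validation):

    from typing import Annotated

    from pydantic import Field, TypeAdapter

    # Accepts '/0' through '/32'; the regex replaces the 33-item Literal.
    IPv4Prefix = Annotated[str, Field(pattern=r'^/(3[0-2]|[12]?[0-9])$')]

    TypeAdapter(IPv4Prefix).validate_python('/24')    # ok, returns '/24'
    # TypeAdapter(IPv4Prefix).validate_python('/33')  # raises pydantic.ValidationError
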
114 changes: 114 additions & 0 deletions resource-manager/python/frinx_worker/resource_manager/utils.py
@@ -0,0 +1,114 @@
from collections import namedtuple
from collections.abc import Generator
from contextlib import contextmanager
from http import HTTPStatus
from operator import itemgetter
from typing import Any
from typing import Literal
from typing import Optional
from typing import Union

from frinx.common.conductor_enums import TaskResultStatus
from frinx.common.type_aliases import DictAny
from frinx.common.type_aliases import DictStr
from frinx.common.worker.task_def import TaskOutput
from frinx.common.worker.task_result import TaskResult
from frinx_api.resource_manager import ID
from frinx_api.resource_manager import AllocationStrategy
from frinx_api.resource_manager import PoolCapacityPayload
from frinx_api.resource_manager import QueryAllocationStrategiesQuery
from frinx_api.resource_manager import QueryAllocationStrategiesQueryResponse
from frinx_api.resource_manager import QueryPoolCapacityQuery
from frinx_api.resource_manager import QueryPoolCapacityQueryResponse
from frinx_api.resource_manager import QueryResourceTypesQuery
from frinx_api.resource_manager import QueryResourceTypesQueryResponse
from frinx_api.resource_manager import ResourceType
from graphql_pydantic_converter.graphql_types import concatenate_queries
from pydantic import IPvAnyAddress
from python_graphql_client import GraphqlClient
from requests.exceptions import HTTPError

from frinx_worker.resource_manager.env import RESOURCE_MANAGER_URL_BASE
from frinx_worker.resource_manager.errors import InvalidQueryError
from frinx_worker.resource_manager.errors import PoolNotFoundError
from frinx_worker.resource_manager.models_and_enums import ResourceTypeEnum
from frinx_worker.resource_manager.type_aliases import GraphQueryRenderer
from frinx_worker.resource_manager.type_aliases import IPvAddress


@contextmanager
def graphql_client_manager(endpoint: str, headers: Optional[DictStr] = None,
**kwargs: Any) -> Generator[GraphqlClient, None, None]:
client = GraphqlClient(endpoint=endpoint, headers=headers or {}, **kwargs)
try:
yield client
except HTTPError as error:
match error.response.status_code:
case HTTPStatus.UNPROCESSABLE_ENTITY:
                raise InvalidQueryError() from error
# TODO: Enhance the error handling.
case _:
raise error


def execute_query(graph_query: Union[GraphQueryRenderer, str], variables: Optional[DictAny] = None,
                  operation_name: Optional[str] = None, headers: Optional[DictStr] = None, **kwargs: Any) -> DictAny:
    with graphql_client_manager(endpoint=RESOURCE_MANAGER_URL_BASE) as client:
return client.execute(
query=graph_query if isinstance(graph_query, str) else graph_query.render(),
variables=variables or {},
operation_name=operation_name,
headers=headers or {},
**kwargs)


def get_resource_type_and_allocation_strategy_id(resource_type: ResourceTypeEnum) -> tuple[ID, ID]:
query_resource = QueryResourceTypesQuery(byName=resource_type, payload=ResourceType(id=True))
query_allocation = QueryAllocationStrategiesQuery(byName=resource_type, payload=AllocationStrategy(id=True))
data = execute_query(concatenate_queries([query_resource, query_allocation]))
resource, allocation = QueryResourceTypesQueryResponse(**data), QueryAllocationStrategiesQueryResponse(**data)
return resource.data.query_resource_types[0].id, allocation.data.query_allocation_strategies[0].id


def get_free_and_utilized_capacity_of_pool(pool_id: ID) -> tuple[int, int]:
query = QueryPoolCapacityQuery(
poolId=pool_id, payload=PoolCapacityPayload(freeCapacity=True, utilizedCapacity=True))
response_model = QueryPoolCapacityQueryResponse(**execute_query(query))
try:
return (int(response_model.data.query_pool_capacity.free_capacity),
int(response_model.data.query_pool_capacity.utilized_capacity))
except AttributeError:
raise PoolNotFoundError(pool_id)


def get_max_prefix_len(ipv: Literal[ResourceTypeEnum.IPV4, ResourceTypeEnum.IPV6]) -> int:
return 128 if ipv is ResourceTypeEnum.IPV6 else 32


def calculate_available_prefixes(free_capacity: int,
ip_version: Literal[ResourceTypeEnum.IPV4, ResourceTypeEnum.IPV6]) -> DictStr:
calculated_prefixes = {}
max_bit_size = get_max_prefix_len(ip_version)

for prefix in range(1, max_bit_size + 1):
prefix_capacity = pow(2, max_bit_size - prefix)
if prefix_capacity <= free_capacity:
result = free_capacity // prefix_capacity
calculated_prefixes[f'/{prefix}'] = str(result)

return calculated_prefixes


IPAddressOwner = namedtuple('IPAddressOwner', ['owner', 'ip_address'])


def sorted_owners_by_ip_addresses(*, reverse: bool = False,
**ip_addresses: Optional[Union[IPvAddress, IPvAnyAddress]]) -> list[IPAddressOwner]:
return [IPAddressOwner(*_) for _ in sorted(ip_addresses.items(), key=itemgetter(1), reverse=reverse)]


def execute_graph_query__return_task_result(task_output: type[TaskOutput],
graph_query: Union[GraphQueryRenderer, str]) -> TaskResult:
result = execute_query(graph_query)
status = TaskResultStatus.FAILED if 'errors' in result else TaskResultStatus.COMPLETED
return TaskResult(status=status, output=task_output(result=result))
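
A small example (not part of this PR) of the helpers above. calculate_available_prefixes and sorted_owners_by_ip_addresses run offline; the capacity query at the end needs a reachable resource manager and uses a made-up pool ID.

    from ipaddress import IPv4Address

    from frinx_worker.resource_manager.errors import PoolNotFoundError
    from frinx_worker.resource_manager.models_and_enums import ResourceTypeEnum
    from frinx_worker.resource_manager.utils import (
        calculate_available_prefixes,
        get_free_and_utilized_capacity_of_pool,
        sorted_owners_by_ip_addresses,
    )

    # 256 free IPv4 addresses -> one /24, two /25s, ..., 256 /32s.
    print(calculate_available_prefixes(256, ResourceTypeEnum.IPV4))

    # Owners sorted by their IP addresses (ascending).
    owners = sorted_owners_by_ip_addresses(
        gateway=IPv4Address('192.0.2.1'),
        dhcp_start=IPv4Address('192.0.2.10'),
    )
    print(owners[0].owner, owners[0].ip_address)  # gateway 192.0.2.1

    # Needs RESOURCE_MANAGER_URL_BASE to point at a live instance; '21474836480' is an invented ID.
    try:
        free, utilized = get_free_and_utilized_capacity_of_pool('21474836480')
        print(free, utilized)
    except PoolNotFoundError as err:
        print(err.args[0])  # Pool with ID "21474836480" was not found.
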
6 changes: 3 additions & 3 deletions resource-manager/python/poetry.lock

Some generated files are not rendered by default.