From 38f8fdaa46d9cb72e8cedceda112970c43dffc06 Mon Sep 17 00:00:00 2001 From: Miroslav Hudec Date: Mon, 9 Dec 2024 13:24:21 +0100 Subject: [PATCH] [uniconfig] Add ReadStructureData with jsonb filter --- .../frinx_worker/uniconfig/structured_data.py | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/uniconfig/python/frinx_worker/uniconfig/structured_data.py b/uniconfig/python/frinx_worker/uniconfig/structured_data.py index 95136a6..c772801 100644 --- a/uniconfig/python/frinx_worker/uniconfig/structured_data.py +++ b/uniconfig/python/frinx_worker/uniconfig/structured_data.py @@ -67,6 +67,65 @@ def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: return handle_response(response, self.WorkerOutput) + class ReadStructuredDataWithJsonBFilter(WorkerImpl): + from frinx_api.uniconfig.rest_api import ReadStructuredData as UniconfigApi + + class ExecutionProperties(TaskExecutionProperties): + exclude_empty_inputs: bool = True + + class WorkerDefinition(TaskDefinition): + name: str = "UNICONFIG_read_structured_device_data_with_jsonb_filter" + description: str = ( + "Read device configuration or operational data in structured format e.g. openconfig " + "and filter the output using jsonb (JsonPath) filter" + ) + labels: list[str] = ["BASICS", "UNICONFIG", "OPENCONFIG"] + + class WorkerInput(TaskInput): + # url + node_id: str + uri: str | None = None + topology_id: str = "uniconfig" + # jsonb request parameters + jsonb_filter: str + add_parent_structure: bool = True + # uniconfig + transaction_id: str | None = None + uniconfig_server_id: str | None = None + uniconfig_url_base: str = UNICONFIG_URL_BASE + + class WorkerOutput(TaskOutput): + output: DictAny + + def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: + uri = "" + if worker_input.uri: + if not worker_input.uri.startswith("/"): + uri = f"/{worker_input.uri}" + else: + uri = worker_input.uri + + escaped_node_id = escape_uniconfig_uri_key(worker_input.node_id) + url = worker_input.uniconfig_url_base + self.UniconfigApi.uri.format( + topology_id=worker_input.topology_id, node_id=escaped_node_id, uri=uri + ) + + response = requests.request( + url=url, + method="POST", + cookies=uniconfig_zone_to_cookie( + uniconfig_server_id=worker_input.uniconfig_server_id, transaction_id=worker_input.transaction_id + ), + headers={"Content-Type": "application/filter-data+json"}, + params=UNICONFIG_REQUEST_PARAMS, + json={ + "query": worker_input.jsonb_filter, + "addParentStructure": worker_input.add_parent_structure + } + ) + + return handle_response(response, self.WorkerOutput) + class WriteStructuredData(WorkerImpl): from frinx_api.uniconfig.rest_api import ReadStructuredData as UniconfigApi