Skip to content

Commit

Permalink
[uniconfig] Add ReadStructureData with jsonb filter
Browse files Browse the repository at this point in the history
  • Loading branch information
mirohudec committed Jan 10, 2025
1 parent 9d782e1 commit 38f8fda
Showing 1 changed file with 59 additions and 0 deletions.
59 changes: 59 additions & 0 deletions uniconfig/python/frinx_worker/uniconfig/structured_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,65 @@ def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]:

return handle_response(response, self.WorkerOutput)

class ReadStructuredDataWithJsonBFilter(WorkerImpl):
from frinx_api.uniconfig.rest_api import ReadStructuredData as UniconfigApi

class ExecutionProperties(TaskExecutionProperties):
exclude_empty_inputs: bool = True

class WorkerDefinition(TaskDefinition):
name: str = "UNICONFIG_read_structured_device_data_with_jsonb_filter"
description: str = (
"Read device configuration or operational data in structured format e.g. openconfig "
"and filter the output using jsonb (JsonPath) filter"
)
labels: list[str] = ["BASICS", "UNICONFIG", "OPENCONFIG"]

class WorkerInput(TaskInput):
# url
node_id: str
uri: str | None = None
topology_id: str = "uniconfig"
# jsonb request parameters
jsonb_filter: str
add_parent_structure: bool = True
# uniconfig
transaction_id: str | None = None
uniconfig_server_id: str | None = None
uniconfig_url_base: str = UNICONFIG_URL_BASE

class WorkerOutput(TaskOutput):
output: DictAny

def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]:
uri = ""
if worker_input.uri:
if not worker_input.uri.startswith("/"):
uri = f"/{worker_input.uri}"
else:
uri = worker_input.uri

escaped_node_id = escape_uniconfig_uri_key(worker_input.node_id)
url = worker_input.uniconfig_url_base + self.UniconfigApi.uri.format(
topology_id=worker_input.topology_id, node_id=escaped_node_id, uri=uri
)

response = requests.request(
url=url,
method="POST",
cookies=uniconfig_zone_to_cookie(
uniconfig_server_id=worker_input.uniconfig_server_id, transaction_id=worker_input.transaction_id
),
headers={"Content-Type": "application/filter-data+json"},
params=UNICONFIG_REQUEST_PARAMS,
json={
"query": worker_input.jsonb_filter,
"addParentStructure": worker_input.add_parent_structure
}
)

return handle_response(response, self.WorkerOutput)

class WriteStructuredData(WorkerImpl):
from frinx_api.uniconfig.rest_api import ReadStructuredData as UniconfigApi

Expand Down

0 comments on commit 38f8fda

Please sign in to comment.