diff --git a/inventory/python/CHANGELOG.md b/inventory/python/CHANGELOG.md index e95b900..7ab7fa3 100644 --- a/inventory/python/CHANGELOG.md +++ b/inventory/python/CHANGELOG.md @@ -20,4 +20,9 @@ # 1.2.1 - Updated inventory AddDevice and UpdateDevice settings, so they can also contain empty values. - Useful when adding devices that have failed during the workflow. \ No newline at end of file + Useful when adding devices that have failed during the workflow. + +# 1.3.0 +- Bumped version of frinx-inventory-api to 2.1.0. +- Implemented new inventory workers for following mutations and query: + AddStreamMutation, UpdateStreamMutation, DeleteStreamMutation, StreamsQuery. diff --git a/inventory/python/frinx_worker/inventory/__init__.py b/inventory/python/frinx_worker/inventory/__init__.py index 16ddcc7..08f6d42 100644 --- a/inventory/python/frinx_worker/inventory/__init__.py +++ b/inventory/python/frinx_worker/inventory/__init__.py @@ -18,11 +18,17 @@ from frinx_api.inventory import AddDeviceInput from frinx_api.inventory import AddDeviceMutation from frinx_api.inventory import AddDevicePayload +from frinx_api.inventory import AddStreamInput +from frinx_api.inventory import AddStreamMutation +from frinx_api.inventory import AddStreamPayload +from frinx_api.inventory import Blueprint from frinx_api.inventory import CreateLabelInput from frinx_api.inventory import CreateLabelMutation from frinx_api.inventory import CreateLabelPayload from frinx_api.inventory import DeleteDeviceMutation from frinx_api.inventory import DeleteDevicePayload +from frinx_api.inventory import DeleteStreamMutation +from frinx_api.inventory import DeleteStreamPayload from frinx_api.inventory import Device from frinx_api.inventory import DeviceConnection from frinx_api.inventory import DeviceEdge @@ -30,6 +36,7 @@ from frinx_api.inventory import DeviceSize from frinx_api.inventory import DevicesQuery from frinx_api.inventory import FilterDevicesInput +from frinx_api.inventory import FilterStreamsInput from frinx_api.inventory import InstallDeviceMutation from frinx_api.inventory import InstallDevicePayload from frinx_api.inventory import Label @@ -37,16 +44,24 @@ from frinx_api.inventory import LabelEdge from frinx_api.inventory import LabelsQuery from frinx_api.inventory import PageInfo +from frinx_api.inventory import Stream +from frinx_api.inventory import StreamConnection +from frinx_api.inventory import StreamEdge +from frinx_api.inventory import StreamsQuery from frinx_api.inventory import UninstallDeviceMutation from frinx_api.inventory import UninstallDevicePayload from frinx_api.inventory import UpdateDeviceInput from frinx_api.inventory import UpdateDeviceMutation from frinx_api.inventory import UpdateDevicePayload +from frinx_api.inventory import UpdateStreamInput +from frinx_api.inventory import UpdateStreamMutation +from frinx_api.inventory import UpdateStreamPayload from frinx_api.inventory import Zone from frinx_api.inventory import ZoneEdge from frinx_api.inventory import ZonesConnection from frinx_api.inventory import ZonesQuery from graphql_pydantic_converter.graphql_types import QueryForm +from pydantic import Field from .utils import CursorGroup from .utils import CursorGroups @@ -76,6 +91,39 @@ class DeviceInput(TaskInput): device_type: str | None = None +class StreamWorkerInput(TaskInput): + stream_name: str = Field( + description="Name of the stream.", + ) + device_name: str = Field( + description="Name of the device to which the stream is added.", + ) + blueprint_id: str | None = Field( + description="Blueprint identifier.", + default=None + ) + stream_parameters: DictAny | None = Field( + description="Stream parameters.", + default=None + ) + + +class InventoryWorkerOutput(TaskOutput): + query: str = Field( + description="Constructed GraphQL query.", + ) + variable: DictAny | None = Field( + description="Constructed input GraphQL variables.", + default=None + ) + response_body: Any = Field( + description="Response body.", + ) + response_code: int = Field( + description="Response code.", + ) + + class InventoryService(ServiceWorkersImpl): class InventoryGetDevicesInfo(WorkerImpl): DEVICES: DeviceConnection = DeviceConnection( @@ -117,10 +165,8 @@ class WorkerInput(TaskInput): cursor: str | None = None type: PaginationCursorType | None = None - class WorkerOutput(TaskOutput): - query: str - variable: DictAny | None = None - response: DictAny + class WorkerOutput(InventoryWorkerOutput): + ... def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: devices = DevicesQuery( @@ -164,11 +210,8 @@ class WorkerDefinition(TaskDefinition): class WorkerInput(TaskInput): device_id: str - class WorkerOutput(TaskOutput): - query: str - variable: DictAny | None = None - response_code: int - response_body: Any + class WorkerOutput(InventoryWorkerOutput): + ... def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: self.install_device.id = worker_input.device_id @@ -194,11 +237,8 @@ class WorkerDefinition(TaskDefinition): class WorkerInput(TaskInput): device_id: str - class WorkerOutput(TaskOutput): - query: str - variable: DictAny | None = None - response_code: int - response_body: Any + class WorkerOutput(InventoryWorkerOutput): + ... def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: self.uninstall_device.id = worker_input.device_id @@ -244,11 +284,8 @@ class WorkerDefinition(TaskDefinition): class WorkerInput(TaskInput): device_name: str - class WorkerOutput(TaskOutput): - query: str - variable: DictAny | None = None - response_code: int - response_body: Any + class WorkerOutput(InventoryWorkerOutput): + ... @classmethod def _get_device_id(cls, device_name: str) -> str: @@ -313,10 +350,8 @@ class WorkerDefinition(TaskDefinition): class WorkerInput(TaskInput): device_name: str - class WorkerOutput(TaskOutput): - query: str - variable: DictAny | None = None - response_body: Any + class WorkerOutput(InventoryWorkerOutput): + ... def _get_device_id(self, device_name: str) -> str: query = DevicesQuery(payload=self.DEVICES, filter=FilterDevicesInput(deviceName=device_name)).render() @@ -376,11 +411,8 @@ class WorkerInput(TaskInput): cursor: str | None = None type: PaginationCursorType | None = None - class WorkerOutput(TaskOutput): - query: str - variable: DictAny | None = None - response_code: int - response_body: Any + class WorkerOutput(InventoryWorkerOutput): + ... def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: labels = LabelsQuery( @@ -472,10 +504,8 @@ class WorkerDefinition(TaskDefinition): class WorkerInput(TaskInput): label: str - class WorkerOutput(TaskOutput): - query: str - variable: DictAny | None = None - response_body: Any + class WorkerOutput(InventoryWorkerOutput): + ... def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: self.create_label.input.name = worker_input.label @@ -510,10 +540,8 @@ class WorkerInput(DeviceInput): device_name: str zone_id: str - class WorkerOutput(TaskOutput): - query: str - variable: DictAny | None = None - response_body: Any + class WorkerOutput(InventoryWorkerOutput): + ... def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: self.add_device.input.name = worker_input.device_name @@ -833,23 +861,8 @@ class WorkerInput(TaskInput): Identifier of the removed device. """ - class WorkerOutput(TaskOutput): - query: str - """ - Request GraphQL query. - """ - variable: DictAny | None = None - """ - Request GraphQL variables. - """ - response_body: Any - """ - Response containing the name and identifier of the removed device. - """ - response_code: int - """ - Status of the operation. - """ + class WorkerOutput(InventoryWorkerOutput): + ... def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: self.delete_device.id = worker_input.device_id @@ -884,11 +897,8 @@ class WorkerInput(DeviceInput): device_id: str location_id: str | None = None - class WorkerOutput(TaskOutput): - query: str - variable: DictAny | None = None - response_code: int - response_body: Any + class WorkerOutput(InventoryWorkerOutput): + ... def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: self.update_device.id = worker_input.device_id @@ -900,6 +910,182 @@ def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: response = execute_inventory_query(query=query.query, variables=query.variable) return response_handler(query, response) + class InventoryAddStream(WorkerImpl): + ADD_STREAM: AddStreamPayload = AddStreamPayload(stream=Stream(id=True, createdAt=True)) + + add_stream: AddStreamMutation = AddStreamMutation( + payload=ADD_STREAM, + input=AddStreamInput( + streamName="streamName", + deviceName="deviceName", + streamParameters="{}" + ), + ) + + class ExecutionProperties(TaskExecutionProperties): + exclude_empty_inputs: bool = False + transform_string_to_json_valid: bool = True + + class WorkerDefinition(TaskDefinition): + name: str = "INVENTORY_add_stream" + description: str = "Add stream to inventory database" + labels: ListStr = ["BASICS", "MAIN", "INVENTORY", "STREAMS"] + timeout_seconds: int = 60 + response_timeout_seconds: int = 60 + + class WorkerInput(StreamWorkerInput): + ... + + class WorkerOutput(InventoryWorkerOutput): + ... + + def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: + self.add_stream.input.device_name = worker_input.device_name + self.add_stream.input.stream_name = worker_input.stream_name + if worker_input.stream_parameters: + self.add_stream.input.stream_parameters = json.dumps(worker_input.stream_parameters) + if worker_input.blueprint_id: + self.add_stream.input.blueprint_id = worker_input.blueprint_id + + query = self.add_stream.render() + response = execute_inventory_query(query=query.query, variables=query.variable) + return response_handler(query, response) + + class InventoryUpdateStream(WorkerImpl): + UPDATE_STREAM = UpdateStreamPayload(stream=Stream(streamName=True, deviceName=True, updatedAt=True)) + + update_stream = UpdateStreamMutation( + id="streamId", + payload=UPDATE_STREAM, + input=UpdateStreamInput( + streamName="streamName", + deviceName="deviceName" + ) + ) + + class ExecutionProperties(TaskExecutionProperties): + exclude_empty_inputs: bool = True + transform_string_to_json_valid: bool = True + + class WorkerDefinition(TaskDefinition): + name: str = "INVENTORY_update_stream" + description: str = "Update stream in inventory database" + labels: ListStr = ["BASICS", "MAIN", "INVENTORY", "STREAMS"] + timeout_seconds: int = 60 + response_timeout_seconds: int = 60 + + class WorkerInput(StreamWorkerInput): + stream_id: str = Field( + description="Identifier of the stream.", + ) + + class WorkerOutput(InventoryWorkerOutput): + ... + + def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: + self.update_stream.id = worker_input.stream_id + self.update_stream.input.stream_name = worker_input.stream_name + self.update_stream.input.device_name = worker_input.device_name + if worker_input.blueprint_id: + self.update_stream.input.blueprint_id = worker_input.blueprint_id + if worker_input.stream_parameters: + self.update_stream.input.stream_parameters = json.dumps(worker_input.stream_parameters) + + query = self.update_stream.render() + response = execute_inventory_query(query=query.query, variables=query.variable) + return response_handler(query, response) + + class InventoryDeleteStream(WorkerImpl): + DELETE_STREAM = DeleteStreamPayload(stream=Stream(streamName=True, deviceName=True)) + + delete_stream = DeleteStreamMutation( + payload=DELETE_STREAM, + id="streamId" + ) + + class ExecutionProperties(TaskExecutionProperties): + exclude_empty_inputs: bool = True + transform_string_to_json_valid: bool = True + + class WorkerDefinition(TaskDefinition): + name: str = "INVENTORY_delete_stream" + description: str = "Delete stream from inventory database" + labels: ListStr = ["BASICS", "MAIN", "INVENTORY", "STREAMS"] + timeout_seconds: int = 60 + response_timeout_seconds: int = 60 + + class WorkerInput(TaskInput): + stream_id: str = Field( + description="Identifier of the stream.", + ) + + class WorkerOutput(InventoryWorkerOutput): + ... + + def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: + self.delete_stream.id = worker_input.stream_id + query = self.delete_stream.render() + response = execute_inventory_query(query=query.query, variables=query.variable) + return response_handler(query, response) + + class InventoryGetStreamsInfo(WorkerImpl): + STREAMS: StreamConnection = StreamConnection( + pageInfo=PageInfo(hasNextPage=True, hasPreviousPage=True, startCursor=True, endCursor=True), + edges=StreamEdge( + node=Stream( + id=True, + createdAt=True, + updatedAt=True, + streamName=True, + deviceName=True, + isActive=True, + streamParameters=True, + blueprint=Blueprint( + id=True, + name=True, + ), + ), + cursor=True, + ), + totalCount=True, + ) + + class ExecutionProperties(TaskExecutionProperties): + exclude_empty_inputs: bool = True + + class WorkerDefinition(TaskDefinition): + name: str = "INVENTORY_get_streams_info" + description: str = "Read streams from inventory database" + labels: ListStr = ["BASIC", "INVENTORY", "STREAMS"] + + class WorkerInput(TaskInput): + stream_name: str | None = None + size: int | None = None + cursor: str | None = None + type: PaginationCursorType | None = None + + + class WorkerOutput(InventoryWorkerOutput): + ... + + def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: + streams = StreamsQuery( + payload=self.STREAMS, + filter=FilterStreamsInput(streamName=worker_input.stream_name or None), + ) + + match worker_input.type: + case PaginationCursorType.AFTER: + streams.first = worker_input.size + streams.after = worker_input.cursor + case PaginationCursorType.BEFORE: + streams.last = worker_input.size + streams.before = worker_input.cursor + + query = streams.render() + response = execute_inventory_query(query=query.query, variables=query.variable) + return response_handler(query, response) + @staticmethod def _set_device_input_fields( query_input: UpdateDeviceInput | AddDeviceInput, diff --git a/inventory/python/pyproject.toml b/inventory/python/pyproject.toml index a0cc57e..88fe457 100644 --- a/inventory/python/pyproject.toml +++ b/inventory/python/pyproject.toml @@ -19,7 +19,7 @@ packages = [{ include = "frinx_worker" }] name = "frinx-inventory-worker" description = "Conductor worker for Frinx Device Inventory" authors = ["Jozef Volak "] -version = "1.2.2" +version = "1.3.0" readme = ["README.md", "CHANGELOG.md", "RELEASE.md"] keywords = ["frinx-machine", "device inventory", "worker"] license = "Apache 2.0"