From b9e62607d933f9da406dfb40c4e47521af40429c Mon Sep 17 00:00:00 2001 From: Luke Date: Fri, 5 Jul 2024 18:09:38 -0400 Subject: [PATCH] feat: add support for A6/A7 --- src/anova_wifi/web_socket_containers.py | 41 ++++++++--- tests/example_data.py | 90 +++++++++++++++++++++++++ tests/test_containers.py | 28 +++++++- 3 files changed, 150 insertions(+), 9 deletions(-) diff --git a/src/anova_wifi/web_socket_containers.py b/src/anova_wifi/web_socket_containers.py index 20a2269..ab9c213 100644 --- a/src/anova_wifi/web_socket_containers.py +++ b/src/anova_wifi/web_socket_containers.py @@ -9,8 +9,8 @@ @dataclass class APCUpdateBinary: cooking: bool - preheating: bool - maintaining: bool + preheating: bool | None = None + maintaining: bool | None = None device_safe: bool | None = None water_leak: bool | None = None water_level_critical: bool | None = None @@ -121,9 +121,9 @@ class WifiPinInfo: @dataclass class WifiSystemInfo: - class_name: str | None firmware_version: str - type: str + class_name: str | None = None + type: str | None = None @dataclass @@ -255,7 +255,7 @@ def build_wifi_cooker_state_body(apc_response: dict[str, Any]) -> WifiCookerStat system_info = WifiSystemInfo( firmware_version=system_info_json["firmware-version"], class_name=system_info_json.get("class"), - type=system_info_json["type"], + type=system_info_json.get("type"), ) system_info_3220_json: dict[str, str] | None = apc_response.get("system-info-3220") if system_info_3220_json: @@ -312,15 +312,18 @@ def build_a3_payload(apc_response: dict[str, Any]) -> APCUpdate: # is_speaker_on = apc_response.get("isSpeakerOn") # is_alarm_active = apc_response.get("isAlarmActive") # current_job_id = apc_response.get("currentJobID") - current_job: dict[str, Any] = apc_response["currentJob"] + current_job = apc_response.get("currentJob") # is_keeping_warm = apc_response.get("isKeepingWarming") # is_checking_temperature_for_ice_bath = apc_response.get( # "isCheckingTemperatureForIceBath" # ) # is_monitoring_ice_bath = apc_response.get("isMonitoringIcebath") # is_connected = apc_response.get("isConnected") - job_stage: str = current_job["jobStage"] - status = AnovaA3State(job_stage) + if current_job is not None: + job_stage: str = current_job["jobStage"] + status = AnovaA3State(job_stage) + else: + status = AnovaA3State.no_state sensors = APCUpdateSensor( a3_state=status.name, target_temperature=float(target_temperature), @@ -339,6 +342,28 @@ def build_a3_payload(apc_response: dict[str, Any]) -> APCUpdate: return APCUpdate(binary_sensors, sensors) +def build_a6_a7_payload(apc_response: dict[str, Any]) -> APCUpdate: + system_info = apc_response["systemInfo"] + firmware_version = system_info["firmwareVersion"] + mode = apc_response["state"]["mode"] + nodes = apc_response["nodes"] + sensors = APCUpdateSensor( + firmware_version=firmware_version, + mode=mode, + water_temperature=float(nodes["waterTemperatureSensor"]["current"]["celsius"]), + target_temperature=float( + nodes["waterTemperatureSensor"]["setpoint"]["celsius"] + ), + cook_time=int(nodes["timer"]["initial"]), + ) + binary_sensors = APCUpdateBinary( + cooking=bool(mode == "cook"), + water_level_low=bool(nodes["lowWater"]["warning"]), + water_level_critical=bool(nodes["lowWater"]["empty"]), + ) + return APCUpdate(binary_sensors, sensors) + + @dataclass class APCWifiDevice: cooker_id: str diff --git a/tests/example_data.py b/tests/example_data.py index e619b92..05b46a7 100644 --- a/tests/example_data.py +++ b/tests/example_data.py @@ -188,3 +188,93 @@ }, }, } + +A7_COOKING: dict[str, Any] = { + "command": "EVENT_APC_STATE", + "payload": { + "cookerId": "a7_id", + "type": "a7", + "state": { + "cook": { + "activeStageIndex": 0, + "stageTransitionPendingUserAction": False, + "activeStageId": "6b70634f-ed6b-9154-6b30-05e73cec7a4e", + "stages": [ + { + "do": { + "timer": { + "entry": { + "conditions": { + "and": { + "nodes.waterTemperatureSensor.current.celsius": { + ">=": 54.17, + "<=": 54.77, + } + } + } + }, + "initial": 0, + }, + "type": "cook", + "waterTemperatureSensor": {"setpoint": {"celsius": 54.47}}, + }, + "id": "6b70634f-ed6b-9154-6b30-05e73cec7a4e", + "entry": { + "conditions": { + "and": { + "nodes.waterTemperatureSensor.current.celsius": { + "<=": 54.77, + ">=": 54.17, + } + } + } + }, + "exit": {"conditions": {}}, + "title": "Cook Stage", + } + ], + "originSource": "hardware", + "activeStageStartedTimestamp": "2024-07-05T21:48:29Z", + "startedTimestamp": "2024-07-05T21:48:29Z", + "activeStageMode": "running", + "cookId": "a098de4a-8714-2dd8-1bdc-d067008f2d4f", + }, + "nodes": { + "lowWater": {"empty": False, "warning": False}, + "timer": { + "mode": "running", + "initial": 1200, + "startedAtTimestamp": "2024-07-05T21:49:10Z", + }, + "waterTemperatureSensor": { + "enabled": True, + "setpoint": {"celsius": 54.46}, + "current": {"celsius": 55.27}, + }, + }, + "state": { + "temperatureUnit": "F", + "resumeAfterPowerInterruption": False, + "processedCommandIds": [ + "876cfbc9-03f1-400b-89ec-ef5391270846", + "e2e3ce06-45b4-4485-b4fa-9211c082b90d", + ], + "mode": "cook", + }, + "systemInfo": { + "firmwareVersion": "01.01.25", + "deviceId": "5yMOd0D2lasOE2U8SkcIcS", + "lastDisconnectedTimestamp": "2024-07-05T21:47:59Z", + "triacsFailed": False, + "releaseTrack": "production", + "hardwareVersion": "1.0.0", + "lastConnectedTimestamp": "2024-07-05T21:47:59Z", + "online": True, + "firmwareUpdatedTimestamp": "2024-06-07T18:56:03Z", + }, + "metadata": {}, + "updatedTimestamp": "2024-07-05T21:49:56Z", + "version": 1, + }, + }, +} diff --git a/tests/test_containers.py b/tests/test_containers.py index 38a1bf6..1e84aa4 100644 --- a/tests/test_containers.py +++ b/tests/test_containers.py @@ -2,9 +2,16 @@ AnovaA3State, AnovaState, build_a3_payload, + build_a6_a7_payload, build_wifi_cooker_state_body, ) -from tests.example_data import A3_MESSAGE, A4_MESSAGE +from tests.example_data import ( + A3_MESSAGE, + A4_MESSAGE, + A6_MESSAGE, + A7_COOKING, + A7_MESSAGE, +) def test_a3_payload(): @@ -21,3 +28,22 @@ def test_a4_payload(): assert resp.sensor.target_temperature == 66.11 assert resp.sensor.state == AnovaState.cooking.name assert resp.sensor.cook_time_remaining == 0 + + +def test_a7_payload(): + resp = build_a6_a7_payload(A7_MESSAGE["payload"]["state"]) + assert resp.sensor.mode == "idle" + assert resp.sensor.cook_time == 36000 + + +def test_a7_cooking(): + resp = build_a6_a7_payload(A7_COOKING["payload"]["state"]) + assert resp.sensor.mode == "cook" + assert resp.sensor.cook_time == 1200 + + +def test_a6_payload(): + resp = build_a6_a7_payload(A6_MESSAGE["payload"]["state"]) + assert resp.sensor.mode == "idle" + assert resp.sensor.cook_time == 7200 + assert resp.sensor.target_temperature == 57.2