Skip to content

Commit

Permalink
feat: add support for A6/A7
Browse files Browse the repository at this point in the history
  • Loading branch information
Lash-L committed Jul 5, 2024
1 parent ed3d403 commit b9e6260
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 9 deletions.
41 changes: 33 additions & 8 deletions src/anova_wifi/web_socket_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
@dataclass
class APCUpdateBinary:
cooking: bool
preheating: bool
maintaining: bool
preheating: bool | None = None
maintaining: bool | None = None
device_safe: bool | None = None
water_leak: bool | None = None
water_level_critical: bool | None = None
Expand Down Expand Up @@ -121,9 +121,9 @@ class WifiPinInfo:

@dataclass
class WifiSystemInfo:
class_name: str | None
firmware_version: str
type: str
class_name: str | None = None
type: str | None = None


@dataclass
Expand Down Expand Up @@ -255,7 +255,7 @@ def build_wifi_cooker_state_body(apc_response: dict[str, Any]) -> WifiCookerStat
system_info = WifiSystemInfo(
firmware_version=system_info_json["firmware-version"],
class_name=system_info_json.get("class"),
type=system_info_json["type"],
type=system_info_json.get("type"),
)
system_info_3220_json: dict[str, str] | None = apc_response.get("system-info-3220")
if system_info_3220_json:
Expand Down Expand Up @@ -312,15 +312,18 @@ def build_a3_payload(apc_response: dict[str, Any]) -> APCUpdate:
# is_speaker_on = apc_response.get("isSpeakerOn")
# is_alarm_active = apc_response.get("isAlarmActive")
# current_job_id = apc_response.get("currentJobID")
current_job: dict[str, Any] = apc_response["currentJob"]
current_job = apc_response.get("currentJob")
# is_keeping_warm = apc_response.get("isKeepingWarming")
# is_checking_temperature_for_ice_bath = apc_response.get(
# "isCheckingTemperatureForIceBath"
# )
# is_monitoring_ice_bath = apc_response.get("isMonitoringIcebath")
# is_connected = apc_response.get("isConnected")
job_stage: str = current_job["jobStage"]
status = AnovaA3State(job_stage)
if current_job is not None:
job_stage: str = current_job["jobStage"]
status = AnovaA3State(job_stage)
else:
status = AnovaA3State.no_state
sensors = APCUpdateSensor(
a3_state=status.name,
target_temperature=float(target_temperature),
Expand All @@ -339,6 +342,28 @@ def build_a3_payload(apc_response: dict[str, Any]) -> APCUpdate:
return APCUpdate(binary_sensors, sensors)


def build_a6_a7_payload(apc_response: dict[str, Any]) -> APCUpdate:
system_info = apc_response["systemInfo"]
firmware_version = system_info["firmwareVersion"]
mode = apc_response["state"]["mode"]
nodes = apc_response["nodes"]
sensors = APCUpdateSensor(
firmware_version=firmware_version,
mode=mode,
water_temperature=float(nodes["waterTemperatureSensor"]["current"]["celsius"]),
target_temperature=float(
nodes["waterTemperatureSensor"]["setpoint"]["celsius"]
),
cook_time=int(nodes["timer"]["initial"]),
)
binary_sensors = APCUpdateBinary(
cooking=bool(mode == "cook"),
water_level_low=bool(nodes["lowWater"]["warning"]),
water_level_critical=bool(nodes["lowWater"]["empty"]),
)
return APCUpdate(binary_sensors, sensors)


@dataclass
class APCWifiDevice:
cooker_id: str
Expand Down
90 changes: 90 additions & 0 deletions tests/example_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,93 @@
},
},
}

A7_COOKING: dict[str, Any] = {
"command": "EVENT_APC_STATE",
"payload": {
"cookerId": "a7_id",
"type": "a7",
"state": {
"cook": {
"activeStageIndex": 0,
"stageTransitionPendingUserAction": False,
"activeStageId": "6b70634f-ed6b-9154-6b30-05e73cec7a4e",
"stages": [
{
"do": {
"timer": {
"entry": {
"conditions": {
"and": {
"nodes.waterTemperatureSensor.current.celsius": {
">=": 54.17,
"<=": 54.77,
}
}
}
},
"initial": 0,
},
"type": "cook",
"waterTemperatureSensor": {"setpoint": {"celsius": 54.47}},
},
"id": "6b70634f-ed6b-9154-6b30-05e73cec7a4e",
"entry": {
"conditions": {
"and": {
"nodes.waterTemperatureSensor.current.celsius": {
"<=": 54.77,
">=": 54.17,
}
}
}
},
"exit": {"conditions": {}},
"title": "Cook Stage",
}
],
"originSource": "hardware",
"activeStageStartedTimestamp": "2024-07-05T21:48:29Z",
"startedTimestamp": "2024-07-05T21:48:29Z",
"activeStageMode": "running",
"cookId": "a098de4a-8714-2dd8-1bdc-d067008f2d4f",
},
"nodes": {
"lowWater": {"empty": False, "warning": False},
"timer": {
"mode": "running",
"initial": 1200,
"startedAtTimestamp": "2024-07-05T21:49:10Z",
},
"waterTemperatureSensor": {
"enabled": True,
"setpoint": {"celsius": 54.46},
"current": {"celsius": 55.27},
},
},
"state": {
"temperatureUnit": "F",
"resumeAfterPowerInterruption": False,
"processedCommandIds": [
"876cfbc9-03f1-400b-89ec-ef5391270846",
"e2e3ce06-45b4-4485-b4fa-9211c082b90d",
],
"mode": "cook",
},
"systemInfo": {
"firmwareVersion": "01.01.25",
"deviceId": "5yMOd0D2lasOE2U8SkcIcS",
"lastDisconnectedTimestamp": "2024-07-05T21:47:59Z",
"triacsFailed": False,
"releaseTrack": "production",
"hardwareVersion": "1.0.0",
"lastConnectedTimestamp": "2024-07-05T21:47:59Z",
"online": True,
"firmwareUpdatedTimestamp": "2024-06-07T18:56:03Z",
},
"metadata": {},
"updatedTimestamp": "2024-07-05T21:49:56Z",
"version": 1,
},
},
}
28 changes: 27 additions & 1 deletion tests/test_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@
AnovaA3State,
AnovaState,
build_a3_payload,
build_a6_a7_payload,
build_wifi_cooker_state_body,
)
from tests.example_data import A3_MESSAGE, A4_MESSAGE
from tests.example_data import (
A3_MESSAGE,
A4_MESSAGE,
A6_MESSAGE,
A7_COOKING,
A7_MESSAGE,
)


def test_a3_payload():
Expand All @@ -21,3 +28,22 @@ def test_a4_payload():
assert resp.sensor.target_temperature == 66.11
assert resp.sensor.state == AnovaState.cooking.name
assert resp.sensor.cook_time_remaining == 0


def test_a7_payload():
resp = build_a6_a7_payload(A7_MESSAGE["payload"]["state"])
assert resp.sensor.mode == "idle"
assert resp.sensor.cook_time == 36000


def test_a7_cooking():
resp = build_a6_a7_payload(A7_COOKING["payload"]["state"])
assert resp.sensor.mode == "cook"
assert resp.sensor.cook_time == 1200


def test_a6_payload():
resp = build_a6_a7_payload(A6_MESSAGE["payload"]["state"])
assert resp.sensor.mode == "idle"
assert resp.sensor.cook_time == 7200
assert resp.sensor.target_temperature == 57.2

0 comments on commit b9e6260

Please sign in to comment.