Skip to content

Commit

Permalink
feat: Tidy per-scanner calibration table
Browse files Browse the repository at this point in the history
- Cleaned up calibration table display
- Linting and ruff fixes
  • Loading branch information
agittins committed Dec 1, 2024
1 parent 9a5e567 commit d6516d2
Showing 1 changed file with 66 additions and 63 deletions.
129 changes: 66 additions & 63 deletions custom_components/bermuda/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
DEFAULT_REF_POWER,
DEFAULT_SMOOTHING_SAMPLES,
DEFAULT_UPDATE_INTERVAL,
DISTANCE_INFINITE,
DOMAIN,
DOMAIN_PRIVATE_BLE_DEVICE,
NAME,
Expand Down Expand Up @@ -175,7 +176,7 @@ async def async_step_init(self, user_input=None): # pylint: disable=unused-argu

scanner_table += (
f"| {scanner.get("name", "NAME_ERR")}| [{scanner.get("address", "ADDR_ERR")}]"
f"| {status} {int(scanner.get("last_stamp_age")):d} seconds ago.|\n"
f"| {status} {int(scanner.get("last_stamp_age", DISTANCE_INFINITE)):d} seconds ago.|\n"
)
messages["status"] += scanner_table

Expand Down Expand Up @@ -408,42 +409,43 @@ async def async_step_calibration1_global(self, user_input=None):
description_placeholders=_ugly_token_hack
| {"suffix": "After you click Submit, the new distances will be shown here."},
)
results_str = ""
device = self._get_bermuda_device_from_registry(user_input[CONF_DEVICES])
if device is not None:
if user_input[CONF_SCANNERS] in device.scanners:
scanner = device.scanners[user_input[CONF_SCANNERS]]
else:
return self.async_show_form(
step_id="calibration_global",
errors={"err_scanner_no_record": "The selected scanner hasn't (yet) seen this device."},
data_schema=vol.Schema(data_schema),
description_placeholders=_ugly_token_hack
| {"suffix": "After you click Submit, the new distances will be shown here."},
)

if user_input[CONF_SCANNERS] in device.scanners:
scanner = device.scanners[user_input[CONF_SCANNERS]]
else:
return self.async_show_form(
step_id="calibration_global",
errors={"err_scanner_no_record": "The selected scanner hasn't (yet) seen this device."},
data_schema=vol.Schema(data_schema),
description_placeholders=_ugly_token_hack
| {"suffix": "After you click Submit, the new distances will be shown here."},
)

distances = [
rssi_to_metres(historical_rssi, self._last_ref_power, self._last_attenuation)
for historical_rssi in scanner.hist_rssi
]

# Build a markdown table showing distance and rssi history for the
# selected device / scanner combination
results_str = f"| {device.name} |"
# Limit the number of columns to what's available up to a max of 5.
cols = min(5, len(distances), len(scanner.hist_rssi))
for i in range(cols):
results_str += f" {i} |"
results_str += "\n|---|"
for i in range(cols): # noqa for unused var i
results_str += "---:|"

results_str += "\n| Estimate (m) |"
for i in range(cols):
results_str += f" `{distances[i]:>5.2f}`|"
results_str += "\n| RSSI Actual |"
for i in range(cols):
results_str += f" `{scanner.hist_rssi[i]:>5}`|"
results_str += "\n"
distances = [
rssi_to_metres(historical_rssi, self._last_ref_power, self._last_attenuation)
for historical_rssi in scanner.hist_rssi
]

# Build a markdown table showing distance and rssi history for the
# selected device / scanner combination
results_str = f"| {device.name} |"
# Limit the number of columns to what's available up to a max of 5.
cols = min(5, len(distances), len(scanner.hist_rssi))
for i in range(cols):
results_str += f" {i} |"
results_str += "\n|---|"
for i in range(cols): # noqa for unused var i
results_str += "---:|"

results_str += "\n| Estimate (m) |"
for i in range(cols):
results_str += f" `{distances[i]:>5.2f}`|"
results_str += "\n| RSSI Actual |"
for i in range(cols):
results_str += f" `{scanner.hist_rssi[i]:>5}`|"
results_str += "\n"

return self.async_show_form(
step_id="calibration1_global",
Expand Down Expand Up @@ -516,34 +518,35 @@ async def async_step_calibration2_scanners(self, user_input=None):
data_schema=vol.Schema(data_schema),
description_placeholders={"suffix": "After you click Submit, the new distances will be shown here."},
)
device = self._get_bermuda_device_from_registry(self._last_device)
results = {}
# Gather new estimates for distances using rssi hist and the new offset.
for scanner in self.coordinator.scanner_list:
scanner_name = self.coordinator.devices[scanner].name
cur_offset = self._last_scanner_info.get(scanner_name, 0)
if scanner in device.scanners:
results[scanner_name] = [
rssi_to_metres(
historical_rssi + cur_offset,
self.options.get(CONF_REF_POWER, DEFAULT_REF_POWER),
self.options.get(CONF_ATTENUATION, DEFAULT_ATTENUATION),
)
for historical_rssi in device.scanners[scanner].hist_rssi
]
# Format the results for display (HA has full markdown support!)
results_str = "| Scanner | Measurements (new...old)|\n|---|---|"
for scanner_name, distances in results.items():
results_str += f"\n|{scanner_name}|<pre>"
i = 0
for distance in distances:
# limit how many columns we'll dump
i += 1
if i > 5:
continue
# We round to 2 places (1cm) and pad to fit nn.nn
results_str += f" {distance:>6.2f}"
results_str += "</pre>|"
if isinstance(self._last_device, str):
device = self._get_bermuda_device_from_registry(self._last_device)
results_str = ""
if device is not None and isinstance(self._last_scanner_info, dict):
results = {}
# Gather new estimates for distances using rssi hist and the new offset.
for scanner in self.coordinator.scanner_list:
scanner_name = self.coordinator.devices[scanner].name
cur_offset = self._last_scanner_info.get(scanner_name, 0)
if scanner in device.scanners:
results[scanner_name] = [
rssi_to_metres(
historical_rssi + cur_offset,
self.options.get(CONF_REF_POWER, DEFAULT_REF_POWER),
self.options.get(CONF_ATTENUATION, DEFAULT_ATTENUATION),
)
for historical_rssi in device.scanners[scanner].hist_rssi
]
# Format the results for display (HA has full markdown support!)
results_str = "| Scanner | 0 | 1 | 2 | 3 | 4 |\n|---|---:|---:|---:|---:|---:|"
for scanner_name, distances in results.items():
results_str += f"\n|{scanner_name}|"
for i in range(5):
# We round to 2 places (1cm) and pad to fit nn.nn
try:
results_str += f" `{distances[i]:>6.2f}`|"
except IndexError:
results_str += "`-`|"
results_str += "\n\n"

return self.async_show_form(
step_id="calibration2_scanners",
Expand Down

0 comments on commit d6516d2

Please sign in to comment.