Skip to content

Commit

Permalink
Fixes for 2024.12.0 issues #298 from sca075/refactoring_camera
Browse files Browse the repository at this point in the history
Fix issue #294 #297 and probably #295
  • Loading branch information
sca075 authored Dec 7, 2024
2 parents f79f589 + b8b0fea commit 9cb90da
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 77 deletions.
90 changes: 52 additions & 38 deletions custom_components/mqtt_vacuum_camera/camera.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
"""
Camera
Version: v2024.12.0
Version: v2024.12.1
"""

from __future__ import annotations

import asyncio
from asyncio import gather, get_event_loop
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from io import BytesIO
Expand Down Expand Up @@ -248,8 +247,9 @@ async def handle_obstacle_view(self, event):

if self._shared.camera_mode == CameraModes.OBSTACLE_VIEW:
self._shared.camera_mode = CameraModes.MAP_VIEW
_LOGGER.debug(f"Camera Mode Change to {self._shared.camera_mode}")
self._should_poll = True
return
return await self.async_update()

if (
self._shared.obstacles_data
Expand All @@ -258,6 +258,7 @@ async def handle_obstacle_view(self, event):
_LOGGER.debug(f"Received event: {event.event_type}, Data: {event.data}")
if event.data.get("entity_id") == self.entity_id:
self._shared.camera_mode = CameraModes.OBSTACLE_DOWNLOAD
_LOGGER.debug(f"Camera Mode Change to {self._shared.camera_mode}")
self._should_poll = False # Turn off polling
coordinates = event.data.get("coordinates")
if coordinates:
Expand Down Expand Up @@ -288,11 +289,14 @@ async def handle_obstacle_view(self, event):
)
self._should_poll = True # Turn on polling
self._shared.camera_mode = CameraModes.MAP_VIEW
_LOGGER.debug(
f"Camera Mode Change to {self._shared.camera_mode}"
)
return None
if temp_image is not None:
try:
# Open the downloaded image with PIL
pil_img = Image.open(temp_image)
pil_img = await self.async_open_image(temp_image)

# Resize the image if resize_to is provided
pil_img.thumbnail((self._image_w, self._image_h))
Expand All @@ -304,15 +308,24 @@ async def handle_obstacle_view(self, event):
f"{self._file_name}: Error processing image: {e}"
)
self._shared.camera_mode = CameraModes.MAP_VIEW
_LOGGER.debug(
f"Camera Mode Change to {self._shared.camera_mode}"
)
self._should_poll = True # Turn on polling
return None

self.Image = await self.hass.async_create_task(
self.run_async_pil_to_bytes(pil_img)
)
self._shared.camera_mode = CameraModes.OBSTACLE_VIEW
_LOGGER.debug(
f"Camera Mode Change to {self._shared.camera_mode}"
)
else:
self._shared.camera_mode = CameraModes.MAP_VIEW
_LOGGER.debug(
f"Camera Mode Change to {self._shared.camera_mode}"
)
self._should_poll = True # Turn on polling
else:
_LOGGER.debug("No nearby obstacle found.")
Expand All @@ -326,9 +339,9 @@ async def handle_obstacle_view(self, event):
async def _async_find_nearest_obstacle(x, y, obstacles):
"""Find the nearest obstacle to the given coordinates."""
nearest_obstacle = None
min_distance = float("inf") # Start with a very large distance
min_distance = 500 # Start with a very large distance
_LOGGER.debug(
f"Finding the nearest {min_distance} obstacle to coordinates: {x}, {y}"
f"Finding in the nearest {min_distance} pixels obstacle to coordinates: {x}, {y}"
)

for obstacle in obstacles:
Expand All @@ -345,10 +358,26 @@ async def _async_find_nearest_obstacle(x, y, obstacles):

return nearest_obstacle

async def async_open_image(self, file_path) -> Image.Image:
"""
Asynchronously open an image file using a thread pool.
Args:
file_path (str): Path to the image file.
Returns:
Image.Image: Opened PIL image.
"""
executor = ThreadPoolExecutor(
max_workers=1, thread_name_prefix=f"{self._file_name}_camera"
)
loop = asyncio.get_running_loop()
pil_img = await loop.run_in_executor(executor, Image.open, file_path)
return pil_img

@staticmethod
async def download_image(url: str, storage_path: str, filename: str):
"""
Asynchronously download an image using threading to avoid blocking.
Asynchronously download an image without blocking.
Args:
url (str): The URL to download the image from.
Expand All @@ -358,38 +387,23 @@ async def download_image(url: str, storage_path: str, filename: str):
Returns:
str: The full path to the saved image or None if the download fails.
"""
# Ensure the storage path exists
os.makedirs(storage_path, exist_ok=True)

obstacle_file = os.path.join(storage_path, filename)

async def blocking_download():
"""Run the blocking download in a separate thread."""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
with open(obstacle_file, "wb") as f:
f.write(await response.read())
_LOGGER.debug(
f"Image downloaded successfully: {obstacle_file}"
)
return obstacle_file
else:
_LOGGER.warning(
f"Failed to download image: {response.status}"
)
return None
except Exception as e:
_LOGGER.error(f"Error downloading image: {e}")
return None

executor = ThreadPoolExecutor(max_workers=3) # Limit to 3 workers

# Run the blocking I/O in a thread
return await asyncio.get_running_loop().run_in_executor(
executor, asyncio.run, blocking_download()
)
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
with open(obstacle_file, "wb") as f:
f.write(await response.read())
_LOGGER.debug(f"Image downloaded successfully: {obstacle_file}")
return obstacle_file
else:
_LOGGER.warning(f"Failed to download image: {response.status}")
return None
except Exception as e:
_LOGGER.error(f"Error downloading image: {e}")
return None

@property
def should_poll(self) -> bool:
Expand Down Expand Up @@ -482,7 +496,7 @@ async def async_update(self):
f"{self._file_name}: Camera image data update available: {process_data}"
)
try:
parsed_json, is_a_test = await self._process_parsed_json()
parsed_json, is_a_test = await self._process_parsed_json(True)
except ValueError:
self._vac_json_available = "Error"
pass
Expand Down Expand Up @@ -666,7 +680,7 @@ async def run_async_pil_to_bytes(self, pil_img):
pil_img_list = [pil_img for _ in range(num_processes)]
loop = get_event_loop()

with concurrent.futures.ThreadPoolExecutor(
with ThreadPoolExecutor(
max_workers=1, thread_name_prefix=f"{self._file_name}_camera"
) as executor:
tasks = [
Expand Down
10 changes: 8 additions & 2 deletions custom_components/mqtt_vacuum_camera/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,20 @@ def compose_obstacle_links(vacuum_host_ip: str, obstacles: list) -> list:
Compose JSON with obstacle details including the image link.
"""
obstacle_links = []
if not obstacles or not vacuum_host_ip:
_LOGGER.debug(
f"Obstacle links: no obstacles: "
f"{obstacles} and / or ip: {vacuum_host_ip} to link."
)
return None

for obstacle in obstacles:
# Extract obstacle details
label = obstacle.get("label", "")
points = obstacle.get("points", {})
image_id = obstacle.get("id", "None")

if label and points and image_id:
if label and points and image_id and vacuum_host_ip:
# Append formatted obstacle data
if image_id != "None":
# Compose the link
Expand All @@ -246,6 +252,6 @@ def compose_obstacle_links(vacuum_host_ip: str, obstacles: list) -> list:
}
)

_LOGGER.debug(f"Obstacle links: {obstacle_links}")
_LOGGER.debug(f"Obstacle links: linked data complete.")

return obstacle_links
71 changes: 39 additions & 32 deletions custom_components/mqtt_vacuum_camera/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,35 +143,42 @@ async def async_update_sensor_data(self, sensor_data):
"""
Update the sensor data format before sending to the sensors.
"""

if sensor_data:
# Assume sensor_data is a dictionary or transform it into the expected format
battery_level = await self.connector.get_battery_level()
vacuum_state = await self.connector.get_vacuum_status()
vacuum_room = self.shared.current_room
if not vacuum_room:
vacuum_room = {"in_room": "Unsupported"}
last_run_stats = sensor_data.get("last_run_stats", {})
last_loaded_map = sensor_data.get("last_loaded_map", {})
formatted_data = {
"mainBrush": sensor_data.get("mainBrush", 0),
"sideBrush": sensor_data.get("sideBrush", 0),
"filter": sensor_data.get("filter", 0),
"currentCleanTime": sensor_data.get("currentCleanTime", 0),
"currentCleanArea": sensor_data.get("currentCleanArea", 0),
"cleanTime": sensor_data.get("cleanTime", 0),
"cleanArea": sensor_data.get("cleanArea", 0),
"cleanCount": sensor_data.get("cleanCount", 0),
"battery": battery_level,
"state": vacuum_state,
"last_run_start": last_run_stats.get("startTime", 0),
"last_run_end": last_run_stats.get("endTime", 0),
"last_run_duration": last_run_stats.get("duration", 0),
"last_run_area": last_run_stats.get("area", 0),
"last_bin_out": sensor_data.get("last_bin_out", 0),
"last_bin_full": sensor_data.get("last_bin_full", 0),
"last_loaded_map": last_loaded_map.get("name", "NoMap"),
"robot_in_room": vacuum_room.get("in_room", "Unsupported"),
}
return formatted_data
return SENSOR_NO_DATA
try:
if sensor_data:
# Assume sensor_data is a dictionary or transform it into the expected format
battery_level = await self.connector.get_battery_level()
vacuum_state = await self.connector.get_vacuum_status()
vacuum_room = self.shared.current_room
last_run_stats = sensor_data.get("last_run_stats", {})
last_loaded_map = sensor_data.get("last_loaded_map", {})

if not vacuum_room:
vacuum_room = {"in_room": "Unsupported"}
if last_loaded_map == {}:
last_loaded_map = {"name", "Default"}

formatted_data = {
"mainBrush": sensor_data.get("mainBrush", 0),
"sideBrush": sensor_data.get("sideBrush", 0),
"filter": sensor_data.get("filter", 0),
"currentCleanTime": sensor_data.get("currentCleanTime", 0),
"currentCleanArea": sensor_data.get("currentCleanArea", 0),
"cleanTime": sensor_data.get("cleanTime", 0),
"cleanArea": sensor_data.get("cleanArea", 0),
"cleanCount": sensor_data.get("cleanCount", 0),
"battery": battery_level,
"state": vacuum_state,
"last_run_start": last_run_stats.get("startTime", 0),
"last_run_end": last_run_stats.get("endTime", 0),
"last_run_duration": last_run_stats.get("duration", 0),
"last_run_area": last_run_stats.get("area", 0),
"last_bin_out": sensor_data.get("last_bin_out", 0),
"last_bin_full": sensor_data.get("last_bin_full", 0),
"last_loaded_map": last_loaded_map.get("name", "Default"),
"robot_in_room": vacuum_room.get("in_room"),
}
return formatted_data
return SENSOR_NO_DATA
except Exception as err:
_LOGGER.warning(f"Error processing sensor data: {err}")
return SENSOR_NO_DATA
2 changes: 1 addition & 1 deletion custom_components/mqtt_vacuum_camera/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
"iot_class": "local_polling",
"issue_tracker": "https://github.com/sca075/mqtt_vacuum_camera/issues",
"requirements": ["pillow>=10.3.0,<=11.0.0", "numpy"],
"version": "2024.12.0"
"version": "2024.12.1"
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ async def async_message_received(self, msg) -> None:
# When IPV4 and IPV6 are available, use IPV4
if vacuum_host_ip.split(",").__len__() > 1:
self._shared.vacuum_ips = vacuum_host_ip.split(",")[0]
else:
# Use IPV4 when no IPV6 without split
self._shared.vacuum_ips = vacuum_host_ip
_LOGGER.debug(f"Vacuum IPs: {self._shared.vacuum_ips}")

async def async_subscribe_to_topics(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,18 @@ async def async_draw_obstacle(
obstacle_objects.append(obstacle_obj)

# Store obstacle data in shared data
self.img_h.shared.obstacles_data = compose_obstacle_links(
self.img_h.shared.vacuum_ips, obstacle_objects
)
if self.img_h.shared.vacuum_ips:
self.img_h.shared.obstacles_data = compose_obstacle_links(
self.img_h.shared.vacuum_ips, obstacle_objects
)
elif self.img_h.shared.vacuum_api: # Fall back to API usage if no IP.
self.img_h.shared.obstacles_data = compose_obstacle_links(
self.img_h.shared.vacuum_api.split("http://")[1], obstacle_objects
)

# Draw obstacles on the map
if obstacle_objects:
_LOGGER.debug(f"{self.file_name} All obstacle detected: {obstacle_objects}")
_LOGGER.debug(f"{self.file_name} Obstacle detected.")
self.img_h.draw.draw_obstacles(np_array, obstacle_objects, color_no_go)

return np_array
Expand Down

0 comments on commit 9cb90da

Please sign in to comment.