From faa13c53e86968ff7338e6c481e6429d2d853281 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sat, 21 Dec 2024 19:09:37 +0800 Subject: [PATCH 01/12] Remove tev & fix some type issues --- custom_components/xiaomi_home/miot/common.py | 3 + custom_components/xiaomi_home/miot/miot_ev.py | 324 ------------------ .../xiaomi_home/miot/miot_lan.py | 261 +++++++------- .../xiaomi_home/miot/miot_mdns.py | 6 +- .../xiaomi_home/miot/miot_network.py | 6 +- test/test_ev.py | 55 --- 6 files changed, 145 insertions(+), 510 deletions(-) delete mode 100644 custom_components/xiaomi_home/miot/miot_ev.py delete mode 100644 test/test_ev.py diff --git a/custom_components/xiaomi_home/miot/common.py b/custom_components/xiaomi_home/miot/common.py index 0b03f96..223dbd0 100644 --- a/custom_components/xiaomi_home/miot/common.py +++ b/custom_components/xiaomi_home/miot/common.py @@ -83,6 +83,9 @@ def randomize_int(value: int, ratio: float) -> int: """Randomize an integer value.""" return int(value * (1 - ratio + random.random()*2*ratio)) +def randomize_float(value: float, ratio: float) -> float: + """Randomize a float value.""" + return value * (1 - ratio + random.random()*2*ratio) class MIoTMatcher(MQTTMatcher): """MIoT Pub/Sub topic matcher.""" diff --git a/custom_components/xiaomi_home/miot/miot_ev.py b/custom_components/xiaomi_home/miot/miot_ev.py deleted file mode 100644 index be4e684..0000000 --- a/custom_components/xiaomi_home/miot/miot_ev.py +++ /dev/null @@ -1,324 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Copyright (C) 2024 Xiaomi Corporation. - -The ownership and intellectual property rights of Xiaomi Home Assistant -Integration and related Xiaomi cloud service API interface provided under this -license, including source code and object code (collectively, "Licensed Work"), -are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi -hereby grants you a personal, limited, non-exclusive, non-transferable, -non-sublicensable, and royalty-free license to reproduce, use, modify, and -distribute the Licensed Work only for your use of Home Assistant for -non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize -you to use the Licensed Work for any other purpose, including but not limited -to use Licensed Work to develop applications (APP), Web services, and other -forms of software. - -You may reproduce and distribute copies of the Licensed Work, with or without -modifications, whether in source or object form, provided that you must give -any other recipients of the Licensed Work a copy of this License and retain all -copyright and disclaimers. - -Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR -CONDITIONS OF ANY KIND, either express or implied, including, without -limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR -OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or -FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible -for any direct, indirect, special, incidental, or consequential damages or -losses arising from the use or inability to use the Licensed Work. - -Xiaomi reserves all rights not expressly granted to you in this License. -Except for the rights expressly granted by Xiaomi under this License, Xiaomi -does not authorize you in any form to use the trademarks, copyrights, or other -forms of intellectual property rights of Xiaomi and its affiliates, including, -without limitation, without obtaining other written permission from Xiaomi, you -shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that -may make the public associate with Xiaomi in any form to publicize or promote -the software or hardware devices that use the Licensed Work. - -Xiaomi has the right to immediately terminate all your authorization under this -License in the event: -1. You assert patent invalidation, litigation, or other claims against patents -or other intellectual property rights of Xiaomi or its affiliates; or, -2. You make, have made, manufacture, sell, or offer to sell products that knock -off Xiaomi or its affiliates' products. - -MIoT event loop. -""" -import selectors -import heapq -import time -import traceback -from typing import Callable, TypeVar -import logging -import threading - -# pylint: disable=relative-beyond-top-level -from .miot_error import MIoTEvError - -_LOGGER = logging.getLogger(__name__) - -TimeoutHandle = TypeVar('TimeoutHandle') - - -class MIoTFdHandler: - """File descriptor handler.""" - fd: int - read_handler: Callable[[any], None] - read_handler_ctx: any - write_handler: Callable[[any], None] - write_handler_ctx: any - - def __init__( - self, fd: int, - read_handler: Callable[[any], None] = None, - read_handler_ctx: any = None, - write_handler: Callable[[any], None] = None, - write_handler_ctx: any = None - ) -> None: - self.fd = fd - self.read_handler = read_handler - self.read_handler_ctx = read_handler_ctx - self.write_handler = write_handler - self.write_handler_ctx = write_handler_ctx - - -class MIoTTimeout: - """Timeout handler.""" - key: TimeoutHandle - target: int - handler: Callable[[any], None] - handler_ctx: any - - def __init__( - self, key: str = None, target: int = None, - handler: Callable[[any], None] = None, - handler_ctx: any = None - ) -> None: - self.key = key - self.target = target - self.handler = handler - self.handler_ctx = handler_ctx - - def __lt__(self, other): - return self.target < other.target - - -class MIoTEventLoop: - """MIoT event loop.""" - _poll_fd: selectors.DefaultSelector - - _fd_handlers: dict[str, MIoTFdHandler] - - _timer_heap: list[MIoTTimeout] - _timer_handlers: dict[str, MIoTTimeout] - _timer_handle_seed: int - - # Label if the current fd handler is freed inside a read handler to - # avoid invalid reading. - _fd_handler_freed_in_read_handler: bool - - def __init__(self) -> None: - self._poll_fd = selectors.DefaultSelector() - self._timer_heap = [] - self._timer_handlers = {} - self._timer_handle_seed = 1 - self._fd_handlers = {} - self._fd_handler_freed_in_read_handler = False - - def loop_forever(self) -> None: - """Run an event loop in current thread.""" - next_timeout: int - while True: - next_timeout = 0 - # Handle timer - now_ms: int = self.__get_monotonic_ms - while len(self._timer_heap) > 0: - timer: MIoTTimeout = self._timer_heap[0] - if timer is None: - break - if timer.target <= now_ms: - heapq.heappop(self._timer_heap) - del self._timer_handlers[timer.key] - if timer.handler: - timer.handler(timer.handler_ctx) - else: - next_timeout = timer.target-now_ms - break - # Are there any files to listen to - if next_timeout == 0 and self._fd_handlers: - next_timeout = None # None == infinite - # Wait for timers & fds - if next_timeout == 0: - # Neither timer nor fds exist, exit loop - break - # Handle fd event - events = self._poll_fd.select( - timeout=next_timeout/1000.0 if next_timeout else next_timeout) - for key, mask in events: - fd_handler: MIoTFdHandler = key.data - if fd_handler is None: - continue - self._fd_handler_freed_in_read_handler = False - fd_key = str(id(fd_handler.fd)) - if fd_key not in self._fd_handlers: - continue - if ( - mask & selectors.EVENT_READ > 0 - and fd_handler.read_handler - ): - fd_handler.read_handler(fd_handler.read_handler_ctx) - if ( - mask & selectors.EVENT_WRITE > 0 - and self._fd_handler_freed_in_read_handler is False - and fd_handler.write_handler - ): - fd_handler.write_handler(fd_handler.write_handler_ctx) - - def loop_stop(self) -> None: - """Stop the event loop.""" - if self._poll_fd: - self._poll_fd.close() - self._poll_fd = None - self._fd_handlers = {} - self._timer_heap = [] - self._timer_handlers = {} - - def set_timeout( - self, timeout_ms: int, handler: Callable[[any], None], - handler_ctx: any = None - ) -> TimeoutHandle: - """Set a timer.""" - if timeout_ms is None or handler is None: - raise MIoTEvError('invalid params') - new_timeout: MIoTTimeout = MIoTTimeout() - new_timeout.key = self.__get_next_timeout_handle - new_timeout.target = self.__get_monotonic_ms + timeout_ms - new_timeout.handler = handler - new_timeout.handler_ctx = handler_ctx - heapq.heappush(self._timer_heap, new_timeout) - self._timer_handlers[new_timeout.key] = new_timeout - return new_timeout.key - - def clear_timeout(self, timer_key: TimeoutHandle) -> None: - """Stop and remove the timer.""" - if timer_key is None: - return - timer: MIoTTimeout = self._timer_handlers.pop(timer_key, None) - if timer: - self._timer_heap = list(self._timer_heap) - self._timer_heap.remove(timer) - heapq.heapify(self._timer_heap) - - def set_read_handler( - self, fd: int, handler: Callable[[any], None], handler_ctx: any = None - ) -> bool: - """Set a read handler for a file descriptor. - - Returns: - bool: True, success. False, failed. - """ - self.__set_handler( - fd, is_read=True, handler=handler, handler_ctx=handler_ctx) - - def set_write_handler( - self, fd: int, handler: Callable[[any], None], handler_ctx: any = None - ) -> bool: - """Set a write handler for a file descriptor. - - Returns: - bool: True, success. False, failed. - """ - self.__set_handler( - fd, is_read=False, handler=handler, handler_ctx=handler_ctx) - - def __set_handler( - self, fd, is_read: bool, handler: Callable[[any], None], - handler_ctx: any = None - ) -> bool: - """Set a handler.""" - if fd is None: - raise MIoTEvError('invalid params') - - if not self._poll_fd: - raise MIoTEvError('event loop not started') - - fd_key: str = str(id(fd)) - fd_handler = self._fd_handlers.get(fd_key, None) - - if fd_handler is None: - fd_handler = MIoTFdHandler(fd=fd) - fd_handler.fd = fd - self._fd_handlers[fd_key] = fd_handler - - read_handler_existed = fd_handler.read_handler is not None - write_handler_existed = fd_handler.write_handler is not None - if is_read is True: - fd_handler.read_handler = handler - fd_handler.read_handler_ctx = handler_ctx - else: - fd_handler.write_handler = handler - fd_handler.write_handler_ctx = handler_ctx - - if fd_handler.read_handler is None and fd_handler.write_handler is None: - # Remove from epoll and map - try: - self._poll_fd.unregister(fd) - except (KeyError, ValueError, OSError) as e: - del e - self._fd_handlers.pop(fd_key, None) - # May be inside a read handler, if not, this has no effect - self._fd_handler_freed_in_read_handler = True - elif read_handler_existed is False and write_handler_existed is False: - # Add to epoll - events = 0x0 - if fd_handler.read_handler: - events |= selectors.EVENT_READ - if fd_handler.write_handler: - events |= selectors.EVENT_WRITE - try: - self._poll_fd.register(fd, events=events, data=fd_handler) - except (KeyError, ValueError, OSError) as e: - _LOGGER.error( - '%s, register fd, error, %s, %s, %s, %s, %s', - threading.current_thread().name, - 'read' if is_read else 'write', - fd_key, handler, e, traceback.format_exc()) - self._fd_handlers.pop(fd_key, None) - return False - elif ( - read_handler_existed != (fd_handler.read_handler is not None) - or write_handler_existed != (fd_handler.write_handler is not None) - ): - # Modify epoll - events = 0x0 - if fd_handler.read_handler: - events |= selectors.EVENT_READ - if fd_handler.write_handler: - events |= selectors.EVENT_WRITE - try: - self._poll_fd.modify(fd, events=events, data=fd_handler) - except (KeyError, ValueError, OSError) as e: - _LOGGER.error( - '%s, modify fd, error, %s, %s, %s, %s, %s', - threading.current_thread().name, - 'read' if is_read else 'write', - fd_key, handler, e, traceback.format_exc()) - self._fd_handlers.pop(fd_key, None) - return False - - return True - - @property - def __get_next_timeout_handle(self) -> str: - # Get next timeout handle, that is not larger than the maximum - # value of UINT64 type. - self._timer_handle_seed += 1 - # uint64 max - self._timer_handle_seed %= 0xFFFFFFFFFFFFFFFF - return str(self._timer_handle_seed) - - @property - def __get_monotonic_ms(self) -> int: - """Get monotonic ms timestamp.""" - return int(time.monotonic()*1000) diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 6679328..290a10c 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -10,30 +10,30 @@ non-sublicensable, and royalty-free license to reproduce, use, modify, and distribute the Licensed Work only for your use of Home Assistant for non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize -you to use the Licensed Work for any other purpose, including but not limited +you to use the Licensed Work for Any other purpose, including but not limited to use Licensed Work to develop applications (APP), Web services, and other forms of software. You may reproduce and distribute copies of the Licensed Work, with or without modifications, whether in source or object form, provided that you must give -any other recipients of the Licensed Work a copy of this License and retain all +Any other recipients of the Licensed Work a copy of this License and retain all copyright and disclaimers. Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without -limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR +limitation, Any warranties, undertakes, or conditions of TITLE, NO ERROR OR OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or -FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible -for any direct, indirect, special, incidental, or consequential damages or +FITNESS FOR A PARTICULAR PURPOSE. In Any event, you are solely responsible +for Any direct, indirect, special, incidental, or consequential damages or losses arising from the use or inability to use the Licensed Work. Xiaomi reserves all rights not expressly granted to you in this License. Except for the rights expressly granted by Xiaomi under this License, Xiaomi -does not authorize you in any form to use the trademarks, copyrights, or other +does not authorize you in Any form to use the trademarks, copyrights, or other forms of intellectual property rights of Xiaomi and its affiliates, including, without limitation, without obtaining other written permission from Xiaomi, you shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that -may make the public associate with Xiaomi in any form to publicize or promote +may make the public associate with Xiaomi in Any form to publicize or promote the software or hardware devices that use the Licensed Work. Xiaomi has the right to immediately terminate all your authorization under this @@ -60,7 +60,7 @@ import socket import struct import threading -from typing import Callable, Optional, final +from typing import Callable, Optional, final, Coroutine, Any from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives import padding from cryptography.hazmat.backends import default_backend @@ -68,11 +68,10 @@ # pylint: disable=relative-beyond-top-level from .miot_error import MIoTErrorCode -from .miot_ev import MIoTEventLoop, TimeoutHandle from .miot_network import InterfaceStatus, MIoTNetwork, NetworkInfo from .miot_mdns import MipsService, MipsServiceState from .common import ( - randomize_int, load_yaml_file, gen_absolute_path, MIoTMatcher) + randomize_float, load_yaml_file, gen_absolute_path, MIoTMatcher) _LOGGER = logging.getLogger(__name__) @@ -98,13 +97,13 @@ class MIoTLanCmdType(Enum): class MIoTLanCmd: """MIoT lan command.""" type_: MIoTLanCmdType - data: any + data: Any @dataclass class MIoTLanCmdData: - handler: Callable[[dict, any], None] - handler_ctx: any + handler: Callable[[dict, Any], None] + handler_ctx: Any timeout_ms: int @@ -131,8 +130,8 @@ class MIoTLanUnregisterBroadcastData: @dataclass class MIoTLanRegisterBroadcastData: key: str - handler: Callable[[dict, any], None] - handler_ctx: any + handler: Callable[[dict, Any], None] + handler_ctx: Any @dataclass @@ -143,8 +142,8 @@ class MIoTLanUnsubDeviceState: @dataclass class MIoTLanSubDeviceState: key: str - handler: Callable[[str, dict, any], None] - handler_ctx: any + handler: Callable[[str, dict, Any], None] + handler_ctx: Any @dataclass @@ -156,9 +155,9 @@ class MIoTLanNetworkUpdateData: @dataclass class MIoTLanRequestData: msg_id: int - handler: Callable[[dict, any], None] - handler_ctx: any - timeout: TimeoutHandle + handler: Optional[Callable[[dict, Any], None]] + handler_ctx: Any + timeout: asyncio.TimerHandle class MIoTLanDeviceState(Enum): @@ -175,12 +174,12 @@ class MIoTLanDevice: OT_HEADER: int = 0x2131 OT_HEADER_LEN: int = 32 NETWORK_UNSTABLE_CNT_TH: int = 10 - NETWORK_UNSTABLE_TIME_TH: int = 120000 - NETWORK_UNSTABLE_RESUME_TH: int = 300000 - FAST_PING_INTERVAL: int = 5000 - CONSTRUCT_STATE_PENDING: int = 15000 - KA_INTERVAL_MIN = 10000 - KA_INTERVAL_MAX = 50000 + NETWORK_UNSTABLE_TIME_TH: float = 120 + NETWORK_UNSTABLE_RESUME_TH: float = 300 + FAST_PING_INTERVAL: float = 5 + CONSTRUCT_STATE_PENDING: float = 15 + KA_INTERVAL_MIN: float = 10 + KA_INTERVAL_MAX: float = 50 did: str token: bytes @@ -192,19 +191,19 @@ class MIoTLanDevice: sub_ts: int supported_wildcard_sub: bool - _manager: any + _manager: 'MIoTLan' _if_name: Optional[str] _sub_locked: bool _state: MIoTLanDeviceState _online: bool - _online_offline_history: list[dict[str, any]] - _online_offline_timer: Optional[TimeoutHandle] + _online_offline_history: list[dict[str, Any]] + _online_offline_timer: Optional[asyncio.TimerHandle] - _ka_timer: TimeoutHandle - _ka_internal: int + _ka_timer: Optional[asyncio.TimerHandle] + _ka_internal: float def __init__( - self, manager: any, did: str, token: str, ip: Optional[str] = None + self, manager: 'MIoTLan', did: str, token: str, ip: Optional[str] = None ) -> None: self._manager: MIoTLan = manager self.did = did @@ -225,12 +224,12 @@ def __init__( self._online_offline_history = [] self._online_offline_timer = None - def ka_init_handler(ctx: any) -> None: + def ka_init_handler() -> None: self._ka_internal = self.KA_INTERVAL_MIN self.__update_keep_alive(state=MIoTLanDeviceState.DEAD) - self._ka_timer = self._manager.mev.set_timeout( - randomize_int(self.CONSTRUCT_STATE_PENDING, 0.5), - ka_init_handler, None) + self._ka_timer = self._manager.internal_loop.call_later( + randomize_float(self.CONSTRUCT_STATE_PENDING, 0.5), + ka_init_handler,) _LOGGER.debug('miot lan device add, %s', self.did) def keep_alive(self, ip: str, if_name: str) -> None: @@ -342,11 +341,9 @@ def unsubscribe(self) -> None: def on_delete(self) -> None: if self._ka_timer: - self._manager.mev.clear_timeout(self._ka_timer) + self._ka_timer.cancel() if self._online_offline_timer: - self._manager.mev.clear_timeout(self._online_offline_timer) - self._manager = None - self.cipher = None + self._online_offline_timer.cancel() _LOGGER.debug('miot lan device delete, %s', self.did) def update_info(self, info: dict) -> None: @@ -379,7 +376,7 @@ def __subscribe_handler(self, msg: dict, sub_ts: int) -> None: 'online': self._online, 'push_available': self.subscribed}) _LOGGER.info('subscribe success, %s, %s', self._if_name, self.did) - def __unsubscribe_handler(self, msg: dict, ctx: any) -> None: + def __unsubscribe_handler(self, msg: dict, ctx: Any) -> None: if ( 'result' not in msg or 'code' not in msg['result'] @@ -395,14 +392,14 @@ def __update_keep_alive(self, state: MIoTLanDeviceState) -> None: if self._state != MIoTLanDeviceState.FRESH: _LOGGER.debug('device status, %s, %s', self.did, self._state) if self._ka_timer: - self._manager.mev.clear_timeout(self._ka_timer) + self._ka_timer.cancel() self._ka_timer = None match state: case MIoTLanDeviceState.FRESH: if last_state == MIoTLanDeviceState.DEAD: self._ka_internal = self.KA_INTERVAL_MIN self.__change_online(True) - self._ka_timer = self._manager.mev.set_timeout( + self._ka_timer = self._manager.internal_loop.call_later( self.__get_next_ka_timeout(), self.__update_keep_alive, MIoTLanDeviceState.PING1) case ( @@ -410,11 +407,18 @@ def __update_keep_alive(self, state: MIoTLanDeviceState) -> None: | MIoTLanDeviceState.PING2 | MIoTLanDeviceState.PING3 ): - self._manager.ping(if_name=self._if_name, target_ip=self.ip) - # Fast ping - self._ka_timer = self._manager.mev.set_timeout( + # Set the timer first to avoid Any early returns + self._ka_timer = self._manager.internal_loop.call_later( self.FAST_PING_INTERVAL, self.__update_keep_alive, MIoTLanDeviceState(state.value+1)) + # Fast ping + if self._if_name is None: + _LOGGER.error('if_name is Not set for device, %s', self.did) + return + if self.ip is None: + _LOGGER.error('ip is Not set for device, %s', self.did) + return + self._manager.ping(if_name=self._if_name, target_ip=self.ip) case MIoTLanDeviceState.DEAD: if last_state == MIoTLanDeviceState.PING3: self._ka_internal = self.KA_INTERVAL_MIN @@ -422,9 +426,9 @@ def __update_keep_alive(self, state: MIoTLanDeviceState) -> None: case _: _LOGGER.error('invalid state, %s', state) - def __get_next_ka_timeout(self) -> int: + def __get_next_ka_timeout(self) -> float: self._ka_internal = min(self._ka_internal*2, self.KA_INTERVAL_MAX) - return randomize_int(self._ka_internal, 0.1) + return randomize_float(self._ka_internal, 0.1) def __change_online(self, online: bool) -> None: _LOGGER.info('change online, %s, %s', self.did, online) @@ -433,7 +437,7 @@ def __change_online(self, online: bool) -> None: if len(self._online_offline_history) > self.NETWORK_UNSTABLE_CNT_TH: self._online_offline_history.pop(0) if self._online_offline_timer: - self._manager.mev.clear_timeout(self._online_offline_timer) + self._online_offline_timer.cancel() if not online: self.online = False else: @@ -446,11 +450,11 @@ def __change_online(self, online: bool) -> None: self.online = True else: _LOGGER.info('unstable device detected, %s', self.did) - self._online_offline_timer = self._manager.mev.set_timeout( + self._online_offline_timer = self._manager.internal_loop.call_later( self.NETWORK_UNSTABLE_RESUME_TH, - self.__online_resume_handler, None) + self.__online_resume_handler) - def __online_resume_handler(self, ctx: any) -> None: + def __online_resume_handler(self) -> None: _LOGGER.info('unstable resume threshold past, %s', self.did) self.online = True @@ -470,8 +474,8 @@ class MIoTLan: OT_MSG_LEN: int = 1400 OT_SUPPORT_WILDCARD_SUB: int = 0xFE - OT_PROBE_INTERVAL_MIN: int = 5000 - OT_PROBE_INTERVAL_MAX: int = 45000 + OT_PROBE_INTERVAL_MIN: float = 5 + OT_PROBE_INTERVAL_MAX: float = 45 PROFILE_MODELS_FILE: str = 'lan/profile_models.yaml' @@ -486,23 +490,21 @@ class MIoTLan: _write_buffer: bytearray _read_buffer: bytearray - _mev: MIoTEventLoop + _internal_loop: asyncio.AbstractEventLoop _thread: threading.Thread - _queue: queue.Queue - _cmd_event_fd: os.eventfd _available_net_ifs: set[str] _broadcast_socks: dict[str, socket.socket] _local_port: Optional[int] - _scan_timer: TimeoutHandle - _last_scan_interval: Optional[int] + _scan_timer: Optional[asyncio.TimerHandle] + _last_scan_interval: Optional[float] _msg_id_counter: int _pending_requests: dict[int, MIoTLanRequestData] _device_msg_matcher: MIoTMatcher _device_state_sub_map: dict[str, MIoTLanSubDeviceState] - _reply_msg_buffer: dict[str, TimeoutHandle] + _reply_msg_buffer: dict[str, asyncio.TimerHandle] - _lan_state_sub_map: dict[str, Callable[[bool], asyncio.Future]] + _lan_state_sub_map: dict[str, Callable[[bool], Coroutine]] _lan_ctrl_vote_map: dict[str, bool] _profile_models: dict[str, dict] @@ -532,7 +534,7 @@ def __init__( key='miot_lan', group_id='*', handler=self.__on_mips_service_change) self._enable_subscribe = enable_subscribe - self._virtual_did = virtual_did or str(secrets.randbits(64)) + self._virtual_did = str(virtual_did) if (virtual_did is not None) else str(secrets.randbits(64)) # Init socket probe message probe_bytes = bytearray(self.OT_PROBE_LEN) probe_bytes[:20] = ( @@ -574,8 +576,8 @@ def virtual_did(self) -> str: return self._virtual_did @property - def mev(self) -> MIoTEventLoop: - return self._mev + def internal_loop(self) -> asyncio.AbstractEventLoop: + return self._internal_loop @property def init_done(self) -> bool: @@ -609,12 +611,10 @@ async def init_async(self) -> None: except Exception as err: # pylint: disable=broad-exception-caught _LOGGER.error('load profile models error, %s', err) self._profile_models = {} - self._mev = MIoTEventLoop() self._queue = queue.Queue() self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK) - self._mev.set_read_handler( - self._cmd_event_fd, self.__cmd_read_handler, None) - self._thread = threading.Thread(target=self.__lan_thread_handler) + # All tasks meant for the internal loop should happen in this thread + self._thread = threading.Thread(target=self.__internal_loop_thread) self._thread.name = 'miot_lan' self._thread.daemon = True self._thread.start() @@ -623,6 +623,19 @@ async def init_async(self) -> None: self._main_loop.create_task(handler(True)) _LOGGER.info( 'miot lan init, %s ,%s', self._net_ifs, self._available_net_ifs) + + def __internal_loop_thread(self) -> None: + _LOGGER.info('miot lan thread start') + self._internal_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._internal_loop) + self.__init_socket() + self._internal_loop.add_reader( + self._cmd_event_fd, self.__cmd_read_handler) + self._scan_timer = self._internal_loop.call_later( + int(3*random.random()), self.__scan_devices) + self._internal_loop.run_forever() + self._internal_loop.close() + _LOGGER.info('miot lan thread exit') async def deinit_async(self) -> None: if not self._init_done: @@ -687,7 +700,7 @@ async def update_subscribe_option(self, enable_subscribe: bool) -> None: if not self._init_done: self._enable_subscribe = enable_subscribe return - return self.__lan_send_cmd( + self.__lan_send_cmd( cmd=MIoTLanCmdType.OPTIONS_UPDATE, data={ 'enable_subscribe': enable_subscribe, }) @@ -705,7 +718,7 @@ def delete_devices(self, devices: list[str]) -> bool: data=devices) def sub_lan_state( - self, key: str, handler: Callable[[bool], asyncio.Future] + self, key: str, handler: Callable[[bool], Coroutine] ) -> None: self._lan_state_sub_map[key] = handler @@ -714,8 +727,8 @@ def unsub_lan_state(self, key: str) -> None: @final def sub_device_state( - self, key: str, handler: Callable[[str, dict, any], None], - handler_ctx: any = None + self, key: str, handler: Callable[[str, dict, Any], None], + handler_ctx: Any = None ) -> bool: return self.__lan_send_cmd( cmd=MIoTLanCmdType.SUB_DEVICE_STATE, @@ -730,8 +743,8 @@ def unsub_device_state(self, key: str) -> bool: @final def sub_prop( - self, did: str, handler: Callable[[dict, any], None], - siid: int = None, piid: int = None, handler_ctx: any = None + self, did: str, handler: Callable[[dict, Any], None], + siid: Optional[int] = None, piid: Optional[int] = None, handler_ctx: Any = None ) -> bool: if not self._enable_subscribe: return False @@ -744,7 +757,7 @@ def sub_prop( key=key, handler=handler, handler_ctx=handler_ctx)) @final - def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: + def unsub_prop(self, did: str, siid: Optional[int] = None, piid: Optional[int] = None) -> bool: if not self._enable_subscribe: return False key = ( @@ -756,8 +769,8 @@ def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: @final def sub_event( - self, did: str, handler: Callable[[dict, any], None], - siid: int = None, eiid: int = None, handler_ctx: any = None + self, did: str, handler: Callable[[dict, Any], None], + siid: Optional[int] = None, eiid: Optional[int] = None, handler_ctx: Any = None ) -> bool: if not self._enable_subscribe: return False @@ -770,7 +783,7 @@ def sub_event( key=key, handler=handler, handler_ctx=handler_ctx)) @final - def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: + def unsub_event(self, did: str, siid: Optional[int] = None, eiid: Optional[int] = None) -> bool: if not self._enable_subscribe: return False key = ( @@ -783,7 +796,7 @@ def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: @final async def get_prop_async( self, did: str, siid: int, piid: int, timeout_ms: int = 10000 - ) -> any: + ) -> Any: result_obj = await self.__call_api_async( did=did, msg={ 'method': 'get_properties', @@ -801,7 +814,7 @@ async def get_prop_async( @final async def set_prop_async( - self, did: str, siid: int, piid: int, value: any, + self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000 ) -> dict: result_obj = await self.__call_api_async( @@ -868,7 +881,7 @@ def get_device_list_handler(msg: dict, fut: asyncio.Future): fut.set_result({}) return await fut - def ping(self, if_name: str, target_ip: str) -> None: + def ping(self, if_name: str | None, target_ip: str) -> None: if not target_ip: return self.__sendto( @@ -878,13 +891,13 @@ def ping(self, if_name: str, target_ip: str) -> None: def send2device( self, did: str, msg: dict, - handler: Optional[Callable[[dict, any], None]] = None, - handler_ctx: any = None, + handler: Optional[Callable[[dict, Any], None]] = None, + handler_ctx: Any = None, timeout_ms: Optional[int] = None ) -> None: if timeout_ms and not handler: raise ValueError('handler is required when timeout_ms is set') - device: MIoTLanDevice = self._lan_devices.get(did) + device: MIoTLanDevice | None = self._lan_devices.get(did) if not device: raise ValueError('invalid device') if not device.cipher: @@ -900,7 +913,7 @@ def send2device( did=did, offset=int(time.time())-device.offset) - return self.make_request( + return self.__make_request( msg_id=in_msg['id'], msg=self._write_buffer[0: msg_len], if_name=device.if_name, @@ -909,33 +922,33 @@ def send2device( handler_ctx=handler_ctx, timeout_ms=timeout_ms) - def make_request( + def __make_request( self, msg_id: int, msg: bytearray, if_name: str, ip: str, - handler: Callable[[dict, any], None], - handler_ctx: any = None, + handler: Optional[Callable[[dict, Any], None]], + handler_ctx: Any = None, timeout_ms: Optional[int] = None ) -> None: def request_timeout_handler(req_data: MIoTLanRequestData): self._pending_requests.pop(req_data.msg_id, None) - if req_data: + if req_data and req_data.handler: req_data.handler({ 'code': MIoTErrorCode.CODE_TIMEOUT.value, 'error': 'timeout'}, req_data.handler_ctx) - timer: Optional[TimeoutHandle] = None + timer: Optional[asyncio.TimerHandle] = None request_data = MIoTLanRequestData( msg_id=msg_id, handler=handler, handler_ctx=handler_ctx, timeout=timer) if timeout_ms: - timer = self._mev.set_timeout( - timeout_ms, request_timeout_handler, request_data) + timer = self._internal_loop.call_later( + timeout_ms/1000, request_timeout_handler, request_data) request_data.timeout = timer self._pending_requests[msg_id] = request_data self.__sendto(if_name=if_name, data=msg, address=ip, port=self.OT_PORT) @@ -954,7 +967,7 @@ def __gen_msg_id(self) -> int: self._msg_id_counter = 1 return self._msg_id_counter - def __lan_send_cmd(self, cmd: MIoTLanCmd, data: any) -> bool: + def __lan_send_cmd(self, cmd: MIoTLanCmdType, data: Any) -> bool: try: self._queue.put(MIoTLanCmd(type_=cmd, data=data)) os.eventfd_write(self._cmd_event_fd, 1) @@ -986,16 +999,7 @@ def call_api_handler(msg: dict, fut: asyncio.Future): 'error': 'send cmd error'}) return await fut - def __lan_thread_handler(self) -> None: - _LOGGER.info('miot lan thread start') - self.__init_socket() - # Create scan devices timer - self._scan_timer = self._mev.set_timeout( - int(3000*random.random()), self.__scan_devices, None) - self._mev.loop_forever() - _LOGGER.info('miot lan thread exit') - - def __cmd_read_handler(self, ctx: any) -> None: + def __cmd_read_handler(self) -> None: fd_value = os.eventfd_read(self._cmd_event_fd) if fd_value == 0: return @@ -1109,20 +1113,21 @@ def __cmd_read_handler(self, ctx: any) -> None: elif mips_cmd.type_ == MIoTLanCmdType.DEINIT: # stop the thread if self._scan_timer: - self._mev.clear_timeout(self._scan_timer) + self._scan_timer.cancel() self._scan_timer = None for device in self._lan_devices.values(): device.on_delete() self._lan_devices.clear() for req_data in self._pending_requests.values(): - self._mev.clear_timeout(req_data.timeout) + req_data.timeout.cancel() self._pending_requests.clear() for timer in self._reply_msg_buffer.values(): - self._mev.clear_timeout(timer) + timer.cancel() self._reply_msg_buffer.clear() self._device_msg_matcher = MIoTMatcher() self.__deinit_socket() - self._mev.loop_stop() + # DO NOT force a event loop to stop. + # It will stop when you release all handles properly. def __init_socket(self) -> None: self.__deinit_socket() @@ -1145,7 +1150,7 @@ def __create_socket(self, if_name: str) -> None: sock.setsockopt( socket.SOL_SOCKET, socket.SO_BINDTODEVICE, if_name.encode()) sock.bind(('', self._local_port or 0)) - self._mev.set_read_handler( + self._internal_loop.add_reader( sock.fileno(), self.__socket_read_handler, (if_name, sock)) self._broadcast_socks[if_name] = sock self._local_port = self._local_port or sock.getsockname()[1] @@ -1163,7 +1168,7 @@ def __destroy_socket(self, if_name: str) -> None: sock = self._broadcast_socks.pop(if_name, None) if not sock: return - self._mev.set_read_handler(sock.fileno(), None, None) + self._internal_loop.remove_reader(sock.fileno()) sock.close() _LOGGER.info('destroyed socket, %s', if_name) @@ -1190,7 +1195,7 @@ def __raw_message_handler( return # Keep alive message did: str = str(struct.unpack('>Q', data[4:12])[0]) - device: MIoTLanDevice = self._lan_devices.get(did) + device: MIoTLanDevice | None = self._lan_devices.get(did) if not device: return timestamp: int = struct.unpack('>I', data[12:16])[0] @@ -1230,11 +1235,12 @@ def __message_handler(self, did: str, msg: dict) -> None: _LOGGER.warning('invalid message, no id, %s, %s', did, msg) return # Reply - req: MIoTLanRequestData = self._pending_requests.pop(msg['id'], None) + req: MIoTLanRequestData | None = self._pending_requests.pop(msg['id'], None) if req: - self._mev.clear_timeout(req.timeout) - self._main_loop.call_soon_threadsafe( - req.handler, msg, req.handler_ctx) + req.timeout.cancel() + if req.handler is not None: + self._main_loop.call_soon_threadsafe( + req.handler, msg, req.handler_ctx) return # Handle up link message if 'method' not in msg or 'params' not in msg: @@ -1281,15 +1287,16 @@ def __filter_dup_message(self, did: str, msg_id: int) -> bool: filter_id = f'{did}.{msg_id}' if filter_id in self._reply_msg_buffer: return True - self._reply_msg_buffer[filter_id] = self._mev.set_timeout( - 5000, + self._reply_msg_buffer[filter_id] = self._internal_loop.call_later( + 5, lambda filter_id: self._reply_msg_buffer.pop(filter_id, None), filter_id) + return False def __sendto( - self, if_name: str, data: bytes, address: str, port: int + self, if_name: str | None, data: bytes, address: str, port: int ) -> None: - if address == '255.255.255.255': + if if_name is None: # Broadcast for if_n, sock in self._broadcast_socks.items(): _LOGGER.debug('send broadcast, %s', if_n) @@ -1302,17 +1309,21 @@ def __sendto( return sock.sendto(data, socket.MSG_DONTWAIT, (address, port)) - def __scan_devices(self, ctx: any) -> None: + def __scan_devices(self) -> None: if self._scan_timer: - self._mev.clear_timeout(self._scan_timer) - # Scan devices - self.ping(if_name=None, target_ip='255.255.255.255') + self._scan_timer.cancel() + # Ignore any exceptions to avoid blocking the loop + try: + # Scan devices + self.ping(if_name=None, target_ip='255.255.255.255') + except: + pass scan_time = self.__get_next_scan_time() - self._scan_timer = self._mev.set_timeout( - scan_time, self.__scan_devices, None) + self._scan_timer = self._internal_loop.call_later( + scan_time, self.__scan_devices) _LOGGER.debug('next scan time: %sms', scan_time) - def __get_next_scan_time(self) -> int: + def __get_next_scan_time(self) -> float: if not self._last_scan_interval: self._last_scan_interval = self.OT_PROBE_INTERVAL_MIN self._last_scan_interval = min( diff --git a/custom_components/xiaomi_home/miot/miot_mdns.py b/custom_components/xiaomi_home/miot/miot_mdns.py index 4f13649..7d45193 100644 --- a/custom_components/xiaomi_home/miot/miot_mdns.py +++ b/custom_components/xiaomi_home/miot/miot_mdns.py @@ -50,7 +50,7 @@ import binascii import copy from enum import Enum -from typing import Callable, Optional +from typing import Callable, Optional, Coroutine import logging from zeroconf import ( @@ -151,7 +151,7 @@ class MipsService: _services: dict[str, dict] # key = (key, group_id) _sub_list: dict[(str, str), Callable[[ - str, MipsServiceState, dict], asyncio.Future]] + str, MipsServiceState, dict], Coroutine]] def __init__( self, aiozc: AsyncZeroconf, @@ -207,7 +207,7 @@ def get_services(self, group_id: Optional[str] = None) -> dict[str, dict]: def sub_service_change( self, key: str, group_id: str, - handler: Callable[[str, MipsServiceState, dict], asyncio.Future] + handler: Callable[[str, MipsServiceState, dict], Coroutine] ) -> None: if key is None or group_id is None or handler is None: raise MipsServiceError('invalid params') diff --git a/custom_components/xiaomi_home/miot/miot_network.py b/custom_components/xiaomi_home/miot/miot_network.py index a4606eb..70c3dc4 100644 --- a/custom_components/xiaomi_home/miot/miot_network.py +++ b/custom_components/xiaomi_home/miot/miot_network.py @@ -52,7 +52,7 @@ from dataclasses import dataclass from enum import Enum, auto import subprocess -from typing import Callable, Optional +from typing import Callable, Optional, Coroutine import psutil import ipaddress @@ -97,7 +97,7 @@ class MIoTNetwork: _sub_list_network_status: dict[str, Callable[[bool], asyncio.Future]] _sub_list_network_info: dict[str, Callable[[ - InterfaceStatus, NetworkInfo], asyncio.Future]] + InterfaceStatus, NetworkInfo], Coroutine]] _ping_address_priority: int @@ -155,7 +155,7 @@ def unsub_network_status(self, key: str) -> None: def sub_network_info( self, key: str, - handler: Callable[[InterfaceStatus, NetworkInfo], asyncio.Future] + handler: Callable[[InterfaceStatus, NetworkInfo], Coroutine] ) -> None: self._sub_list_network_info[key] = handler diff --git a/test/test_ev.py b/test/test_ev.py deleted file mode 100644 index 6353fe8..0000000 --- a/test/test_ev.py +++ /dev/null @@ -1,55 +0,0 @@ -# -*- coding: utf-8 -*- -"""Unit test for miot_ev.py.""" -import os -import pytest - -# pylint: disable=import-outside-toplevel, disable=unused-argument - - -@pytest.mark.github -def test_mev_timer_and_fd(): - from miot.miot_ev import MIoTEventLoop, TimeoutHandle - - mev = MIoTEventLoop() - assert mev - event_fd: os.eventfd = os.eventfd(0, os.O_NONBLOCK) - assert event_fd - timer4: TimeoutHandle = None - - def event_handler(event_fd): - value: int = os.eventfd_read(event_fd) - if value == 1: - mev.clear_timeout(timer4) - print('cancel timer4') - elif value == 2: - print('event write twice in a row') - elif value == 3: - mev.set_read_handler(event_fd, None, None) - os.close(event_fd) - event_fd = None - print('close event fd') - - def timer1_handler(event_fd): - os.eventfd_write(event_fd, 1) - - def timer2_handler(event_fd): - os.eventfd_write(event_fd, 1) - os.eventfd_write(event_fd, 1) - - def timer3_handler(event_fd): - os.eventfd_write(event_fd, 3) - - def timer4_handler(event_fd): - raise ValueError('unreachable code') - - mev.set_read_handler( - event_fd, event_handler, event_fd) - - mev.set_timeout(500, timer1_handler, event_fd) - mev.set_timeout(1000, timer2_handler, event_fd) - mev.set_timeout(1500, timer3_handler, event_fd) - timer4 = mev.set_timeout(2000, timer4_handler, event_fd) - - mev.loop_forever() - # Loop will exit when there are no timers or fd handlers. - mev.loop_stop() From 794505f082dafeff7a6f34f2427aa7bd4b978558 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sat, 21 Dec 2024 21:39:59 +0800 Subject: [PATCH 02/12] Use call_soon_threadsafe instead of event fd for ipc --- .../xiaomi_home/miot/miot_client.py | 2 +- .../xiaomi_home/miot/miot_lan.py | 497 ++++++++---------- 2 files changed, 226 insertions(+), 273 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_client.py b/custom_components/xiaomi_home/miot/miot_client.py index 7423e20..f184ec5 100644 --- a/custom_components/xiaomi_home/miot/miot_client.py +++ b/custom_components/xiaomi_home/miot/miot_client.py @@ -1022,7 +1022,7 @@ async def __on_miot_lan_state_change(self, state: bool) -> None: handler=self.__on_lan_device_state_changed) for did, info in ( await self._miot_lan.get_dev_list_async()).items(): - self.__on_lan_device_state_changed( + await self.__on_lan_device_state_changed( did=did, state=info, ctx=None) _LOGGER.info('lan device list, %s', self._device_list_lan) self._miot_lan.update_devices(devices={ diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 290a10c..ff32859 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -53,8 +53,6 @@ from dataclasses import dataclass from enum import Enum, auto import logging -import os -import queue import random import secrets import socket @@ -77,29 +75,6 @@ _LOGGER = logging.getLogger(__name__) -class MIoTLanCmdType(Enum): - """MIoT lan command.""" - DEINIT = 0 - CALL_API = auto() - SUB_DEVICE_STATE = auto() - UNSUB_DEVICE_STATE = auto() - REG_BROADCAST = auto() - UNREG_BROADCAST = auto() - GET_DEV_LIST = auto() - DEVICE_UPDATE = auto() - DEVICE_DELETE = auto() - NET_INFO_UPDATE = auto() - NET_IFS_UPDATE = auto() - OPTIONS_UPDATE = auto() - - -@dataclass -class MIoTLanCmd: - """MIoT lan command.""" - type_: MIoTLanCmdType - data: Any - - @dataclass class MIoTLanCmdData: handler: Callable[[dict, Any], None] @@ -142,7 +117,7 @@ class MIoTLanUnsubDeviceState: @dataclass class MIoTLanSubDeviceState: key: str - handler: Callable[[str, dict, Any], None] + handler: Callable[[str, dict, Any], Coroutine] handler_ctx: Any @@ -157,7 +132,7 @@ class MIoTLanRequestData: msg_id: int handler: Optional[Callable[[dict, Any], None]] handler_ctx: Any - timeout: asyncio.TimerHandle + timeout: Optional[asyncio.TimerHandle] class MIoTLanDeviceState(Enum): @@ -202,6 +177,8 @@ class MIoTLanDevice: _ka_timer: Optional[asyncio.TimerHandle] _ka_internal: float +# All functions SHOULD be called from the internal loop + def __init__( self, manager: 'MIoTLan', did: str, token: str, ip: Optional[str] = None ) -> None: @@ -511,6 +488,8 @@ class MIoTLan: _init_done: bool +# The following should be called from the main loop + def __init__( self, net_ifs: list[str], @@ -528,7 +507,7 @@ def __init__( self._net_ifs = set(net_ifs) self._network = network self._network.sub_network_info( - key='miot_lan', handler=self.__on_network_info_change) + key='miot_lan', handler=self.__on_network_info_change_external_async) self._mips_service = mips_service self._mips_service.sub_service_change( key='miot_lan', group_id='*', @@ -611,8 +590,6 @@ async def init_async(self) -> None: except Exception as err: # pylint: disable=broad-exception-caught _LOGGER.error('load profile models error, %s', err) self._profile_models = {} - self._queue = queue.Queue() - self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK) # All tasks meant for the internal loop should happen in this thread self._thread = threading.Thread(target=self.__internal_loop_thread) self._thread.name = 'miot_lan' @@ -629,8 +606,6 @@ def __internal_loop_thread(self) -> None: self._internal_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._internal_loop) self.__init_socket() - self._internal_loop.add_reader( - self._cmd_event_fd, self.__cmd_read_handler) self._scan_timer = self._internal_loop.call_later( int(3*random.random()), self.__scan_devices) self._internal_loop.run_forever() @@ -642,7 +617,7 @@ async def deinit_async(self) -> None: _LOGGER.info('miot lan not init') return self._init_done = False - self.__lan_send_cmd(MIoTLanCmdType.DEINIT, None) + self._internal_loop.call_soon_threadsafe(self.__deinit) self._thread.join() self._profile_models = {} @@ -683,9 +658,9 @@ async def update_net_ifs_async(self, net_ifs: list[str]) -> None: self._net_ifs = set(net_ifs) await self.init_async() return - self.__lan_send_cmd( - cmd=MIoTLanCmdType.NET_IFS_UPDATE, - data=net_ifs) + self._internal_loop.call_soon_threadsafe( + self.__update_net_ifs, + net_ifs) async def vote_for_lan_ctrl_async(self, key: str, vote: bool) -> None: _LOGGER.info('vote for lan ctrl, %s, %s', key, vote) @@ -700,22 +675,21 @@ async def update_subscribe_option(self, enable_subscribe: bool) -> None: if not self._init_done: self._enable_subscribe = enable_subscribe return - self.__lan_send_cmd( - cmd=MIoTLanCmdType.OPTIONS_UPDATE, - data={ - 'enable_subscribe': enable_subscribe, }) + self._internal_loop.call_soon_threadsafe( + self.__update_subscribe_option, + {'enable_subscribe': enable_subscribe}) def update_devices(self, devices: dict[str, dict]) -> bool: _LOGGER.info('update devices, %s', devices) - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.DEVICE_UPDATE, - data=devices) + self._internal_loop.call_soon_threadsafe( + self.__update_devices, devices) + return True def delete_devices(self, devices: list[str]) -> bool: _LOGGER.info('delete devices, %s', devices) - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.DEVICE_DELETE, - data=devices) + self._internal_loop.call_soon_threadsafe( + self.__delete_devices, devices) + return True def sub_lan_state( self, key: str, handler: Callable[[bool], Coroutine] @@ -727,19 +701,20 @@ def unsub_lan_state(self, key: str) -> None: @final def sub_device_state( - self, key: str, handler: Callable[[str, dict, Any], None], + self, key: str, handler: Callable[[str, dict, Any], Coroutine], handler_ctx: Any = None ) -> bool: - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.SUB_DEVICE_STATE, - data=MIoTLanSubDeviceState( + self._internal_loop.call_soon_threadsafe( + self.__sub_device_state, + MIoTLanSubDeviceState( key=key, handler=handler, handler_ctx=handler_ctx)) + return True @final def unsub_device_state(self, key: str) -> bool: - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.UNSUB_DEVICE_STATE, - data=MIoTLanUnsubDeviceState(key=key)) + self._internal_loop.call_soon_threadsafe( + self.__unsub_device_state, MIoTLanUnsubDeviceState(key=key)) + return True @final def sub_prop( @@ -751,10 +726,11 @@ def sub_prop( key = ( f'{did}/p/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.REG_BROADCAST, - data=MIoTLanRegisterBroadcastData( + self._internal_loop.call_soon_threadsafe( + self.__sub_broadcast, + MIoTLanRegisterBroadcastData( key=key, handler=handler, handler_ctx=handler_ctx)) + return True @final def unsub_prop(self, did: str, siid: Optional[int] = None, piid: Optional[int] = None) -> bool: @@ -763,9 +739,10 @@ def unsub_prop(self, did: str, siid: Optional[int] = None, piid: Optional[int] = key = ( f'{did}/p/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.UNREG_BROADCAST, - data=MIoTLanUnregisterBroadcastData(key=key)) + self._internal_loop.call_soon_threadsafe( + self.__unsub_broadcast, + MIoTLanUnregisterBroadcastData(key=key)) + return True @final def sub_event( @@ -777,10 +754,11 @@ def sub_event( key = ( f'{did}/e/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.REG_BROADCAST, - data=MIoTLanRegisterBroadcastData( + self._internal_loop.call_soon_threadsafe( + self.__sub_broadcast, + MIoTLanRegisterBroadcastData( key=key, handler=handler, handler_ctx=handler_ctx)) + return True @final def unsub_event(self, did: str, siid: Optional[int] = None, eiid: Optional[int] = None) -> bool: @@ -789,9 +767,10 @@ def unsub_event(self, did: str, siid: Optional[int] = None, eiid: Optional[int] key = ( f'{did}/e/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.UNREG_BROADCAST, - data=MIoTLanUnregisterBroadcastData(key=key)) + self._internal_loop.call_soon_threadsafe( + self.__unsub_broadcast, + MIoTLanUnregisterBroadcastData(key=key)) + return True @final async def get_prop_async( @@ -870,17 +849,67 @@ def get_device_list_handler(msg: dict, fut: asyncio.Future): fut.set_result, msg) fut: asyncio.Future = self._main_loop.create_future() - if self.__lan_send_cmd( - MIoTLanCmdType.GET_DEV_LIST, + self._internal_loop.call_soon_threadsafe( + self.__get_dev_list, MIoTLanGetDevListData( handler=get_device_list_handler, handler_ctx=fut, - timeout_ms=timeout_ms)): - return await fut - _LOGGER.error('get_dev_list_async error, send cmd failed') - fut.set_result({}) + timeout_ms=timeout_ms)) return await fut + async def __call_api_async( + self, did: str, msg: dict, timeout_ms: int = 10000 + ) -> dict: + def call_api_handler(msg: dict, fut: asyncio.Future): + self._main_loop.call_soon_threadsafe( + fut.set_result, msg) + + fut: asyncio.Future = self._main_loop.create_future() + self._internal_loop.call_soon_threadsafe( + self.__call_api, did, msg, call_api_handler, fut, timeout_ms) + return await fut + + async def __on_network_info_change_external_async( + self, + status: InterfaceStatus, + info: NetworkInfo + ) -> None: + _LOGGER.info( + 'on network info change, status: %s, info: %s', status, info) + available_net_ifs = set() + for if_name in list(self._network.network_info.keys()): + available_net_ifs.add(if_name) + if len(available_net_ifs) == 0: + await self.deinit_async() + self._available_net_ifs = available_net_ifs + return + if self._net_ifs.isdisjoint(available_net_ifs): + _LOGGER.info('no valid net_ifs') + await self.deinit_async() + self._available_net_ifs = available_net_ifs + return + if not self._init_done: + self._available_net_ifs = available_net_ifs + await self.init_async() + return + self._internal_loop.call_soon_threadsafe( + self.__on_network_info_chnage, + MIoTLanNetworkUpdateData(status=status, if_name=info.name)) + + async def __on_mips_service_change( + self, group_id: str, state: MipsServiceState, data: dict + ) -> None: + _LOGGER.info( + 'on mips service change, %s, %s, %s', group_id, state, data) + if len(self._mips_service.get_services()) > 0: + _LOGGER.info('find central service, deinit miot lan') + await self.deinit_async() + else: + _LOGGER.info('no central service, init miot lan') + await self.init_async() + +# The folowing methods SHOULD ONLY be called in the internal loop + def ping(self, if_name: str | None, target_ip: str) -> None: if not target_ip: return @@ -956,8 +985,8 @@ def request_timeout_handler(req_data: MIoTLanRequestData): def broadcast_device_state(self, did: str, state: dict) -> None: for handler in self._device_state_sub_map.values(): self._main_loop.call_soon_threadsafe( - self._main_loop.create_task, - handler.handler(did, state, handler.handler_ctx)) + lambda: self._main_loop.create_task( + handler.handler(did, state, handler.handler_ctx))) def __gen_msg_id(self) -> int: if not self._msg_id_counter: @@ -967,167 +996,129 @@ def __gen_msg_id(self) -> int: self._msg_id_counter = 1 return self._msg_id_counter - def __lan_send_cmd(self, cmd: MIoTLanCmdType, data: Any) -> bool: + def __call_api(self, did: str, msg: dict, handler: Callable, handler_ctx: Any, timeout_ms: int = 10000) -> None: try: - self._queue.put(MIoTLanCmd(type_=cmd, data=data)) - os.eventfd_write(self._cmd_event_fd, 1) - return True + self.send2device( + did=did, + msg={'from': 'ha.xiaomi_home', **msg}, + handler=handler, + handler_ctx=handler_ctx, + timeout_ms=timeout_ms) except Exception as err: # pylint: disable=broad-exception-caught - _LOGGER.error('send cmd error, %s, %s', cmd, err) - return False - - async def __call_api_async( - self, did: str, msg: dict, timeout_ms: int = 10000 - ) -> dict: - def call_api_handler(msg: dict, fut: asyncio.Future): - self._main_loop.call_soon_threadsafe( - fut.set_result, msg) - - fut: asyncio.Future = self._main_loop.create_future() - if self.__lan_send_cmd( - cmd=MIoTLanCmdType.CALL_API, - data=MIoTLanCallApiData( - did=did, - msg=msg, - handler=call_api_handler, - handler_ctx=fut, - timeout_ms=timeout_ms)): - return await fut - - fut.set_result({ - 'code': MIoTErrorCode.CODE_UNAVAILABLE.value, - 'error': 'send cmd error'}) - return await fut - - def __cmd_read_handler(self) -> None: - fd_value = os.eventfd_read(self._cmd_event_fd) - if fd_value == 0: - return - while not self._queue.empty(): - mips_cmd: MIoTLanCmd = self._queue.get(block=False) - if mips_cmd.type_ == MIoTLanCmdType.CALL_API: - call_api_data: MIoTLanCallApiData = mips_cmd.data - try: - self.send2device( - did=call_api_data.did, - msg={'from': 'ha.xiaomi_home', **call_api_data.msg}, - handler=call_api_data.handler, - handler_ctx=call_api_data.handler_ctx, - timeout_ms=call_api_data.timeout_ms) - except Exception as err: # pylint: disable=broad-exception-caught - _LOGGER.error('send2device error, %s', err) - call_api_data.handler({ - 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value, - 'error': str(err)}, - call_api_data.handler_ctx) - elif mips_cmd.type_ == MIoTLanCmdType.SUB_DEVICE_STATE: - sub_data: MIoTLanSubDeviceState = mips_cmd.data - self._device_state_sub_map[sub_data.key] = sub_data - elif mips_cmd.type_ == MIoTLanCmdType.UNSUB_DEVICE_STATE: - sub_data: MIoTLanUnsubDeviceState = mips_cmd.data - self._device_state_sub_map.pop(sub_data.key, None) - elif mips_cmd.type_ == MIoTLanCmdType.REG_BROADCAST: - reg_data: MIoTLanRegisterBroadcastData = mips_cmd.data - self._device_msg_matcher[reg_data.key] = reg_data - _LOGGER.debug('lan register broadcast, %s', reg_data.key) - elif mips_cmd.type_ == MIoTLanCmdType.UNREG_BROADCAST: - unreg_data: MIoTLanUnregisterBroadcastData = mips_cmd.data - if self._device_msg_matcher.get(topic=unreg_data.key): - del self._device_msg_matcher[unreg_data.key] - _LOGGER.debug('lan unregister broadcast, %s', unreg_data.key) - elif mips_cmd.type_ == MIoTLanCmdType.GET_DEV_LIST: - get_dev_list_data: MIoTLanGetDevListData = mips_cmd.data - dev_list = { - device.did: { - 'online': device.online, - 'push_available': device.subscribed - } - for device in self._lan_devices.values() - if device.online} - get_dev_list_data.handler( - dev_list, get_dev_list_data.handler_ctx) - elif mips_cmd.type_ == MIoTLanCmdType.DEVICE_UPDATE: - devices: dict[str, dict] = mips_cmd.data - for did, info in devices.items(): - # did MUST be digit(UINT64) - if not did.isdigit(): - _LOGGER.info('invalid did, %s', did) - continue - if ( - 'model' not in info - or info['model'] in self._profile_models): - # Do not support the local control of - # Profile device for the time being - _LOGGER.info( - 'model not support local ctrl, %s, %s', - did, info.get('model')) - continue - if did not in self._lan_devices: - if 'token' not in info: - _LOGGER.error( - 'token not found, %s, %s', did, info) - continue - if len(info['token']) != 32: - _LOGGER.error( - 'invalid device token, %s, %s', did, info) - continue - self._lan_devices[did] = MIoTLanDevice( - manager=self, did=did, token=info['token'], - ip=info.get('ip', None)) - else: - self._lan_devices[did].update_info(info) - elif mips_cmd.type_ == MIoTLanCmdType.DEVICE_DELETE: - device_dids: list[str] = mips_cmd.data - for did in device_dids: - lan_device = self._lan_devices.pop(did, None) - if not lan_device: - continue - lan_device.on_delete() - elif mips_cmd.type_ == MIoTLanCmdType.NET_INFO_UPDATE: - net_data: MIoTLanNetworkUpdateData = mips_cmd.data - if net_data.status == InterfaceStatus.ADD: - self._available_net_ifs.add(net_data.if_name) - if net_data.if_name in self._net_ifs: - self.__create_socket(if_name=net_data.if_name) - elif net_data.status == InterfaceStatus.REMOVE: - self._available_net_ifs.remove(net_data.if_name) - self.__destroy_socket(if_name=net_data.if_name) - elif mips_cmd.type_ == MIoTLanCmdType.NET_IFS_UPDATE: - net_ifs: list[str] = mips_cmd.data - if self._net_ifs != set(net_ifs): - self._net_ifs = set(net_ifs) - for if_name in self._net_ifs: - self.__create_socket(if_name=if_name) - for if_name in list(self._broadcast_socks.keys()): - if if_name not in self._net_ifs: - self.__destroy_socket(if_name=if_name) - elif mips_cmd.type_ == MIoTLanCmdType.OPTIONS_UPDATE: - options: dict = mips_cmd.data - if 'enable_subscribe' in options: - if options['enable_subscribe'] != self._enable_subscribe: - self._enable_subscribe = options['enable_subscribe'] - if not self._enable_subscribe: - # Unsubscribe all - for device in self._lan_devices.values(): - device.unsubscribe() - elif mips_cmd.type_ == MIoTLanCmdType.DEINIT: - # stop the thread - if self._scan_timer: - self._scan_timer.cancel() - self._scan_timer = None - for device in self._lan_devices.values(): - device.on_delete() - self._lan_devices.clear() - for req_data in self._pending_requests.values(): - req_data.timeout.cancel() - self._pending_requests.clear() - for timer in self._reply_msg_buffer.values(): - timer.cancel() - self._reply_msg_buffer.clear() - self._device_msg_matcher = MIoTMatcher() - self.__deinit_socket() - # DO NOT force a event loop to stop. - # It will stop when you release all handles properly. + _LOGGER.error('send2device error, %s', err) + handler({ + 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value, + 'error': str(err)}, + handler_ctx) + + def __sub_device_state(self, data: MIoTLanSubDeviceState) -> None: + self._device_state_sub_map[data.key] = data + + def __unsub_device_state(self, data: MIoTLanUnsubDeviceState) -> None: + self._device_state_sub_map.pop(data.key, None) + + def __sub_broadcast(self, data: MIoTLanRegisterBroadcastData) -> None: + self._device_msg_matcher[data.key] = data + _LOGGER.debug('lan register broadcast, %s', data.key) + + def __unsub_broadcast(self, data: MIoTLanUnregisterBroadcastData) -> None: + if self._device_msg_matcher.get(topic=data.key): + del self._device_msg_matcher[data.key] + _LOGGER.debug('lan unregister broadcast, %s', data.key) + + def __get_dev_list(self, data: MIoTLanGetDevListData) -> None: + dev_list = { + device.did: { + 'online': device.online, + 'push_available': device.subscribed + } + for device in self._lan_devices.values() + if device.online} + data.handler( + dev_list, data.handler_ctx) + + def __update_devices(self, devices: dict[str, dict]) -> None: + for did, info in devices.items(): + # did MUST be digit(UINT64) + if not did.isdigit(): + _LOGGER.info('invalid did, %s', did) + continue + if ( + 'model' not in info + or info['model'] in self._profile_models): + # Do not support the local control of + # Profile device for the time being + _LOGGER.info( + 'model not support local ctrl, %s, %s', + did, info.get('model')) + continue + if did not in self._lan_devices: + if 'token' not in info: + _LOGGER.error( + 'token not found, %s, %s', did, info) + continue + if len(info['token']) != 32: + _LOGGER.error( + 'invalid device token, %s, %s', did, info) + continue + self._lan_devices[did] = MIoTLanDevice( + manager=self, did=did, token=info['token'], + ip=info.get('ip', None)) + else: + self._lan_devices[did].update_info(info) + + def __delete_devices(self, devices: list[str]) -> None: + for did in devices: + lan_device = self._lan_devices.pop(did, None) + if not lan_device: + continue + lan_device.on_delete() + + def __on_network_info_chnage(self, data: MIoTLanNetworkUpdateData) -> None: + if data.status == InterfaceStatus.ADD: + self._available_net_ifs.add(data.if_name) + if data.if_name in self._net_ifs: + self.__create_socket(if_name=data.if_name) + elif data.status == InterfaceStatus.REMOVE: + self._available_net_ifs.remove(data.if_name) + self.__destroy_socket(if_name=data.if_name) + + def __update_net_ifs(self, net_ifs: list[str]) -> None: + if self._net_ifs != set(net_ifs): + self._net_ifs = set(net_ifs) + for if_name in self._net_ifs: + self.__create_socket(if_name=if_name) + for if_name in list(self._broadcast_socks.keys()): + if if_name not in self._net_ifs: + self.__destroy_socket(if_name=if_name) + + def __update_subscribe_option(self, options: dict) -> None: + if 'enable_subscribe' in options: + if options['enable_subscribe'] != self._enable_subscribe: + self._enable_subscribe = options['enable_subscribe'] + if not self._enable_subscribe: + # Unsubscribe all + for device in self._lan_devices.values(): + device.unsubscribe() + + def __deinit(self) -> None: + # Release all resources + if self._scan_timer: + self._scan_timer.cancel() + self._scan_timer = None + for device in self._lan_devices.values(): + device.on_delete() + self._lan_devices.clear() + for req_data in self._pending_requests.values(): + if req_data.timeout: + req_data.timeout.cancel() + self._pending_requests.clear() + for timer in self._reply_msg_buffer.values(): + timer.cancel() + self._reply_msg_buffer.clear() + self._device_msg_matcher = MIoTMatcher() + self.__deinit_socket() + self._internal_loop.stop() def __init_socket(self) -> None: self.__deinit_socket() @@ -1237,7 +1228,8 @@ def __message_handler(self, did: str, msg: dict) -> None: # Reply req: MIoTLanRequestData | None = self._pending_requests.pop(msg['id'], None) if req: - req.timeout.cancel() + if req.timeout: + req.timeout.cancel() if req.handler is not None: self._main_loop.call_soon_threadsafe( req.handler, msg, req.handler_ctx) @@ -1329,42 +1321,3 @@ def __get_next_scan_time(self) -> float: self._last_scan_interval = min( self._last_scan_interval*2, self.OT_PROBE_INTERVAL_MAX) return self._last_scan_interval - - async def __on_network_info_change( - self, - status: InterfaceStatus, - info: NetworkInfo - ) -> None: - _LOGGER.info( - 'on network info change, status: %s, info: %s', status, info) - available_net_ifs = set() - for if_name in list(self._network.network_info.keys()): - available_net_ifs.add(if_name) - if len(available_net_ifs) == 0: - await self.deinit_async() - self._available_net_ifs = available_net_ifs - return - if self._net_ifs.isdisjoint(available_net_ifs): - _LOGGER.info('no valid net_ifs') - await self.deinit_async() - self._available_net_ifs = available_net_ifs - return - if not self._init_done: - self._available_net_ifs = available_net_ifs - await self.init_async() - return - self.__lan_send_cmd( - MIoTLanCmdType.NET_INFO_UPDATE, MIoTLanNetworkUpdateData( - status=status, if_name=info.name)) - - async def __on_mips_service_change( - self, group_id: str, state: MipsServiceState, data: dict - ) -> None: - _LOGGER.info( - 'on mips service change, %s, %s, %s', group_id, state, data) - if len(self._mips_service.get_services()) > 0: - _LOGGER.info('find central service, deinit miot lan') - await self.deinit_async() - else: - _LOGGER.info('no central service, init miot lan') - await self.init_async() From 569e1512e3052fb13664c69a9aed4b6c5611fd90 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sat, 21 Dec 2024 21:52:45 +0800 Subject: [PATCH 03/12] fix lint --- .../xiaomi_home/miot/miot_lan.py | 82 +++++++++++++------ 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index ff32859..a05d2cc 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -180,7 +180,11 @@ class MIoTLanDevice: # All functions SHOULD be called from the internal loop def __init__( - self, manager: 'MIoTLan', did: str, token: str, ip: Optional[str] = None + self, + manager: 'MIoTLan', + did: str, + token: str, + ip: Optional[str] = None ) -> None: self._manager: MIoTLan = manager self.did = did @@ -427,9 +431,10 @@ def __change_online(self, online: bool) -> None: self.online = True else: _LOGGER.info('unstable device detected, %s', self.did) - self._online_offline_timer = self._manager.internal_loop.call_later( - self.NETWORK_UNSTABLE_RESUME_TH, - self.__online_resume_handler) + self._online_offline_timer = \ + self._manager.internal_loop.call_later( + self.NETWORK_UNSTABLE_RESUME_TH, + self.__online_resume_handler) def __online_resume_handler(self) -> None: _LOGGER.info('unstable resume threshold past, %s', self.did) @@ -491,13 +496,13 @@ class MIoTLan: # The following should be called from the main loop def __init__( - self, - net_ifs: list[str], - network: MIoTNetwork, - mips_service: MipsService, - enable_subscribe: bool = False, - virtual_did: Optional[int] = None, - loop: Optional[asyncio.AbstractEventLoop] = None + self, + net_ifs: list[str], + network: MIoTNetwork, + mips_service: MipsService, + enable_subscribe: bool = False, + virtual_did: Optional[int] = None, + loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: if not network: raise ValueError('network is required') @@ -513,7 +518,9 @@ def __init__( key='miot_lan', group_id='*', handler=self.__on_mips_service_change) self._enable_subscribe = enable_subscribe - self._virtual_did = str(virtual_did) if (virtual_did is not None) else str(secrets.randbits(64)) + self._virtual_did = str(virtual_did) \ + if (virtual_did is not None) \ + else str(secrets.randbits(64)) # Init socket probe message probe_bytes = bytearray(self.OT_PROBE_LEN) probe_bytes[:20] = ( @@ -600,7 +607,7 @@ async def init_async(self) -> None: self._main_loop.create_task(handler(True)) _LOGGER.info( 'miot lan init, %s ,%s', self._net_ifs, self._available_net_ifs) - + def __internal_loop_thread(self) -> None: _LOGGER.info('miot lan thread start') self._internal_loop = asyncio.new_event_loop() @@ -718,8 +725,12 @@ def unsub_device_state(self, key: str) -> bool: @final def sub_prop( - self, did: str, handler: Callable[[dict, Any], None], - siid: Optional[int] = None, piid: Optional[int] = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + piid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: if not self._enable_subscribe: return False @@ -733,21 +744,30 @@ def sub_prop( return True @final - def unsub_prop(self, did: str, siid: Optional[int] = None, piid: Optional[int] = None) -> bool: + def unsub_prop( + self, + did: str, + siid: Optional[int] = None, + piid: Optional[int] = None + ) -> bool: if not self._enable_subscribe: return False key = ( f'{did}/p/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') self._internal_loop.call_soon_threadsafe( - self.__unsub_broadcast, + self.__unsub_broadcast, MIoTLanUnregisterBroadcastData(key=key)) return True @final def sub_event( - self, did: str, handler: Callable[[dict, Any], None], - siid: Optional[int] = None, eiid: Optional[int] = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + eiid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: if not self._enable_subscribe: return False @@ -761,7 +781,12 @@ def sub_event( return True @final - def unsub_event(self, did: str, siid: Optional[int] = None, eiid: Optional[int] = None) -> bool: + def unsub_event( + self, + did: str, + siid: Optional[int] = None, + eiid: Optional[int] = None + ) -> bool: if not self._enable_subscribe: return False key = ( @@ -908,7 +933,7 @@ async def __on_mips_service_change( _LOGGER.info('no central service, init miot lan') await self.init_async() -# The folowing methods SHOULD ONLY be called in the internal loop +# The folowing methods SHOULD ONLY be called in the internal loop def ping(self, if_name: str | None, target_ip: str) -> None: if not target_ip: @@ -985,8 +1010,8 @@ def request_timeout_handler(req_data: MIoTLanRequestData): def broadcast_device_state(self, did: str, state: dict) -> None: for handler in self._device_state_sub_map.values(): self._main_loop.call_soon_threadsafe( - lambda: self._main_loop.create_task( - handler.handler(did, state, handler.handler_ctx))) + self._main_loop.create_task, + handler.handler(did, state, handler.handler_ctx)) def __gen_msg_id(self) -> int: if not self._msg_id_counter: @@ -996,7 +1021,14 @@ def __gen_msg_id(self) -> int: self._msg_id_counter = 1 return self._msg_id_counter - def __call_api(self, did: str, msg: dict, handler: Callable, handler_ctx: Any, timeout_ms: int = 10000) -> None: + def __call_api( + self, + did: str, + msg: dict, + handler: Callable, + handler_ctx: Any, + timeout_ms: int = 10000 + ) -> None: try: self.send2device( did=did, @@ -1304,11 +1336,11 @@ def __sendto( def __scan_devices(self) -> None: if self._scan_timer: self._scan_timer.cancel() - # Ignore any exceptions to avoid blocking the loop try: # Scan devices self.ping(if_name=None, target_ip='255.255.255.255') except: + # Ignore any exceptions to avoid blocking the loop pass scan_time = self.__get_next_scan_time() self._scan_timer = self._internal_loop.call_later( From f288a951b941d09438fe2a7da2a61aab77e9c870 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sat, 21 Dec 2024 21:58:24 +0800 Subject: [PATCH 04/12] add tev back --- custom_components/xiaomi_home/miot/miot_ev.py | 324 ++++++++++++++++++ test/test_ev.py | 55 +++ 2 files changed, 379 insertions(+) create mode 100644 custom_components/xiaomi_home/miot/miot_ev.py create mode 100644 test/test_ev.py diff --git a/custom_components/xiaomi_home/miot/miot_ev.py b/custom_components/xiaomi_home/miot/miot_ev.py new file mode 100644 index 0000000..be4e684 --- /dev/null +++ b/custom_components/xiaomi_home/miot/miot_ev.py @@ -0,0 +1,324 @@ +# -*- coding: utf-8 -*- +""" +Copyright (C) 2024 Xiaomi Corporation. + +The ownership and intellectual property rights of Xiaomi Home Assistant +Integration and related Xiaomi cloud service API interface provided under this +license, including source code and object code (collectively, "Licensed Work"), +are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi +hereby grants you a personal, limited, non-exclusive, non-transferable, +non-sublicensable, and royalty-free license to reproduce, use, modify, and +distribute the Licensed Work only for your use of Home Assistant for +non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize +you to use the Licensed Work for any other purpose, including but not limited +to use Licensed Work to develop applications (APP), Web services, and other +forms of software. + +You may reproduce and distribute copies of the Licensed Work, with or without +modifications, whether in source or object form, provided that you must give +any other recipients of the Licensed Work a copy of this License and retain all +copyright and disclaimers. + +Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR +CONDITIONS OF ANY KIND, either express or implied, including, without +limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR +OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or +FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible +for any direct, indirect, special, incidental, or consequential damages or +losses arising from the use or inability to use the Licensed Work. + +Xiaomi reserves all rights not expressly granted to you in this License. +Except for the rights expressly granted by Xiaomi under this License, Xiaomi +does not authorize you in any form to use the trademarks, copyrights, or other +forms of intellectual property rights of Xiaomi and its affiliates, including, +without limitation, without obtaining other written permission from Xiaomi, you +shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that +may make the public associate with Xiaomi in any form to publicize or promote +the software or hardware devices that use the Licensed Work. + +Xiaomi has the right to immediately terminate all your authorization under this +License in the event: +1. You assert patent invalidation, litigation, or other claims against patents +or other intellectual property rights of Xiaomi or its affiliates; or, +2. You make, have made, manufacture, sell, or offer to sell products that knock +off Xiaomi or its affiliates' products. + +MIoT event loop. +""" +import selectors +import heapq +import time +import traceback +from typing import Callable, TypeVar +import logging +import threading + +# pylint: disable=relative-beyond-top-level +from .miot_error import MIoTEvError + +_LOGGER = logging.getLogger(__name__) + +TimeoutHandle = TypeVar('TimeoutHandle') + + +class MIoTFdHandler: + """File descriptor handler.""" + fd: int + read_handler: Callable[[any], None] + read_handler_ctx: any + write_handler: Callable[[any], None] + write_handler_ctx: any + + def __init__( + self, fd: int, + read_handler: Callable[[any], None] = None, + read_handler_ctx: any = None, + write_handler: Callable[[any], None] = None, + write_handler_ctx: any = None + ) -> None: + self.fd = fd + self.read_handler = read_handler + self.read_handler_ctx = read_handler_ctx + self.write_handler = write_handler + self.write_handler_ctx = write_handler_ctx + + +class MIoTTimeout: + """Timeout handler.""" + key: TimeoutHandle + target: int + handler: Callable[[any], None] + handler_ctx: any + + def __init__( + self, key: str = None, target: int = None, + handler: Callable[[any], None] = None, + handler_ctx: any = None + ) -> None: + self.key = key + self.target = target + self.handler = handler + self.handler_ctx = handler_ctx + + def __lt__(self, other): + return self.target < other.target + + +class MIoTEventLoop: + """MIoT event loop.""" + _poll_fd: selectors.DefaultSelector + + _fd_handlers: dict[str, MIoTFdHandler] + + _timer_heap: list[MIoTTimeout] + _timer_handlers: dict[str, MIoTTimeout] + _timer_handle_seed: int + + # Label if the current fd handler is freed inside a read handler to + # avoid invalid reading. + _fd_handler_freed_in_read_handler: bool + + def __init__(self) -> None: + self._poll_fd = selectors.DefaultSelector() + self._timer_heap = [] + self._timer_handlers = {} + self._timer_handle_seed = 1 + self._fd_handlers = {} + self._fd_handler_freed_in_read_handler = False + + def loop_forever(self) -> None: + """Run an event loop in current thread.""" + next_timeout: int + while True: + next_timeout = 0 + # Handle timer + now_ms: int = self.__get_monotonic_ms + while len(self._timer_heap) > 0: + timer: MIoTTimeout = self._timer_heap[0] + if timer is None: + break + if timer.target <= now_ms: + heapq.heappop(self._timer_heap) + del self._timer_handlers[timer.key] + if timer.handler: + timer.handler(timer.handler_ctx) + else: + next_timeout = timer.target-now_ms + break + # Are there any files to listen to + if next_timeout == 0 and self._fd_handlers: + next_timeout = None # None == infinite + # Wait for timers & fds + if next_timeout == 0: + # Neither timer nor fds exist, exit loop + break + # Handle fd event + events = self._poll_fd.select( + timeout=next_timeout/1000.0 if next_timeout else next_timeout) + for key, mask in events: + fd_handler: MIoTFdHandler = key.data + if fd_handler is None: + continue + self._fd_handler_freed_in_read_handler = False + fd_key = str(id(fd_handler.fd)) + if fd_key not in self._fd_handlers: + continue + if ( + mask & selectors.EVENT_READ > 0 + and fd_handler.read_handler + ): + fd_handler.read_handler(fd_handler.read_handler_ctx) + if ( + mask & selectors.EVENT_WRITE > 0 + and self._fd_handler_freed_in_read_handler is False + and fd_handler.write_handler + ): + fd_handler.write_handler(fd_handler.write_handler_ctx) + + def loop_stop(self) -> None: + """Stop the event loop.""" + if self._poll_fd: + self._poll_fd.close() + self._poll_fd = None + self._fd_handlers = {} + self._timer_heap = [] + self._timer_handlers = {} + + def set_timeout( + self, timeout_ms: int, handler: Callable[[any], None], + handler_ctx: any = None + ) -> TimeoutHandle: + """Set a timer.""" + if timeout_ms is None or handler is None: + raise MIoTEvError('invalid params') + new_timeout: MIoTTimeout = MIoTTimeout() + new_timeout.key = self.__get_next_timeout_handle + new_timeout.target = self.__get_monotonic_ms + timeout_ms + new_timeout.handler = handler + new_timeout.handler_ctx = handler_ctx + heapq.heappush(self._timer_heap, new_timeout) + self._timer_handlers[new_timeout.key] = new_timeout + return new_timeout.key + + def clear_timeout(self, timer_key: TimeoutHandle) -> None: + """Stop and remove the timer.""" + if timer_key is None: + return + timer: MIoTTimeout = self._timer_handlers.pop(timer_key, None) + if timer: + self._timer_heap = list(self._timer_heap) + self._timer_heap.remove(timer) + heapq.heapify(self._timer_heap) + + def set_read_handler( + self, fd: int, handler: Callable[[any], None], handler_ctx: any = None + ) -> bool: + """Set a read handler for a file descriptor. + + Returns: + bool: True, success. False, failed. + """ + self.__set_handler( + fd, is_read=True, handler=handler, handler_ctx=handler_ctx) + + def set_write_handler( + self, fd: int, handler: Callable[[any], None], handler_ctx: any = None + ) -> bool: + """Set a write handler for a file descriptor. + + Returns: + bool: True, success. False, failed. + """ + self.__set_handler( + fd, is_read=False, handler=handler, handler_ctx=handler_ctx) + + def __set_handler( + self, fd, is_read: bool, handler: Callable[[any], None], + handler_ctx: any = None + ) -> bool: + """Set a handler.""" + if fd is None: + raise MIoTEvError('invalid params') + + if not self._poll_fd: + raise MIoTEvError('event loop not started') + + fd_key: str = str(id(fd)) + fd_handler = self._fd_handlers.get(fd_key, None) + + if fd_handler is None: + fd_handler = MIoTFdHandler(fd=fd) + fd_handler.fd = fd + self._fd_handlers[fd_key] = fd_handler + + read_handler_existed = fd_handler.read_handler is not None + write_handler_existed = fd_handler.write_handler is not None + if is_read is True: + fd_handler.read_handler = handler + fd_handler.read_handler_ctx = handler_ctx + else: + fd_handler.write_handler = handler + fd_handler.write_handler_ctx = handler_ctx + + if fd_handler.read_handler is None and fd_handler.write_handler is None: + # Remove from epoll and map + try: + self._poll_fd.unregister(fd) + except (KeyError, ValueError, OSError) as e: + del e + self._fd_handlers.pop(fd_key, None) + # May be inside a read handler, if not, this has no effect + self._fd_handler_freed_in_read_handler = True + elif read_handler_existed is False and write_handler_existed is False: + # Add to epoll + events = 0x0 + if fd_handler.read_handler: + events |= selectors.EVENT_READ + if fd_handler.write_handler: + events |= selectors.EVENT_WRITE + try: + self._poll_fd.register(fd, events=events, data=fd_handler) + except (KeyError, ValueError, OSError) as e: + _LOGGER.error( + '%s, register fd, error, %s, %s, %s, %s, %s', + threading.current_thread().name, + 'read' if is_read else 'write', + fd_key, handler, e, traceback.format_exc()) + self._fd_handlers.pop(fd_key, None) + return False + elif ( + read_handler_existed != (fd_handler.read_handler is not None) + or write_handler_existed != (fd_handler.write_handler is not None) + ): + # Modify epoll + events = 0x0 + if fd_handler.read_handler: + events |= selectors.EVENT_READ + if fd_handler.write_handler: + events |= selectors.EVENT_WRITE + try: + self._poll_fd.modify(fd, events=events, data=fd_handler) + except (KeyError, ValueError, OSError) as e: + _LOGGER.error( + '%s, modify fd, error, %s, %s, %s, %s, %s', + threading.current_thread().name, + 'read' if is_read else 'write', + fd_key, handler, e, traceback.format_exc()) + self._fd_handlers.pop(fd_key, None) + return False + + return True + + @property + def __get_next_timeout_handle(self) -> str: + # Get next timeout handle, that is not larger than the maximum + # value of UINT64 type. + self._timer_handle_seed += 1 + # uint64 max + self._timer_handle_seed %= 0xFFFFFFFFFFFFFFFF + return str(self._timer_handle_seed) + + @property + def __get_monotonic_ms(self) -> int: + """Get monotonic ms timestamp.""" + return int(time.monotonic()*1000) diff --git a/test/test_ev.py b/test/test_ev.py new file mode 100644 index 0000000..6353fe8 --- /dev/null +++ b/test/test_ev.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +"""Unit test for miot_ev.py.""" +import os +import pytest + +# pylint: disable=import-outside-toplevel, disable=unused-argument + + +@pytest.mark.github +def test_mev_timer_and_fd(): + from miot.miot_ev import MIoTEventLoop, TimeoutHandle + + mev = MIoTEventLoop() + assert mev + event_fd: os.eventfd = os.eventfd(0, os.O_NONBLOCK) + assert event_fd + timer4: TimeoutHandle = None + + def event_handler(event_fd): + value: int = os.eventfd_read(event_fd) + if value == 1: + mev.clear_timeout(timer4) + print('cancel timer4') + elif value == 2: + print('event write twice in a row') + elif value == 3: + mev.set_read_handler(event_fd, None, None) + os.close(event_fd) + event_fd = None + print('close event fd') + + def timer1_handler(event_fd): + os.eventfd_write(event_fd, 1) + + def timer2_handler(event_fd): + os.eventfd_write(event_fd, 1) + os.eventfd_write(event_fd, 1) + + def timer3_handler(event_fd): + os.eventfd_write(event_fd, 3) + + def timer4_handler(event_fd): + raise ValueError('unreachable code') + + mev.set_read_handler( + event_fd, event_handler, event_fd) + + mev.set_timeout(500, timer1_handler, event_fd) + mev.set_timeout(1000, timer2_handler, event_fd) + mev.set_timeout(1500, timer3_handler, event_fd) + timer4 = mev.set_timeout(2000, timer4_handler, event_fd) + + mev.loop_forever() + # Loop will exit when there are no timers or fd handlers. + mev.loop_stop() From 3f77738b90c71b473a74ee198edee41c3633e46b Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sat, 21 Dec 2024 22:02:19 +0800 Subject: [PATCH 05/12] fix lint --- custom_components/xiaomi_home/miot/miot_lan.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index a05d2cc..2e8255a 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -512,7 +512,8 @@ def __init__( self._net_ifs = set(net_ifs) self._network = network self._network.sub_network_info( - key='miot_lan', handler=self.__on_network_info_change_external_async) + key='miot_lan', + handler=self.__on_network_info_change_external_async) self._mips_service = mips_service self._mips_service.sub_service_change( key='miot_lan', group_id='*', @@ -1258,7 +1259,8 @@ def __message_handler(self, did: str, msg: dict) -> None: _LOGGER.warning('invalid message, no id, %s, %s', did, msg) return # Reply - req: MIoTLanRequestData | None = self._pending_requests.pop(msg['id'], None) + req: MIoTLanRequestData | None = \ + self._pending_requests.pop(msg['id'], None) if req: if req.timeout: req.timeout.cancel() @@ -1339,7 +1341,7 @@ def __scan_devices(self) -> None: try: # Scan devices self.ping(if_name=None, target_ip='255.255.255.255') - except: + except Exception: # Ignore any exceptions to avoid blocking the loop pass scan_time = self.__get_next_scan_time() From 0872bf4f69e051e30a21f30922d1a3d7599ff932 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sat, 21 Dec 2024 22:11:38 +0800 Subject: [PATCH 06/12] ignore broad exception warning --- custom_components/xiaomi_home/miot/miot_lan.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 2e8255a..4db79d5 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -1341,8 +1341,9 @@ def __scan_devices(self) -> None: try: # Scan devices self.ping(if_name=None, target_ip='255.255.255.255') - except Exception: + except Exception as err: # pylint: disable=broad-exception-caught # Ignore any exceptions to avoid blocking the loop + _LOGGER.error('ping device error, %s', err) pass scan_time = self.__get_next_scan_time() self._scan_timer = self._internal_loop.call_later( From 30e0433aee56e93068d946fc1aace8bd46d11981 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sat, 21 Dec 2024 23:16:30 +0800 Subject: [PATCH 07/12] revert changes in the license --- custom_components/xiaomi_home/miot/miot_lan.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 4db79d5..1922332 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -10,30 +10,30 @@ non-sublicensable, and royalty-free license to reproduce, use, modify, and distribute the Licensed Work only for your use of Home Assistant for non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize -you to use the Licensed Work for Any other purpose, including but not limited +you to use the Licensed Work for any other purpose, including but not limited to use Licensed Work to develop applications (APP), Web services, and other forms of software. You may reproduce and distribute copies of the Licensed Work, with or without modifications, whether in source or object form, provided that you must give -Any other recipients of the Licensed Work a copy of this License and retain all +any other recipients of the Licensed Work a copy of this License and retain all copyright and disclaimers. Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without -limitation, Any warranties, undertakes, or conditions of TITLE, NO ERROR OR +limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or -FITNESS FOR A PARTICULAR PURPOSE. In Any event, you are solely responsible -for Any direct, indirect, special, incidental, or consequential damages or +FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible +for any direct, indirect, special, incidental, or consequential damages or losses arising from the use or inability to use the Licensed Work. Xiaomi reserves all rights not expressly granted to you in this License. Except for the rights expressly granted by Xiaomi under this License, Xiaomi -does not authorize you in Any form to use the trademarks, copyrights, or other +does not authorize you in any form to use the trademarks, copyrights, or other forms of intellectual property rights of Xiaomi and its affiliates, including, without limitation, without obtaining other written permission from Xiaomi, you shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that -may make the public associate with Xiaomi in Any form to publicize or promote +may make the public associate with Xiaomi in any form to publicize or promote the software or hardware devices that use the Licensed Work. Xiaomi has the right to immediately terminate all your authorization under this @@ -598,6 +598,7 @@ async def init_async(self) -> None: except Exception as err: # pylint: disable=broad-exception-caught _LOGGER.error('load profile models error, %s', err) self._profile_models = {} + self._internal_loop = asyncio.new_event_loop() # All tasks meant for the internal loop should happen in this thread self._thread = threading.Thread(target=self.__internal_loop_thread) self._thread.name = 'miot_lan' @@ -611,13 +612,11 @@ async def init_async(self) -> None: def __internal_loop_thread(self) -> None: _LOGGER.info('miot lan thread start') - self._internal_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._internal_loop) self.__init_socket() self._scan_timer = self._internal_loop.call_later( int(3*random.random()), self.__scan_devices) self._internal_loop.run_forever() - self._internal_loop.close() _LOGGER.info('miot lan thread exit') async def deinit_async(self) -> None: @@ -627,6 +626,7 @@ async def deinit_async(self) -> None: self._init_done = False self._internal_loop.call_soon_threadsafe(self.__deinit) self._thread.join() + self._internal_loop.close() self._profile_models = {} self._lan_devices = {} From 463216d86694e32851046200d94e87085db3ff42 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sat, 21 Dec 2024 23:18:49 +0800 Subject: [PATCH 08/12] do not set asyncio event loop --- custom_components/xiaomi_home/miot/miot_lan.py | 1 - 1 file changed, 1 deletion(-) diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 1922332..557665a 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -612,7 +612,6 @@ async def init_async(self) -> None: def __internal_loop_thread(self) -> None: _LOGGER.info('miot lan thread start') - asyncio.set_event_loop(self._internal_loop) self.__init_socket() self._scan_timer = self._internal_loop.call_later( int(3*random.random()), self.__scan_devices) From d13a6bfb114e928c3742f3b9a7eb1f9806d330f3 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sun, 22 Dec 2024 19:51:58 +0800 Subject: [PATCH 09/12] fix racing condition --- .../xiaomi_home/miot/miot_lan.py | 82 ++++++++++--------- 1 file changed, 43 insertions(+), 39 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 557665a..98385e1 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -491,6 +491,7 @@ class MIoTLan: _profile_models: dict[str, dict] + _init_lock: asyncio.Lock _init_done: bool # The following should be called from the main loop @@ -547,6 +548,7 @@ def __init__( self._lan_state_sub_map = {} self._lan_ctrl_vote_map = {} + self._init_lock = asyncio.Lock() self._init_done = False if ( @@ -571,44 +573,46 @@ def init_done(self) -> bool: return self._init_done async def init_async(self) -> None: - if self._init_done: - _LOGGER.info('miot lan already init') - return - if len(self._net_ifs) == 0: - _LOGGER.info('no net_ifs') - return - if not any(self._lan_ctrl_vote_map.values()): - _LOGGER.info('no vote for lan ctrl') - return - if len(self._mips_service.get_services()) > 0: - _LOGGER.info('central hub gateway service exist') - return - for if_name in list(self._network.network_info.keys()): - self._available_net_ifs.add(if_name) - if len(self._available_net_ifs) == 0: - _LOGGER.info('no available net_ifs') - return - if self._net_ifs.isdisjoint(self._available_net_ifs): - _LOGGER.info('no valid net_ifs') - return - try: - self._profile_models = await self._main_loop.run_in_executor( - None, load_yaml_file, - gen_absolute_path(self.PROFILE_MODELS_FILE)) - except Exception as err: # pylint: disable=broad-exception-caught - _LOGGER.error('load profile models error, %s', err) - self._profile_models = {} - self._internal_loop = asyncio.new_event_loop() - # All tasks meant for the internal loop should happen in this thread - self._thread = threading.Thread(target=self.__internal_loop_thread) - self._thread.name = 'miot_lan' - self._thread.daemon = True - self._thread.start() - self._init_done = True - for handler in list(self._lan_state_sub_map.values()): - self._main_loop.create_task(handler(True)) - _LOGGER.info( - 'miot lan init, %s ,%s', self._net_ifs, self._available_net_ifs) + # Avoid race condition + async with self._init_lock: + if self._init_done: + _LOGGER.info('miot lan already init') + return + if len(self._net_ifs) == 0: + _LOGGER.info('no net_ifs') + return + if not any(self._lan_ctrl_vote_map.values()): + _LOGGER.info('no vote for lan ctrl') + return + if len(self._mips_service.get_services()) > 0: + _LOGGER.info('central hub gateway service exist') + return + for if_name in list(self._network.network_info.keys()): + self._available_net_ifs.add(if_name) + if len(self._available_net_ifs) == 0: + _LOGGER.info('no available net_ifs') + return + if self._net_ifs.isdisjoint(self._available_net_ifs): + _LOGGER.info('no valid net_ifs') + return + try: + self._profile_models = await self._main_loop.run_in_executor( + None, load_yaml_file, + gen_absolute_path(self.PROFILE_MODELS_FILE)) + except Exception as err: # pylint: disable=broad-exception-caught + _LOGGER.error('load profile models error, %s', err) + self._profile_models = {} + self._internal_loop = asyncio.new_event_loop() + # All tasks meant for the internal loop should happen in this thread + self._thread = threading.Thread(target=self.__internal_loop_thread) + self._thread.name = 'miot_lan' + self._thread.daemon = True + self._thread.start() + self._init_done = True + for handler in list(self._lan_state_sub_map.values()): + self._main_loop.create_task(handler(True)) + _LOGGER.info( + 'miot lan init, %s ,%s', self._net_ifs, self._available_net_ifs) def __internal_loop_thread(self) -> None: _LOGGER.info('miot lan thread start') @@ -1347,7 +1351,7 @@ def __scan_devices(self) -> None: scan_time = self.__get_next_scan_time() self._scan_timer = self._internal_loop.call_later( scan_time, self.__scan_devices) - _LOGGER.debug('next scan time: %sms', scan_time) + _LOGGER.debug('next scan time: %ss', scan_time) def __get_next_scan_time(self) -> float: if not self._last_scan_interval: From d3c39f38e1924c2a6380554a7a6b6ae6178ca75f Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sun, 22 Dec 2024 20:13:38 +0800 Subject: [PATCH 10/12] remove unused data classes --- .../xiaomi_home/miot/miot_lan.py | 31 +++++-------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 98385e1..37bd9c5 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -76,27 +76,12 @@ @dataclass -class MIoTLanCmdData: +class MIoTLanGetDevListData: handler: Callable[[dict, Any], None] handler_ctx: Any timeout_ms: int -@dataclass -class MIoTLanGetDevListData(MIoTLanCmdData): - ... - - -@dataclass -class MIoTLanCallApiData(MIoTLanCmdData): - did: str - msg: dict - - -class MIoTLanSendBroadcastData(MIoTLanCallApiData): - ... - - @dataclass class MIoTLanUnregisterBroadcastData: key: str @@ -110,12 +95,12 @@ class MIoTLanRegisterBroadcastData: @dataclass -class MIoTLanUnsubDeviceState: +class MIoTLanUnsubDeviceData: key: str @dataclass -class MIoTLanSubDeviceState: +class MIoTLanSubDeviceData: key: str handler: Callable[[str, dict, Any], Coroutine] handler_ctx: Any @@ -483,7 +468,7 @@ class MIoTLan: _msg_id_counter: int _pending_requests: dict[int, MIoTLanRequestData] _device_msg_matcher: MIoTMatcher - _device_state_sub_map: dict[str, MIoTLanSubDeviceState] + _device_state_sub_map: dict[str, MIoTLanSubDeviceData] _reply_msg_buffer: dict[str, asyncio.TimerHandle] _lan_state_sub_map: dict[str, Callable[[bool], Coroutine]] @@ -717,14 +702,14 @@ def sub_device_state( ) -> bool: self._internal_loop.call_soon_threadsafe( self.__sub_device_state, - MIoTLanSubDeviceState( + MIoTLanSubDeviceData( key=key, handler=handler, handler_ctx=handler_ctx)) return True @final def unsub_device_state(self, key: str) -> bool: self._internal_loop.call_soon_threadsafe( - self.__unsub_device_state, MIoTLanUnsubDeviceState(key=key)) + self.__unsub_device_state, MIoTLanUnsubDeviceData(key=key)) return True @final @@ -1047,10 +1032,10 @@ def __call_api( 'error': str(err)}, handler_ctx) - def __sub_device_state(self, data: MIoTLanSubDeviceState) -> None: + def __sub_device_state(self, data: MIoTLanSubDeviceData) -> None: self._device_state_sub_map[data.key] = data - def __unsub_device_state(self, data: MIoTLanUnsubDeviceState) -> None: + def __unsub_device_state(self, data: MIoTLanUnsubDeviceData) -> None: self._device_state_sub_map.pop(data.key, None) def __sub_broadcast(self, data: MIoTLanRegisterBroadcastData) -> None: From fb46bc6b192b50a3967f5635337af244cab6b257 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sun, 22 Dec 2024 20:16:45 +0800 Subject: [PATCH 11/12] change internal class scope --- .../xiaomi_home/miot/miot_lan.py | 100 +++++++++--------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 37bd9c5..760b733 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -76,51 +76,51 @@ @dataclass -class MIoTLanGetDevListData: +class _MIoTLanGetDevListData: handler: Callable[[dict, Any], None] handler_ctx: Any timeout_ms: int @dataclass -class MIoTLanUnregisterBroadcastData: +class _MIoTLanUnregisterBroadcastData: key: str @dataclass -class MIoTLanRegisterBroadcastData: +class _MIoTLanRegisterBroadcastData: key: str handler: Callable[[dict, Any], None] handler_ctx: Any @dataclass -class MIoTLanUnsubDeviceData: +class _MIoTLanUnsubDeviceData: key: str @dataclass -class MIoTLanSubDeviceData: +class _MIoTLanSubDeviceData: key: str handler: Callable[[str, dict, Any], Coroutine] handler_ctx: Any @dataclass -class MIoTLanNetworkUpdateData: +class _MIoTLanNetworkUpdateData: status: InterfaceStatus if_name: str @dataclass -class MIoTLanRequestData: +class _MIoTLanRequestData: msg_id: int handler: Optional[Callable[[dict, Any], None]] handler_ctx: Any timeout: Optional[asyncio.TimerHandle] -class MIoTLanDeviceState(Enum): +class _MIoTLanDeviceState(Enum): FRESH = 0 PING1 = auto() PING2 = auto() @@ -128,7 +128,7 @@ class MIoTLanDeviceState(Enum): DEAD = auto() -class MIoTLanDevice: +class _MIoTLanDevice: """MIoT lan device.""" # pylint: disable=unused-argument OT_HEADER: int = 0x2131 @@ -154,7 +154,7 @@ class MIoTLanDevice: _manager: 'MIoTLan' _if_name: Optional[str] _sub_locked: bool - _state: MIoTLanDeviceState + _state: _MIoTLanDeviceState _online: bool _online_offline_history: list[dict[str, Any]] _online_offline_timer: Optional[asyncio.TimerHandle] @@ -185,14 +185,14 @@ def __init__( self.supported_wildcard_sub = False self._if_name = None self._sub_locked = False - self._state = MIoTLanDeviceState.DEAD + self._state = _MIoTLanDeviceState.DEAD self._online = False self._online_offline_history = [] self._online_offline_timer = None def ka_init_handler() -> None: self._ka_internal = self.KA_INTERVAL_MIN - self.__update_keep_alive(state=MIoTLanDeviceState.DEAD) + self.__update_keep_alive(state=_MIoTLanDeviceState.DEAD) self._ka_timer = self._manager.internal_loop.call_later( randomize_float(self.CONSTRUCT_STATE_PENDING, 0.5), ka_init_handler,) @@ -204,7 +204,7 @@ def keep_alive(self, ip: str, if_name: str) -> None: self._if_name = if_name _LOGGER.info( 'device if_name change, %s, %s', self._if_name, self.did) - self.__update_keep_alive(state=MIoTLanDeviceState.FRESH) + self.__update_keep_alive(state=_MIoTLanDeviceState.FRESH) @property def online(self) -> bool: @@ -352,31 +352,31 @@ def __unsubscribe_handler(self, msg: dict, ctx: Any) -> None: return _LOGGER.info('unsubscribe success, %s, %s', self._if_name, self.did) - def __update_keep_alive(self, state: MIoTLanDeviceState) -> None: - last_state: MIoTLanDeviceState = self._state + def __update_keep_alive(self, state: _MIoTLanDeviceState) -> None: + last_state: _MIoTLanDeviceState = self._state self._state = state - if self._state != MIoTLanDeviceState.FRESH: + if self._state != _MIoTLanDeviceState.FRESH: _LOGGER.debug('device status, %s, %s', self.did, self._state) if self._ka_timer: self._ka_timer.cancel() self._ka_timer = None match state: - case MIoTLanDeviceState.FRESH: - if last_state == MIoTLanDeviceState.DEAD: + case _MIoTLanDeviceState.FRESH: + if last_state == _MIoTLanDeviceState.DEAD: self._ka_internal = self.KA_INTERVAL_MIN self.__change_online(True) self._ka_timer = self._manager.internal_loop.call_later( self.__get_next_ka_timeout(), self.__update_keep_alive, - MIoTLanDeviceState.PING1) + _MIoTLanDeviceState.PING1) case ( - MIoTLanDeviceState.PING1 - | MIoTLanDeviceState.PING2 - | MIoTLanDeviceState.PING3 + _MIoTLanDeviceState.PING1 + | _MIoTLanDeviceState.PING2 + | _MIoTLanDeviceState.PING3 ): # Set the timer first to avoid Any early returns self._ka_timer = self._manager.internal_loop.call_later( self.FAST_PING_INTERVAL, self.__update_keep_alive, - MIoTLanDeviceState(state.value+1)) + _MIoTLanDeviceState(state.value+1)) # Fast ping if self._if_name is None: _LOGGER.error('if_name is Not set for device, %s', self.did) @@ -385,8 +385,8 @@ def __update_keep_alive(self, state: MIoTLanDeviceState) -> None: _LOGGER.error('ip is Not set for device, %s', self.did) return self._manager.ping(if_name=self._if_name, target_ip=self.ip) - case MIoTLanDeviceState.DEAD: - if last_state == MIoTLanDeviceState.PING3: + case _MIoTLanDeviceState.DEAD: + if last_state == _MIoTLanDeviceState.PING3: self._ka_internal = self.KA_INTERVAL_MIN self.__change_online(False) case _: @@ -451,7 +451,7 @@ class MIoTLan: _network: MIoTNetwork _mips_service: MipsService _enable_subscribe: bool - _lan_devices: dict[str, MIoTLanDevice] + _lan_devices: dict[str, _MIoTLanDevice] _virtual_did: str _probe_msg: bytes _write_buffer: bytearray @@ -466,9 +466,9 @@ class MIoTLan: _scan_timer: Optional[asyncio.TimerHandle] _last_scan_interval: Optional[float] _msg_id_counter: int - _pending_requests: dict[int, MIoTLanRequestData] + _pending_requests: dict[int, _MIoTLanRequestData] _device_msg_matcher: MIoTMatcher - _device_state_sub_map: dict[str, MIoTLanSubDeviceData] + _device_state_sub_map: dict[str, _MIoTLanSubDeviceData] _reply_msg_buffer: dict[str, asyncio.TimerHandle] _lan_state_sub_map: dict[str, Callable[[bool], Coroutine]] @@ -702,14 +702,14 @@ def sub_device_state( ) -> bool: self._internal_loop.call_soon_threadsafe( self.__sub_device_state, - MIoTLanSubDeviceData( + _MIoTLanSubDeviceData( key=key, handler=handler, handler_ctx=handler_ctx)) return True @final def unsub_device_state(self, key: str) -> bool: self._internal_loop.call_soon_threadsafe( - self.__unsub_device_state, MIoTLanUnsubDeviceData(key=key)) + self.__unsub_device_state, _MIoTLanUnsubDeviceData(key=key)) return True @final @@ -728,7 +728,7 @@ def sub_prop( f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') self._internal_loop.call_soon_threadsafe( self.__sub_broadcast, - MIoTLanRegisterBroadcastData( + _MIoTLanRegisterBroadcastData( key=key, handler=handler, handler_ctx=handler_ctx)) return True @@ -746,7 +746,7 @@ def unsub_prop( f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') self._internal_loop.call_soon_threadsafe( self.__unsub_broadcast, - MIoTLanUnregisterBroadcastData(key=key)) + _MIoTLanUnregisterBroadcastData(key=key)) return True @final @@ -765,7 +765,7 @@ def sub_event( f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') self._internal_loop.call_soon_threadsafe( self.__sub_broadcast, - MIoTLanRegisterBroadcastData( + _MIoTLanRegisterBroadcastData( key=key, handler=handler, handler_ctx=handler_ctx)) return True @@ -783,7 +783,7 @@ def unsub_event( f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') self._internal_loop.call_soon_threadsafe( self.__unsub_broadcast, - MIoTLanUnregisterBroadcastData(key=key)) + _MIoTLanUnregisterBroadcastData(key=key)) return True @final @@ -865,7 +865,7 @@ def get_device_list_handler(msg: dict, fut: asyncio.Future): fut: asyncio.Future = self._main_loop.create_future() self._internal_loop.call_soon_threadsafe( self.__get_dev_list, - MIoTLanGetDevListData( + _MIoTLanGetDevListData( handler=get_device_list_handler, handler_ctx=fut, timeout_ms=timeout_ms)) @@ -908,7 +908,7 @@ async def __on_network_info_change_external_async( return self._internal_loop.call_soon_threadsafe( self.__on_network_info_chnage, - MIoTLanNetworkUpdateData(status=status, if_name=info.name)) + _MIoTLanNetworkUpdateData(status=status, if_name=info.name)) async def __on_mips_service_change( self, group_id: str, state: MipsServiceState, data: dict @@ -940,7 +940,7 @@ def send2device( ) -> None: if timeout_ms and not handler: raise ValueError('handler is required when timeout_ms is set') - device: MIoTLanDevice | None = self._lan_devices.get(did) + device: _MIoTLanDevice | None = self._lan_devices.get(did) if not device: raise ValueError('invalid device') if not device.cipher: @@ -975,7 +975,7 @@ def __make_request( handler_ctx: Any = None, timeout_ms: Optional[int] = None ) -> None: - def request_timeout_handler(req_data: MIoTLanRequestData): + def request_timeout_handler(req_data: _MIoTLanRequestData): self._pending_requests.pop(req_data.msg_id, None) if req_data and req_data.handler: req_data.handler({ @@ -984,7 +984,7 @@ def request_timeout_handler(req_data: MIoTLanRequestData): req_data.handler_ctx) timer: Optional[asyncio.TimerHandle] = None - request_data = MIoTLanRequestData( + request_data = _MIoTLanRequestData( msg_id=msg_id, handler=handler, handler_ctx=handler_ctx, @@ -1032,22 +1032,22 @@ def __call_api( 'error': str(err)}, handler_ctx) - def __sub_device_state(self, data: MIoTLanSubDeviceData) -> None: + def __sub_device_state(self, data: _MIoTLanSubDeviceData) -> None: self._device_state_sub_map[data.key] = data - def __unsub_device_state(self, data: MIoTLanUnsubDeviceData) -> None: + def __unsub_device_state(self, data: _MIoTLanUnsubDeviceData) -> None: self._device_state_sub_map.pop(data.key, None) - def __sub_broadcast(self, data: MIoTLanRegisterBroadcastData) -> None: + def __sub_broadcast(self, data: _MIoTLanRegisterBroadcastData) -> None: self._device_msg_matcher[data.key] = data _LOGGER.debug('lan register broadcast, %s', data.key) - def __unsub_broadcast(self, data: MIoTLanUnregisterBroadcastData) -> None: + def __unsub_broadcast(self, data: _MIoTLanUnregisterBroadcastData) -> None: if self._device_msg_matcher.get(topic=data.key): del self._device_msg_matcher[data.key] _LOGGER.debug('lan unregister broadcast, %s', data.key) - def __get_dev_list(self, data: MIoTLanGetDevListData) -> None: + def __get_dev_list(self, data: _MIoTLanGetDevListData) -> None: dev_list = { device.did: { 'online': device.online, @@ -1082,7 +1082,7 @@ def __update_devices(self, devices: dict[str, dict]) -> None: _LOGGER.error( 'invalid device token, %s, %s', did, info) continue - self._lan_devices[did] = MIoTLanDevice( + self._lan_devices[did] = _MIoTLanDevice( manager=self, did=did, token=info['token'], ip=info.get('ip', None)) else: @@ -1095,7 +1095,7 @@ def __delete_devices(self, devices: list[str]) -> None: continue lan_device.on_delete() - def __on_network_info_chnage(self, data: MIoTLanNetworkUpdateData) -> None: + def __on_network_info_chnage(self, data: _MIoTLanNetworkUpdateData) -> None: if data.status == InterfaceStatus.ADD: self._available_net_ifs.add(data.if_name) if data.if_name in self._net_ifs: @@ -1207,7 +1207,7 @@ def __raw_message_handler( return # Keep alive message did: str = str(struct.unpack('>Q', data[4:12])[0]) - device: MIoTLanDevice | None = self._lan_devices.get(did) + device: _MIoTLanDevice | None = self._lan_devices.get(did) if not device: return timestamp: int = struct.unpack('>I', data[12:16])[0] @@ -1247,7 +1247,7 @@ def __message_handler(self, did: str, msg: dict) -> None: _LOGGER.warning('invalid message, no id, %s, %s', did, msg) return # Reply - req: MIoTLanRequestData | None = \ + req: _MIoTLanRequestData | None = \ self._pending_requests.pop(msg['id'], None) if req: if req.timeout: @@ -1274,7 +1274,7 @@ def __message_handler(self, did: str, msg: dict) -> None: 'invalid message, no siid or piid, %s, %s', did, msg) continue key = f'{did}/p/{param["siid"]}/{param["piid"]}' - subs: list[MIoTLanRegisterBroadcastData] = list( + subs: list[_MIoTLanRegisterBroadcastData] = list( self._device_msg_matcher.iter_match(key)) for sub in subs: self._main_loop.call_soon_threadsafe( @@ -1285,7 +1285,7 @@ def __message_handler(self, did: str, msg: dict) -> None: and 'eiid' in msg['params'] ): key = f'{did}/e/{msg["params"]["siid"]}/{msg["params"]["eiid"]}' - subs: list[MIoTLanRegisterBroadcastData] = list( + subs: list[_MIoTLanRegisterBroadcastData] = list( self._device_msg_matcher.iter_match(key)) for sub in subs: self._main_loop.call_soon_threadsafe( From f61418175524baee344d5f46e5aed86bd9a7360b Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sun, 22 Dec 2024 20:26:54 +0800 Subject: [PATCH 12/12] set timers to None after cancel --- custom_components/xiaomi_home/miot/miot_lan.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 760b733..b2b4d03 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -308,8 +308,10 @@ def unsubscribe(self) -> None: def on_delete(self) -> None: if self._ka_timer: self._ka_timer.cancel() + self._ka_timer = None if self._online_offline_timer: self._online_offline_timer.cancel() + self._online_offline_timer = None _LOGGER.debug('miot lan device delete, %s', self.did) def update_info(self, info: dict) -> None: @@ -404,6 +406,7 @@ def __change_online(self, online: bool) -> None: self._online_offline_history.pop(0) if self._online_offline_timer: self._online_offline_timer.cancel() + self._online_offline_timer = None if not online: self.online = False else: @@ -1133,6 +1136,7 @@ def __deinit(self) -> None: for req_data in self._pending_requests.values(): if req_data.timeout: req_data.timeout.cancel() + req_data.timeout = None self._pending_requests.clear() for timer in self._reply_msg_buffer.values(): timer.cancel() @@ -1252,6 +1256,7 @@ def __message_handler(self, did: str, msg: dict) -> None: if req: if req.timeout: req.timeout.cancel() + req.timeout = None if req.handler is not None: self._main_loop.call_soon_threadsafe( req.handler, msg, req.handler_ctx) @@ -1326,6 +1331,7 @@ def __sendto( def __scan_devices(self) -> None: if self._scan_timer: self._scan_timer.cancel() + self._scan_timer = None try: # Scan devices self.ping(if_name=None, target_ip='255.255.255.255')