From 7e3bfabeae380b3be63169d03bf464e4fff78774 Mon Sep 17 00:00:00 2001 From: Jan Janak Date: Tue, 27 Dec 2022 19:12:58 -0500 Subject: [PATCH] Fix type declarations and potential bugs Checked the entire Python file with the basic type checking functionality built into the Python extension module of VSCode. Also checked with mypy 0.991 on Python 3.11. To re-check the file with mypy run: mypy --ignore-missing-imports --check-untyped-defs python/lora.py --- python/lora.py | 344 +++++++++++++++++++++++++++---------------------- 1 file changed, 188 insertions(+), 156 deletions(-) diff --git a/python/lora.py b/python/lora.py index 6b89dbd..1a67c6d 100755 --- a/python/lora.py +++ b/python/lora.py @@ -32,7 +32,7 @@ from __future__ import annotations import sys import re -import serial # type: ignore +import serial import binascii import select from abc import ABC @@ -53,7 +53,7 @@ import secrets from textwrap import dedent from tabulate import tabulate -from typing import Callable +from typing import Callable, Type from base64 import b64encode, b64decode @@ -106,6 +106,11 @@ def __init__(self, message): super().__init__(message, -1) +class MissingValue(ModemError): + def __init__(self, message): + super().__init__(message, -1) + + class JoinFailed(Exception): def __init__(self, message): super().__init__(message) @@ -273,11 +278,15 @@ class LoRaDataRateRU864(LoRaDataRate): FSK_50 = 7 -def region_to_data_rate(value: str | LoRaRegion) -> LoRaDataRate: +def region_to_LoRaDataRate(value: LoRaRegion | str| int) -> Type[LoRaDataRate]: + if isinstance(value, int): + value = LoRaRegion(value).name + if isinstance(value, LoRaRegion): value = value.name - else: + elif isinstance(value, str): value = value.upper() + return globals()[f'LoRaDataRate{value}'] @@ -389,7 +398,7 @@ def raise_for_error(errno): class TypeABZ: - port: serial.Serial | None + port: Optional[serial.Serial] subscriptions: Set[EventSubscription] prev_at: datetime | None @@ -406,7 +415,7 @@ def __init__(self, pathname: str, verbose: bool = False, guard: Optional[float] def __str__(self): return self.pathname - @property # type: ignore + @property @contextmanager def secret(self): try: @@ -415,7 +424,7 @@ def secret(self): finally: self.hide_value = False - @property # type: ignore + @property @contextmanager def events(self): sub = EventSubscription() @@ -457,12 +466,14 @@ def emit(self, *args, **kwargs): sub.emit(*args, **kwargs) def flush(self): + assert self.port is not None self.port.flush() - def flush_atci(self, timeout=0.1, port=None): + def flush_atci(self, timeout=0.1, port: Optional[serial.Serial]=None): # Note: this function assumes that the port has been opened with # timeout=0 port = port or self.port + assert port is not None # Write a CRLF sequence to the modem, just in case the modem has any # data in its input buffer that hasn't been processed yet, e.g., due to @@ -497,6 +508,7 @@ def open(self, speed: int): self.thread.start() def close(self): + assert self.port is not None self.port.flush() self.port.close() @@ -591,7 +603,7 @@ def receive(self, data: bytes): else: self.response.put_nowait(data) - def read_inline_response(self, timeout: float = None): + def read_inline_response(self, timeout: Optional[float] = None): try: response = self.response.get(timeout=timeout) except Empty: @@ -609,7 +621,7 @@ def read_inline_response(self, timeout: float = None): finally: self.response.task_done() - def read_multiline_response(self, timeout: float = None): + def read_multiline_response(self, timeout: Optional[float] = None): body: List[bytes] = [] while True: try: @@ -674,10 +686,10 @@ def close(self): self.SDK_AT('$LORA>ATCI=0') super().close() - def detect_baud_rate(self, speeds=[115200, 57600, 38400, 19200, 9600], timeout=0.3) -> Optional[int]: + def detect_baud_rate(self, speeds=[115200, 57600, 38400, 19200, 9600], timeout=0.3) -> Optional[int]: # type: ignore return super().detect_baud_rate(speeds=speeds, timeout=timeout, response=b'OK\r') - def read_sdk_response(self, timeout: float = None): + def read_sdk_response(self, timeout: Optional[float] = None): while True: try: line = self.sdk_response.get(timeout=timeout) @@ -700,7 +712,7 @@ def SDK_AT(self, cmd: str='', timeout: Optional[float] = 5): self.write(b'AT' + cmd.encode('ascii')) self.read_sdk_response() - def AT(self, cmd: str = '', timeout: Optional[float] = 5, wait = True, inline = True, flush = True, encoding = 'ascii'): + def AT(self, cmd: str = '', timeout: Optional[float] = 5, wait = True, inline = True, flush = True, encoding = 'ascii'): # type: ignore return super().AT(cmd, timeout=timeout, wait=wait, inline=inline, flush=flush, encoding=encoding, prefix=b'AT$LORA AT') def receive(self, data: bytes): @@ -710,21 +722,22 @@ def receive(self, data: bytes): self.sdk_response.put_nowait(data) -def parse_data_rate(region: LoRaRegion, value: str | LoRaDataRate | int) -> int: - try: - if isinstance(value, str): - value = region_to_data_rate(region)[value.upper()] # type: ignore - except KeyError: - try: - value = int(value) - except ValueError: - raise Exception(f'Unsupported data rate "{value}"') - +def parse_data_rate(region: LoRaRegion | str | int, value: str | LoRaDataRate | int) -> int: if isinstance(value, LoRaDataRate): - value = value.value + rv = value.value + elif isinstance(value, str): + try: + rv = getattr(region_to_LoRaDataRate(region), value.upper()) + except AttributeError: + try: + rv = int(value) + except ValueError: + raise Exception(f'Unsupported data rate "{value}"') + else: + rv = value - assert type(value) == int - return value + assert type(rv) == int + return rv class ATCI(ABC): @@ -776,12 +789,22 @@ def __setattr__(self, name, value): raise AttributeError(f'Unsupported setting') - @property # type: ignore + @property @contextmanager def events(self): yield self.modem.events +def assert_response(s: Optional[str]) -> str: + if s is None: + raise MissingValue('No response from modem') + + if len(s) == 0: + raise MissingValue('Got unexpected empty response from modem') + + return s + + class MurataModem(ATCI): def reset(self): '''Initialize the LoRa modem into a known state. @@ -813,7 +836,7 @@ def uart(self): The default configuration after factory reset is 1900 8N1. ''' - reply = self.modem.AT('+UART?').split(',') + reply = assert_response(self.modem.AT('+UART?')).split(',') if len(reply) != 5: raise Exception('Unexpected reply to AT+UART') return UARTConfig(int(reply[0]), int(reply[1]), int(reply[2]), int(reply[3]), reply[4] == 1) @@ -844,7 +867,7 @@ def ver(self): This property returns a tuple of two strings, where the first string is the version and the second string is the build date of Murata Modem. ''' - return self.modem.AT('+VER?').split(',') + return assert_response(self.modem.AT('+VER?')).split(',') @property def dev(self): @@ -888,7 +911,7 @@ def band(self): and 915 MHz RF bands. Thus, only regions that use the two RF bands can be used. ''' - value = int(self.modem.AT('+BAND?')) + value: LoRaRegion | int = int(assert_response(self.modem.AT('+BAND?'))) try: value = LoRaRegion(value) except ValueError: @@ -916,17 +939,26 @@ def band(self, value: str | LoRaRegion | int): leaves security keys, the DevNonce value, and system parameters such as the UART port baud rate unmodified. ''' - if type(value) == str: + + # The getter band returns a LoRaRegion if it can map the number received + # from the modem to a known region, otherwise it returns the integer + # received from the modem unmodified. + old = self.band + if isinstance(old, LoRaRegion): + old = old.value + assert isinstance(old, int) + + if isinstance(value, str): value = LoRaRegion[value.upper()] if isinstance(value, LoRaRegion): value = value.value + assert isinstance(value, int) - wait = self.band.value != value with self.modem.lock: with self.modem.events as events: self.modem.AT(f'+BAND={value}') - if wait: + if old != value: events.wait_for('event=0,0') region = band @@ -944,7 +976,7 @@ def CLASS(self): is active permanently, allowing the device to receive network downlinks at any time. ''' - value = int(self.modem.AT('+CLASS?')) + value: LoRaClass | int = int(assert_response(self.modem.AT('+CLASS?'))) try: value = LoRaClass(value) except ValueError: @@ -967,7 +999,7 @@ def CLASS(self, value: str | LoRaClass | int): LoRaWAN class of the device can only be switched while the device is idle, i.e., when there is no uplink or downlink in progress. ''' - if type(value) == str: + if isinstance(value, str): value = LoRaClass[value.upper()] if isinstance(value, LoRaClass): @@ -983,7 +1015,7 @@ def mode(self): (OTAA) mode, or in the activation by provisioning (APB) mode. The default mode upon factory reset is ABP. ''' - value = int(self.modem.AT('+MODE?')) + value: LoRaMode | int = int(assert_response(self.modem.AT('+MODE?'))) try: value = LoRaMode(value) except ValueError: @@ -998,7 +1030,7 @@ def mode(self, value: str | LoRaMode | int): mode. The value 0 switches the modem into the activation by provisioning (ABP) mode. The default mode upon factory reset is ABP. ''' - if type(value) == str: + if isinstance(value, str): value = LoRaMode[value.upper()] if isinstance(value, LoRaMode): @@ -1007,7 +1039,7 @@ def mode(self, value: str | LoRaMode | int): self.modem.AT(f'+MODE={value}') @property - def devaddr(self) -> str: + def devaddr(self): '''Return the LoRaWAN device address (DevAddr) assigned to the modem. The DevAddr is a 32-bit number that consists of two parts: an address @@ -1024,7 +1056,7 @@ def devaddr(self) -> str: Upon factory reset, and whenever the DevAddr consists of all zeroes, the modem generates a random DevAddr. ''' - return self.modem.AT('+DEVADDR?') + return assert_response(self.modem.AT('+DEVADDR?')) @devaddr.setter def devaddr(self, value: str): @@ -1063,14 +1095,14 @@ def devaddr(self, value: str): dev_addr = devaddr @property - def deveui(self) -> str: + def deveui(self): '''Return LoRaWAN device EUI (DevEUI). The DevEUI is a 64-bit globally unique extended identifier assigned to the device by the manufacturer. The DevEUI is encoded in a hexadecimal format. ''' - return self.modem.AT('+DEVEUI?') + return assert_response(self.modem.AT('+DEVEUI?')) @deveui.setter def deveui(self, value: str): @@ -1087,7 +1119,7 @@ def deveui(self, value: str): dev_eui = deveui @property - def appeui(self) -> str: + def appeui(self): '''Return the LoRaWAN AppEUI (JoinEUI). In recent LoRaWAN specifications, the parameter was renamed to JoinEUI. @@ -1099,7 +1131,7 @@ def appeui(self) -> str: The default value is 0101010101010101. ''' - return self.modem.AT('+APPEUI?') + return assert_response(self.modem.AT('+APPEUI?')) @appeui.setter def appeui(self, value: str): @@ -1119,7 +1151,7 @@ def appeui(self, value: str): app_eui = appeui @property - def nwkskey(self) -> str: + def nwkskey(self): '''Return LoRaWAN 1.0 network session key (NwkSKey). The network session key is a 128-bit symmetric key that secures @@ -1132,7 +1164,7 @@ def nwkskey(self) -> str: factory reset is 2B7E151628AED2A6ABF7158809CF4F3C. ''' with self.modem.secret: - return self.modem.AT('+NWKSKEY?') + return assert_response(self.modem.AT('+NWKSKEY?')) @nwkskey.setter def nwkskey(self, value: str): @@ -1159,7 +1191,7 @@ def nwkskey(self, value: str): nwk_s_key = nwkskey @property - def appskey(self) -> str: + def appskey(self): '''Return LoRaWAN application session key (AppSKey). The application session key is a 128-bit symmetric key that secures @@ -1172,7 +1204,7 @@ def appskey(self) -> str: factory reset is 2B7E151628AED2A6ABF7158809CF4F3C. ''' with self.modem.secret: - return self.modem.AT('+APPSKEY?') + return assert_response(self.modem.AT('+APPSKEY?')) @appskey.setter def appskey(self, value: str): @@ -1199,7 +1231,7 @@ def appskey(self, value: str): app_s_key = appskey @property - def appkey(self) -> str: + def appkey(self): '''Return LoRaWAN 1.0 root application key (AppKey). The AppKey is a 128-bit symmetric key that is used to derive an @@ -1209,7 +1241,7 @@ def appkey(self) -> str: factory reset is 2B7E151628AED2A6ABF7158809CF4F3C. ''' with self.modem.secret: - return self.modem.AT('+APPKEY?') + return assert_response(self.modem.AT('+APPKEY?')) @appkey.setter def appkey(self, value: str): @@ -1302,7 +1334,7 @@ def joindc(self): independent of AT+DUTYCYCLE. Disabling one does not disable the other and vice versa. ''' - return int(self.modem.AT('+JOINDC?')) == 1 + return int(assert_response(self.modem.AT('+JOINDC?'))) == 1 @joindc.setter def joindc(self, value: bool | int): @@ -1415,7 +1447,7 @@ def rfparam(self): This property returns a tuple of all currently active RF channels. Each channel is represented by a RFConfig object. ''' - data = tuple(self.modem.AT('+RFPARAM?').split(';')) + data = tuple(assert_response(self.modem.AT('+RFPARAM?')).split(';')) if int(data[0]) != len(data) - 1: raise Exception('Could not parse RFPARAM response') @@ -1475,7 +1507,7 @@ def rfpower(self): configuration translates to the default transmit power of 14 dBm (25 mW). ''' - return tuple(map(int, self.modem.AT('+RFPOWER?').split(','))) + return tuple(map(int, assert_response(self.modem.AT('+RFPOWER?')).split(','))) @rfpower.setter def rfpower(self, value: Tuple[int, int]): @@ -1526,7 +1558,7 @@ def nwk(self): The default value upon factor reset is 1. ''' - value = int(self.modem.AT('+NWK?')) + value: LoRaNetwork | int = int(assert_response(self.modem.AT('+NWK?'))) try: value = LoRaNetwork(value) except ValueError: @@ -1554,7 +1586,7 @@ def nwk(self, value: str | LoRaNetwork | bool | int): LoRaWAN-compatible network, you should use 1 here. This is also the default value. ''' - if type(value) == str: + if isinstance(value, str): value = LoRaNetwork[value.upper()] if isinstance(value, LoRaNetwork): @@ -1613,9 +1645,9 @@ def dr(self): The default value upon factory reset is 0. ''' - value = int(self.modem.AT('+DR?')) + value: LoRaDataRate | int = int(assert_response(self.modem.AT('+DR?'))) try: - value = region_to_data_rate(self.region)(value) + value = region_to_LoRaDataRate(self.band)(value) except ValueError: pass return value @@ -1634,7 +1666,7 @@ def dr(self, value: str | LoRaDataRate | int): The default value upon factory reset is 0. ''' - value = parse_data_rate(self.region, value) + value = parse_data_rate(self.region, value) # type: ignore self.modem.AT(f'+DR={value}') data_rate = dr @@ -1655,7 +1687,7 @@ def delay(self): The default value upon factory reset is (5000,6000,1000,2000). ''' - return Delay(*map(int, self.modem.AT('+DELAY?').split(','))) + return Delay(*map(int, assert_response(self.modem.AT('+DELAY?')).split(','))) @delay.setter def delay(self, value: str | Delay | tuple): @@ -1680,7 +1712,7 @@ def delay(self, value: str | Delay | tuple): The default value upon factory reset is (5000,6000,1000,2000). ''' - if type(value) == str: + if isinstance(value, str): value = tuple(map(lambda s: int(s.strip()), value.split(','))) if len(value) != 4: @@ -1717,7 +1749,7 @@ def adrack(self): Note: The network server can override the values configured with this AT command through ADR. ''' - return tuple(map(int, self.modem.AT('+ADRACK?').split(','))) + return tuple(map(int, assert_response(self.modem.AT('+ADRACK?')).split(','))) @adrack.setter def adrack(self, value: Tuple[int, int]): @@ -1766,9 +1798,10 @@ def rx2(self): The default value upon factory reset is (869525000,0) (the default region is EU868). ''' - freq, dr = map(int, self.modem.AT('+RX2?').split(',')) + dr: LoRaDataRate | int + freq, dr = map(int, assert_response(self.modem.AT('+RX2?')).split(',')) try: - dr = region_to_data_rate(self.region)(dr) + dr = region_to_LoRaDataRate(self.band)(dr) except ValueError: pass return (freq, dr) @@ -1789,7 +1822,7 @@ def rx2(self, value: Tuple[int, str | LoRaDataRate | int]): TODO: Add support for str datarate_index type ''' - dr = parse_data_rate(self.region, value[1]) + dr = parse_data_rate(self.region, value[1]) # type: ignore self.modem.AT(f'+RX2={int(value[0])},{dr}') @property @@ -1800,7 +1833,7 @@ def dutycycle(self): regions that enforce duty cycling and False in regions that do not use duty cycling. ''' - return int(self.modem.AT('+DUTYCYCLE?')) == 1 + return int(assert_response(self.modem.AT('+DUTYCYCLE?'))) == 1 @dutycycle.setter def dutycycle(self, value: bool | int): @@ -1835,7 +1868,7 @@ def sleep(self): of the device. This property is intended for development and debugging purposes only. During normal operation it should always be True. ''' - return int(self.modem.AT('+SLEEP?')) == 1 + return int(assert_response(self.modem.AT('+SLEEP?'))) == 1 @sleep.setter def sleep(self, value: bool | int): @@ -1865,7 +1898,7 @@ def port(self): The default port number is 2. ''' - return int(self.modem.AT('+PORT?')) + return int(assert_response(self.modem.AT('+PORT?'))) @port.setter def port(self, value: int): @@ -1893,7 +1926,7 @@ def rep(self): The default value is 1. ''' - return int(self.modem.AT('+REP?')) + return int(assert_response(self.modem.AT('+REP?'))) @rep.setter def rep(self, value: int): @@ -1931,7 +1964,7 @@ def dformat(self): The default value is 0 (binary format). ''' - value = int(self.modem.AT('+DFORMAT?')) + value: DataFormat | int = int(assert_response(self.modem.AT('+DFORMAT?'))) try: value = DataFormat(value) except ValueError: @@ -1957,7 +1990,7 @@ def dformat(self, value: str | DataFormat | int): The default value is 0 (binary format). ''' - if type(value) == str: + if isinstance(value, str): value = DataFormat[value.upper()] if isinstance(value, DataFormat): @@ -1987,7 +2020,7 @@ def to(self): The value must be in the range <1, 65535> (milliseconds). The default value is 1000 milliseconds. ''' - return int(self.modem.AT('+TO?')) + return int(assert_response(self.modem.AT('+TO?'))) @to.setter def to(self, value: int): @@ -2011,7 +2044,7 @@ def to(self, value: int): ''' self.modem.AT(f'+TO={value}') - def utx(self, data: bytes, timeout: float = None, hex = False): + def utx(self, data: bytes, timeout: Optional[float] = None, hex = False): '''Send unconfirmed uplink message to the LoRaWAN network. The uplink will be transmitted up to `rep` times. The modem notifies the @@ -2040,7 +2073,7 @@ def utx(self, data: bytes, timeout: float = None, hex = False): ''' self.tx(data, confirmed=False, timeout=timeout, hex=hex) - def ctx(self, data: bytes, timeout: float = None, hex = False) -> bool: + def ctx(self, data: bytes, timeout: Optional[float] = None, hex = False) -> bool: '''Send confirmed uplink message to the LoRaWAN network. The uplink will be transmitted up to `rtynum` times. The modem notifies @@ -2072,7 +2105,7 @@ def ctx(self, data: bytes, timeout: float = None, hex = False) -> bool: assert rv is not None return rv - def tx(self, data: bytes, confirmed = False, timeout: float = None, hex = False) -> Optional[bool]: + def tx(self, data: bytes, confirmed = False, timeout: Optional[float] = None, hex = False) -> Optional[bool]: assert self.modem.port is not None type = 'C' if confirmed else 'U' with self.modem.lock: @@ -2099,7 +2132,7 @@ def mcast(self): The Murata Modem firmware supports up to 8 address. The open firmware supports up to 4 multicast addresses. ''' - data = tuple(self.modem.AT('+MCAST?').split(';')) + data = tuple(assert_response(self.modem.AT('+MCAST?')).split(';')) if int(data[0]) != len(data) - 1: raise Exception('Could not parse MCAST response') @@ -2125,7 +2158,7 @@ def mcast(self, value: McastAddr | int): multicast = mcast - def putx(self, port: int, data: bytes, timeout: float = None, hex = False): + def putx(self, port: int, data: bytes, timeout: Optional[float] = None, hex = False): '''Send unconfirmed uplink message to the LoRaWAN network. The uplink will be transmitted up to `rep` times. The modem notifies the @@ -2150,7 +2183,7 @@ def putx(self, port: int, data: bytes, timeout: float = None, hex = False): ''' self.ptx(port, data, confirmed=False, timeout=timeout, hex=hex) - def pctx(self, port: int, data: bytes, timeout: float = None, hex = False) -> bool: + def pctx(self, port: int, data: bytes, timeout: Optional[float] = None, hex = False) -> bool: '''Send confirmed uplink message to the LoRaWAN network. The uplink will be transmitted up to `rtynum` times. The modem notifies @@ -2178,7 +2211,7 @@ def pctx(self, port: int, data: bytes, timeout: float = None, hex = False) -> bo assert rv is not None return rv - def ptx(self, port: int, data: bytes, confirmed = False, timeout: float = None, hex = False) -> Optional[bool]: + def ptx(self, port: int, data: bytes, confirmed = False, timeout: Optional[float] = None, hex = False) -> Optional[bool]: assert self.modem.port is not None type = 'C' if confirmed else 'U' with self.modem.lock: @@ -2211,7 +2244,7 @@ def frmcnt(self): non-zero port number, i.e., downlink messages carrying application payload. ''' - return tuple(map(int, self.modem.AT('+FRMCNT?').split(','))) + return tuple(map(int, assert_response(self.modem.AT('+FRMCNT?')).split(','))) frame_counters = frmcnt @@ -2229,7 +2262,7 @@ def msize(self): uplink (with only the MAC commands) internally. Thus, if you try to resend your payload, the second attempt should succeed. ''' - return int(self.modem.AT('+MSIZE?')) + return int(assert_response(self.modem.AT('+MSIZE?'))) message_size = msize @@ -2249,7 +2282,7 @@ def rfq(self): This can be any packet: application downlink, JoinAccept, ADR request, device status request, or confirmed uplink acknowledgement. ''' - return tuple(map(int, self.modem.AT('+RFQ?').split(','))) + return tuple(map(int, assert_response(self.modem.AT('+RFQ?')).split(','))) @property def dwell(self): @@ -2264,7 +2297,7 @@ def dwell(self): The default value is region-specific. ''' - return tuple(map(lambda v: v == 1, map(int, self.modem.AT('+DWELL?').split(',')))) + return tuple(map(lambda v: v == 1, map(int, assert_response(self.modem.AT('+DWELL?')).split(',')))) @dwell.setter def dwell(self, value: Tuple[int | bool, int | bool]): @@ -2317,7 +2350,7 @@ def maxeirp(self): power of the LoRa transmitter, i.e., there is no guarantee that the transmitter will be capable of transmitting with MaxEIRP. ''' - return int(self.modem.AT('+MAXEIRP?')) + return int(assert_response(self.modem.AT('+MAXEIRP?'))) @maxeirp.setter def maxeirp(self, value: int): @@ -2367,7 +2400,7 @@ def rssith(self): plan does not (this is the case for most AS923 channel plans), the property returns 0. ''' - return int(self.modem.AT('+RSSITH?')) + return int(assert_response(self.modem.AT('+RSSITH?'))) @rssith.setter def rssith(self, value: int): @@ -2402,7 +2435,7 @@ def cst(self): plan does not (this is the case for most AS923 channel plans), the property returns 0. ''' - return int(self.modem.AT('+CST?')) + return int(assert_response(self.modem.AT('+CST?'))) @cst.setter def cst(self, value: int): @@ -2434,10 +2467,10 @@ def backoff(self): If the property returns a non-zero value, the end-device cannot transmit. ''' - return int(self.modem.AT('+BACKOFF?')) + return int(assert_response(self.modem.AT('+BACKOFF?'))) @property - def chmask(self) -> str: + def chmask(self): '''Return LoRaWAN channel mask. The LoRaWAN channel mask determines the set of channels the modem is @@ -2457,7 +2490,7 @@ def chmask(self) -> str: The length and default value of the channel mask depends on the active region. ''' - return self.modem.AT('+CHMASK?') + return assert_response(self.modem.AT('+CHMASK?')) @chmask.setter def chmask(self, value: str): @@ -2498,7 +2531,7 @@ def rtynum(self): The default value upon factory reset is 8. ''' - return int(self.modem.AT('+RTYNUM?')) + return int(assert_response(self.modem.AT('+RTYNUM?'))) @rtynum.setter def rtynum(self, value: int): @@ -2520,7 +2553,7 @@ def rtynum(self, value: int): rty_num = rtynum @property - def netid(self) -> str: + def netid(self): '''Return LoRaWAN network ID (NetID). The NetID is a 24-bit integer. The value is encoded as a 32-bit integer @@ -2541,7 +2574,7 @@ def netid(self) -> str: forwarding network knows where to forward the uplink based on the device's DevAddr and NetID is no longer used. ''' - return self.modem.AT('+NETID?') + return assert_response(self.modem.AT('+NETID?')) @netid.setter def netid(self, val: str): @@ -2637,7 +2670,7 @@ def version(self): ver = self.ver rv = { 'compatibility_version': ver[0], 'build_date': ver[1] } try: - ver = self.modem.AT('$VER?').split(',') + ver = assert_response(self.modem.AT('$VER?')).split(',') if len(ver) != 9: raise Exception('Unexpected response to AT$VER') rv['firmware_version'] = ver[0] @@ -2698,7 +2731,7 @@ def facnew(self, reset_devnonce=False, reset_deveui=False): factory_reset = facnew @property - def joineui(self) -> str: + def joineui(self): '''Return LoRaWAN JoinEUI. The Join EUI is a 64-bit globally unique identifier that identifiers the @@ -2708,7 +2741,7 @@ def joineui(self) -> str: The default value is 0101010101010101. ''' - return self.modem.AT('$JOINEUI?') + return assert_response(self.modem.AT('$JOINEUI?')) @joineui.setter def joineui(self, value: str): @@ -2761,7 +2794,7 @@ def appkey_10(self, value: str): app_key_10 = appkey_10 @property - def appkey(self) -> str: + def appkey(self): '''Return LoRaWAN root application key (AppKey). The AppKey is a 128-bit symmetric key that is used to derive an @@ -2771,7 +2804,7 @@ def appkey(self) -> str: factory reset is 2B7E151628AED2A6ABF7158809CF4F3C. ''' with self.modem.secret: - return self.modem.AT('$APPKEY?') + return assert_response(self.modem.AT('$APPKEY?')) @appkey.setter def appkey(self, value: str): @@ -2793,7 +2826,7 @@ def appkey(self, value: str): app_key = appkey # type: ignore - def join(self, timeout = 120, data_rate: Optional[int | LoRaDataRate] = None, max_transmissions: int = None): + def join(self, timeout = 120, data_rate: Optional[int | LoRaDataRate] = None, max_transmissions: Optional[int] = None): '''Join LoRaWAN network in over-the-air-activation (OTAA) mode. If the modem is in OTAA mode, this command sends a Join LoRaWAN request @@ -2863,7 +2896,7 @@ def join(self, timeout = 120, data_rate: Optional[int | LoRaDataRate] = None, ma @property def rfpower(self): - return tuple(map(int, self.modem.AT('$RFPOWER?').split(','))) + return tuple(map(int, assert_response(self.modem.AT('$RFPOWER?')).split(','))) @rfpower.setter def rfpower(self, value: Tuple[int, ...]): @@ -2881,7 +2914,7 @@ def rfpower(self, value: Tuple[int, ...]): @property def dr(self): - enum = region_to_data_rate(self.region) + enum = region_to_LoRaDataRate(self.band) def create(value): value = int(value) @@ -2891,18 +2924,18 @@ def create(value): pass return value - return tuple(map(create, self.modem.AT('$DR?').split(','))) + return tuple(map(create, assert_response(self.modem.AT('$DR?')).split(','))) @dr.setter def dr(self, value: (str | tuple | LoRaDataRate | int) | Tuple[str | LoRaDataRate | int, str | LoRaDataRate | int]): - if type(value) == str: + if isinstance(value, str): value = tuple(map(lambda s: s.strip(), value.split(','))) if not isinstance(value, tuple): value = (value,) if len(value) == 2: - region = self.region + region = self.band self.modem.AT(f'+DR={parse_data_rate(region, value[0])},{parse_data_rate(region, value[1])}') elif len(value) == 1: # super().dr = value @@ -2914,42 +2947,42 @@ def dr(self, value: (str | tuple | LoRaDataRate | int) | Tuple[str | LoRaDataRat @property def rx2(self): - value = tuple(map(int, self.modem.AT('$RX2?').split(','))) + value = tuple(map(int, assert_response(self.modem.AT('$RX2?')).split(','))) - enum = region_to_data_rate(self.region) + enum = region_to_LoRaDataRate(self.band) try: - dr1 = enum(value[1]) + dr1: LoRaDataRate | int = enum(value[1]) except ValueError: - pass + dr1 = value[1] try: - dr2 = enum(value[3]) + dr2: LoRaDataRate | int = enum(value[3]) except ValueError: - pass + dr2 = value[3] return (value[0], dr1, value[2], dr2) @rx2.setter def rx2(self, value: str | Tuple[str, ...] | Tuple[int, str | LoRaDataRate | int] | Tuple[int, str | LoRaDataRate | int, int, str | LoRaDataRate | int]): - if type(value) == str: + if isinstance(value, str): value = tuple(map(lambda s: s.strip(), value.split(','))) if len(value) == 2: # super().rx2 = value super(self.__class__, self.__class__).rx2.fset(self, value) # type: ignore elif len(value) == 4: - region = self.region + region = self.band self.modem.AT(f'$RX2={int(value[0])},{parse_data_rate(region, value[1])},{int(value[2])},{parse_data_rate(region, value[3])}') # type: ignore else: raise Exception('Invalid rx2 setting value') @property def chmask(self): - return tuple(self.modem.AT('$CHMASK?').split(',')) + return tuple(assert_response(self.modem.AT('$CHMASK?')).split(',')) @chmask.setter def chmask(self, value: str | Tuple[str, ...]): - if type(value) == str: + if isinstance(value, str): value = tuple(map(lambda s: s.strip(), value.split(','))) if len(value) == 1: @@ -2968,7 +3001,7 @@ def loglevel(self): Levels: 0 - disabled, 1 - error, 2 - warning, 3 - debug, 4 - all ''' - value = int(self.modem.AT('$LOGLEVEL?')) + value: LogLevel | int = int(assert_response(self.modem.AT('$LOGLEVEL?'))) try: value = LogLevel(value) except ValueError: @@ -2981,7 +3014,7 @@ def loglevel(self, value: str | LogLevel | int): Levels: 0 - disabled, 1 - error, 2 - warning, 3 - debug, 4 - all ''' - if type(value) == str: + if isinstance(value, str): value = LogLevel[value.upper()] if isinstance(value, LogLevel): @@ -3032,7 +3065,7 @@ def cert(self): The certification port is disabled by default. ''' - return int(self.modem.AT('$CERT?')) == 1 + return int(assert_response(self.modem.AT('$CERT?'))) == 1 @cert.setter def cert(self, value: bool | int): @@ -3053,7 +3086,7 @@ def cert(self, value: bool | int): certification_port = cert @property - def nwkkey(self) -> str: + def nwkkey(self): '''Return LoRaWAN 1.1 root network key (NwkKey). The NwkKey is a 128-bit symmetric key that is used to derive a network @@ -3072,7 +3105,7 @@ def nwkkey(self) -> str: factory reset is 2B7E151628AED2A6ABF7158809CF4F3C. ''' with self.modem.secret: - return self.modem.AT('$NWKKEY?') + return assert_response(self.modem.AT('$NWKKEY?')) @nwkkey.setter def nwkkey(self, value: str): @@ -3099,7 +3132,7 @@ def nwkkey(self, value: str): nwk_key = nwkkey @property - def fnwksintkey(self) -> str: + def fnwksintkey(self): '''Return LoRaWAN 1.1 forwarding network session integrity key (FNwkSIntKey). In LoRaWAN 1.1, the key used to check the integrity of LoRaWAN messages @@ -3125,7 +3158,7 @@ def fnwksintkey(self) -> str: factory reset is 2B7E151628AED2A6ABF7158809CF4F3C. ''' with self.modem.secret: - return self.modem.AT('$FNWKSINTKEY?') + return assert_response(self.modem.AT('$FNWKSINTKEY?')) @fnwksintkey.setter def fnwksintkey(self, value: str): @@ -3159,7 +3192,7 @@ def fnwksintkey(self, value: str): f_nwk_s_int_key = fnwksintkey @property - def snwksintkey(self) -> str: + def snwksintkey(self): '''Return LoRaWAN 1.1 serving network session integrity key (SNwkSIntKey). In LoRaWAN 1.1, the key used to check the integrity of LoRaWAN messages @@ -3186,7 +3219,7 @@ def snwksintkey(self) -> str: factory reset is 2B7E151628AED2A6ABF7158809CF4F3C. ''' with self.modem.secret: - return self.modem.AT('$SNWKSINTKEY?') + return assert_response(self.modem.AT('$SNWKSINTKEY?')) @snwksintkey.setter def snwksintkey(self, value: str): @@ -3221,7 +3254,7 @@ def snwksintkey(self, value: str): s_nwk_s_int_key = snwksintkey @property - def nwksenckey(self) -> str: + def nwksenckey(self): '''Return LoRaWAN 1.1 network session encryption key (NwkSEncKey). In LoRaWAN 1.1, the network session encryption key (NwkSEncKey) is used @@ -3243,7 +3276,7 @@ def nwksenckey(self) -> str: factory reset is 2B7E151628AED2A6ABF7158809CF4F3C. ''' with self.modem.secret: - return self.modem.AT('$NWKSENCKEY?') + return assert_response(self.modem.AT('$NWKSENCKEY?')) @nwksenckey.setter def nwksenckey(self, value: str): @@ -3288,7 +3321,7 @@ def session(self): `dev_addr` is the address of the device (DevAddr). ''' rv = {} - data = self.modem.AT('$SESSION?').split(',') + data = assert_response(self.modem.AT('$SESSION?')).split(',') if len(data) < 2: raise Exception('Unexpected response to AT$SESSION') @@ -3422,7 +3455,7 @@ def get_modem(): sys.exit(1) if twr_sdk: - dev = TowerSDK(port, verbose=verbose, guard=guard) + dev: TowerSDK | TypeABZ = TowerSDK(port, verbose=verbose, guard=guard) else: dev = TypeABZ(port, verbose=verbose, guard=guard) @@ -3636,6 +3669,7 @@ def state(get_modem: Callable[[], OpenLoRaModem]): click.echo(f"Current state of modem {modem}:") region = modem.band + rssi, snr = modem.rfq uplink, downlink = modem.frmcnt delay = modem.delay @@ -3643,8 +3677,8 @@ def state(get_modem: Callable[[], OpenLoRaModem]): adr_ack_limit, adr_ack_delay = modem.adrack data = [ - ['Current region', region.name], - ['LoRaWAN class', modem.CLASS.name], + ['Current region', region], + ['LoRaWAN class', modem.CLASS], ['Channel mask', modem.chmask[0]], ['Data rate', modem.dr[0]], ['Maximum message size', f'{modem.message_size} B'], @@ -3661,7 +3695,7 @@ def state(get_modem: Callable[[], OpenLoRaModem]): ['Last downlink SNR', f'{snr} dB'], ['RX1 window', f'Delay: {delay.rx_window_1} ms'], ['RX2 window', f'Delay: {delay.rx_window_2} ms, Frequency: {rx2[0] / 1000000} MHz, Data rate: {rx2[1]}'], - ['Join response windows', f'RX1: {delay.join_accept_1} ms, RX2: {delay.join_accept_2} ms'] + ['Join response windows', f'RX1: {delay.join_accept_1} ms, RX2: {delay.join_accept_2} ms'], ['Confirmed uplink transmissions', modem.rtynum], ['Unconfirmed uplink transmissions', modem.rep]] @@ -4029,7 +4063,7 @@ def join(get_modem: Callable[[], OpenLoRaModem], region, data_rate, network, joi except ValueError: click.echo('Error: Invalid region', err=True) sys.exit(1) - modem.region = region + modem.region = region # type: ignore if network is not None: modem.nwk = 1 if network else 0 @@ -4054,7 +4088,7 @@ def join(get_modem: Callable[[], OpenLoRaModem], region, data_rate, network, joi kwargs = {} if data_rate is not None: - kwargs['data_rate'] = parse_data_rate(modem.region, data_rate) + kwargs['data_rate'] = parse_data_rate(modem.band, data_rate) if timeout is not None: kwargs['timeout'] = timeout @@ -4233,7 +4267,7 @@ def get(get_modem: Callable[[], OpenLoRaModem], names, all, long, names_only): if not all or e.errno != -17: raise e else: - if isinstance(value, tuple or list): + if isinstance(value, tuple) or isinstance(value, list): value = ','.join(map(str, value)) if long: click.echo(f'{orig_name}={value}') @@ -4474,6 +4508,7 @@ def keygen(get_modem: Callable[[], OpenLoRaModem], protocol, silent, old): ''' modem = get_modem() + old_keys = None if protocol == '1.1': if old: old_keys = [ @@ -4557,7 +4592,7 @@ def show_mcast_addresses(get_modem: Callable[[], OpenLoRaModem]): headers.append('Application session key') data = [] - for entry in modem.multicast: + for entry in modem.mcast: line = [f'{entry.addr}'] if show_keys: line.append(f'{entry.nwkskey}') @@ -4589,12 +4624,12 @@ def add_mcast_address(get_modem: Callable[[], OpenLoRaModem], address, nwkskey, Please note that the modem can store up to four multicast addresses. ''' modem = get_modem() - active_addresses = modem.multicast + active_addresses = modem.mcast existing = list(filter(lambda v: v.addr == address, active_addresses)) if len(existing): if replace: - modem.multicast = McastAddr(existing[0].id, address, nwkskey, appskey) + modem.mcast = McastAddr(existing[0].id, address, nwkskey, appskey) else: click.echo(f'Multicast address {address} already exists', err=True) sys.exit(1) @@ -4607,7 +4642,7 @@ def add_mcast_address(get_modem: Callable[[], OpenLoRaModem], address, nwkskey, for id in range(0, 4): if id in existing_ids: continue - modem.multicast = McastAddr(id, address, nwkskey, appskey) + modem.mcast = McastAddr(id, address, nwkskey, appskey) break else: click.echo(f'Could not find a free multicast address slot', err=True) @@ -4627,7 +4662,7 @@ def remove_mcast_addresses(get_modem: Callable[[], OpenLoRaModem], all, addresse (-a) to remove all currently active multicast addresses. ''' modem = get_modem() - active_addresses = modem.multicast + active_addresses = modem.mcast if all: if len(addresses): @@ -4651,7 +4686,7 @@ def remove_mcast_addresses(get_modem: Callable[[], OpenLoRaModem], all, addresse click.echo(f'Ambiguous multicast address {addr}', err=True) sys.exit(1) - modem.multicast = data[0].id + modem.mcast = data[0].id @multicast.command('set') @@ -4668,7 +4703,7 @@ def update_mcast_address(get_modem: Callable[[], OpenLoRaModem], address, nwkske line options, the original key will be preserved. ''' modem = get_modem() - data = list(filter(lambda v: v.addr == address, modem.multicast)) + data = list(filter(lambda v: v.addr == address, modem.mcast)) if len(data) == 0: click.echo(f'Multicast address {address} not found', err=True) sys.exit(1) @@ -4679,7 +4714,7 @@ def update_mcast_address(get_modem: Callable[[], OpenLoRaModem], address, nwkske if nwkskey is None: nwkskey = data[0].nwkskey if appskey is None: appskey = data[0].appskey - modem.multicast = McastAddr(data[0].id, address, nwkskey, appskey) + modem.mcast = McastAddr(data[0].id, address, nwkskey, appskey) @cli.group(invoke_without_command=True) @@ -4705,22 +4740,19 @@ def show_channels(get_modem: Callable[[], OpenLoRaModem]): headers = ['Channel', 'Center frequency', 'Minimum data rate', 'Maximum data rate'] - channels = sorted(modem.rf_param, key=lambda v: v.id) - region = modem.region + channels = sorted(modem.rfparam, key=lambda v: v.id) + region = modem.band data = [] for ch in channels: - line = [f'{ch.id}', f'{ch.frequency / 1000000} MHz', region_to_data_rate(region)(ch.min_dr).name, region_to_data_rate(region)(ch.max_dr).name] + line = [f'{ch.id}', f'{ch.frequency / 1000000} MHz', region_to_LoRaDataRate(region)(ch.min_dr).name, region_to_LoRaDataRate(region)(ch.max_dr).name] data.append(line) render(data, headers=headers) def parse_frequency(frequency: str): - try: - f = int(frequency) - except ValueError: - f = float(frequency) + f = float(frequency) if f < 1000: f *= 1000000 @@ -4743,7 +4775,7 @@ def add_channel(get_modem: Callable[[], OpenLoRaModem], channel, frequency, min_ rate, and maximum data rate. If the frequency value is an integer, it is assumed to be in Hz. If the - frequency value is a floating point number, it is assumed to be in MHz. + frequency value is a floating point number, it is assumed to be in MHz. The minimum and maximum data rates can be either strings (e.g., sf12_125) or integers. @@ -4763,8 +4795,8 @@ def add_channel(get_modem: Callable[[], OpenLoRaModem], channel, frequency, min_ frequency = parse_frequency(frequency) modem = get_modem() - channels = modem.rf_param - region = modem.region + channels = modem.rfparam + region = modem.band min_dr = parse_data_rate(region, min_dr) max_dr = parse_data_rate(region, max_dr) @@ -4772,12 +4804,12 @@ def add_channel(get_modem: Callable[[], OpenLoRaModem], channel, frequency, min_ existing = list(filter(lambda v: v.id == channel, channels)) if len(existing): if replace: - modem.rf_param = RFConfig(existing[0].id, frequency, min_dr, max_dr) + modem.rfparam = RFConfig(existing[0].id, frequency, min_dr, max_dr) else: click.echo(f'Channel {channel} already exists', err=True) sys.exit(1) else: - modem.rf_param = RFConfig(channel, frequency, min_dr, max_dr) + modem.rfparam = RFConfig(channel, frequency, min_dr, max_dr) @channels.command('remove') @@ -4797,7 +4829,7 @@ def remove_channels(get_modem: Callable[[], OpenLoRaModem], channels, ignore_mis removed. ''' modem = get_modem() - active_channels = modem.rf_param + active_channels = modem.rfparam if len(channels) == 0: click.echo("Please provide at least one RF channel number", err=True) @@ -4810,7 +4842,7 @@ def remove_channels(get_modem: Callable[[], OpenLoRaModem], channels, ignore_mis click.echo(f'RF channel {ch} not found', err=True) sys.exit(1) else: - modem.rf_param = ch + modem.rfparam = ch @channels.command('set') @@ -4829,7 +4861,7 @@ def update_channel(get_modem: Callable[[], OpenLoRaModem], channel, frequency, m parameters that are not updated via the command line options. If the frequency value is an integer, it is assumed to be in Hz. If the - frequency value is a floating point number, it is assumed to be in MHz. + frequency value is a floating point number, it is assumed to be in MHz. The minimum and maximum data rates can be either strings (e.g., sf12_125) or integers. @@ -4842,7 +4874,7 @@ def update_channel(get_modem: Callable[[], OpenLoRaModem], channel, frequency, m frequency = parse_frequency(frequency) modem = get_modem() - region = modem.region + region = modem.band if min_dr is not None: min_dr = parse_data_rate(region, min_dr) @@ -4850,7 +4882,7 @@ def update_channel(get_modem: Callable[[], OpenLoRaModem], channel, frequency, m if max_dr is not None: max_dr = parse_data_rate(region, max_dr) - data = list(filter(lambda v: v.id == channel, modem.rf_param)) + data = list(filter(lambda v: v.id == channel, modem.rfparam)) if len(data) == 0: click.echo(f'RF channel {channel} not found', err=True) sys.exit(1) @@ -4859,7 +4891,7 @@ def update_channel(get_modem: Callable[[], OpenLoRaModem], channel, frequency, m if min_dr is None: min_dr = data[0].min_dr if max_dr is None: max_dr = data[0].max_dr - modem.rf_param = RFConfig(channel, frequency, min_dr, max_dr) + modem.rfparam = RFConfig(channel, frequency, min_dr, max_dr) if __name__ == '__main__':