Skip to content

Commit 1a2519d

Browse files
committed
refactor code to util
1 parent a3572ef commit 1a2519d

File tree

5 files changed

+190
-199
lines changed

5 files changed

+190
-199
lines changed

examples/scan_for_devices.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
"""
44

55
import sys
6-
from meshtastic.supported_device import get_unique_vendor_ids, active_ports_on_supported_devices
7-
from meshtastic.util import detect_supported_devices
6+
from meshtastic.util import detect_supported_devices, get_unique_vendor_ids, active_ports_on_supported_devices
87

98
# simple arg check
109
if len(sys.argv) != 1:

meshtastic/supported_device.py

Lines changed: 0 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,6 @@
22
It is used for auto detection as to which device might be connected.
33
"""
44

5-
import platform
6-
import subprocess
7-
import re
8-
9-
import meshtastic.util
10-
115
# Goal is to detect which device and port to use from the supported devices
126
# without installing any libraries that are not currently in the python meshtastic library
137

@@ -93,110 +87,3 @@ def __init__(self, name, version=None, for_firmware=None, device_class="esp32",
9387
heltec_v1, heltec_v2_0, heltec_v2_1,
9488
meshtastic_diy_v1, techo_1, rak4631_5005, rak4631_19003,
9589
rak11200]
96-
97-
98-
def get_unique_vendor_ids():
99-
"""Return a set of unique vendor ids"""
100-
vids = set()
101-
for d in supported_devices:
102-
if d.usb_vendor_id_in_hex:
103-
vids.add(d.usb_vendor_id_in_hex)
104-
return vids
105-
106-
def get_devices_with_vendor_id(vid):
107-
"""Return a set of unique devices with the vendor id"""
108-
sd = set()
109-
for d in supported_devices:
110-
if d.usb_vendor_id_in_hex == vid:
111-
sd.add(d)
112-
return sd
113-
114-
def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
115-
"""Return a set of active ports based on the supplied supported devices"""
116-
ports = set()
117-
baseports = set()
118-
system = platform.system()
119-
120-
# figure out what possible base ports there are
121-
for d in sds:
122-
if system == "Linux":
123-
baseports.add(d.baseport_on_linux)
124-
elif system == "Darwin":
125-
baseports.add(d.baseport_on_mac)
126-
elif system == "Windows":
127-
baseports.add(d.baseport_on_windows)
128-
129-
for bp in baseports:
130-
if system == "Linux":
131-
# see if we have any devices (ignoring any stderr output)
132-
command = f'ls -al /dev/{bp}* 2> /dev/null'
133-
#print(f'command:{command}')
134-
_, ls_output = subprocess.getstatusoutput(command)
135-
#print(f'ls_output:{ls_output}')
136-
# if we got output, there are ports
137-
if len(ls_output) > 0:
138-
#print('got output')
139-
# for each line of output
140-
lines = ls_output.split('\n')
141-
#print(f'lines:{lines}')
142-
for line in lines:
143-
parts = line.split(' ')
144-
#print(f'parts:{parts}')
145-
port = parts[-1]
146-
#print(f'port:{port}')
147-
ports.add(port)
148-
elif system == "Darwin":
149-
# see if we have any devices (ignoring any stderr output)
150-
command = f'ls -al /dev/{bp}* 2> /dev/null'
151-
#print(f'command:{command}')
152-
_, ls_output = subprocess.getstatusoutput(command)
153-
#print(f'ls_output:{ls_output}')
154-
# if we got output, there are ports
155-
if len(ls_output) > 0:
156-
#print('got output')
157-
# for each line of output
158-
lines = ls_output.split('\n')
159-
#print(f'lines:{lines}')
160-
for line in lines:
161-
parts = line.split(' ')
162-
#print(f'parts:{parts}')
163-
port = parts[-1]
164-
#print(f'port:{port}')
165-
ports.add(port)
166-
elif system == "Windows":
167-
# for each device in supported devices found
168-
for d in sds:
169-
# find the port(s)
170-
com_ports = detect_windows_port(d)
171-
#print(f'com_ports:{com_ports}')
172-
# add all ports
173-
for com_port in com_ports:
174-
ports.add(com_port)
175-
if eliminate_duplicates:
176-
ports = meshtastic.util.eliminate_duplicate_port(list(ports))
177-
ports.sort()
178-
ports = set(ports)
179-
return ports
180-
181-
182-
def detect_windows_port(sd):
183-
"""detect if Windows port"""
184-
ports = set()
185-
186-
if sd:
187-
system = platform.system()
188-
189-
if system == "Windows":
190-
command = ('powershell.exe "[Console]::OutputEncoding = [Text.UTF8Encoding]::UTF8;'
191-
'Get-PnpDevice -PresentOnly | Where-Object{ ($_.DeviceId -like ')
192-
command += f"'*{sd.usb_vendor_id_in_hex.upper()}*'"
193-
command += ')} | Format-List"'
194-
195-
#print(f'command:{command}')
196-
_, sp_output = subprocess.getstatusoutput(command)
197-
#print(f'sp_output:{sp_output}')
198-
p = re.compile(r'\(COM(.*)\)')
199-
for x in p.findall(sp_output):
200-
#print(f'x:{x}')
201-
ports.add(f'COM{x}')
202-
return ports

meshtastic/tests/test_supported_device.py

Lines changed: 0 additions & 82 deletions
This file was deleted.

meshtastic/tests/test_util.py

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
remove_keys_from_dict, Timeout, hexstr,
1313
ipstr, readnet_u16, findPorts, convert_mac_addr,
1414
snake_to_camel, camel_to_snake, eliminate_duplicate_port,
15-
is_windows11)
15+
is_windows11, active_ports_on_supported_devices)
16+
17+
from meshtastic.supported_device import SupportedDevice
1618

1719

1820
@pytest.mark.unit
@@ -379,3 +381,78 @@ def test_is_windows11_false_win8_1(patched_platform, patched_release):
379381
assert is_windows11() is False
380382
patched_platform.assert_called()
381383
patched_release.assert_called()
384+
385+
386+
@pytest.mark.unit
387+
@patch('platform.system', return_value='Linux')
388+
def test_active_ports_on_supported_devices_empty(mock_platform):
389+
"""Test active_ports_on_supported_devices()"""
390+
sds = set()
391+
assert active_ports_on_supported_devices(sds) == set()
392+
mock_platform.assert_called()
393+
394+
395+
@pytest.mark.unit
396+
@patch('subprocess.getstatusoutput')
397+
@patch('platform.system', return_value='Linux')
398+
def test_active_ports_on_supported_devices_linux(mock_platform, mock_sp):
399+
"""Test active_ports_on_supported_devices()"""
400+
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/ttyUSBfake')
401+
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='ttyUSB')
402+
fake_supported_devices = [fake_device]
403+
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/ttyUSBfake'}
404+
mock_platform.assert_called()
405+
mock_sp.assert_called()
406+
407+
408+
@pytest.mark.unit
409+
@patch('subprocess.getstatusoutput')
410+
@patch('platform.system', return_value='Darwin')
411+
def test_active_ports_on_supported_devices_mac(mock_platform, mock_sp):
412+
"""Test active_ports_on_supported_devices()"""
413+
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/cu.usbserial-foo')
414+
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='cu.usbserial-')
415+
fake_supported_devices = [fake_device]
416+
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/cu.usbserial-foo'}
417+
mock_platform.assert_called()
418+
mock_sp.assert_called()
419+
420+
421+
@pytest.mark.unit
422+
@patch('meshtastic.util.detect_windows_port', return_value={'COM2'})
423+
@patch('platform.system', return_value='Windows')
424+
def test_active_ports_on_supported_devices_win(mock_platform, mock_dwp):
425+
"""Test active_ports_on_supported_devices()"""
426+
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1')
427+
fake_supported_devices = [fake_device]
428+
assert active_ports_on_supported_devices(fake_supported_devices) == {'COM2'}
429+
mock_platform.assert_called()
430+
mock_dwp.assert_called()
431+
432+
433+
@pytest.mark.unit
434+
@patch('subprocess.getstatusoutput')
435+
@patch('platform.system', return_value='Darwin')
436+
def test_active_ports_on_supported_devices_mac_no_duplicates_check(mock_platform, mock_sp):
437+
"""Test active_ports_on_supported_devices()"""
438+
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
439+
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
440+
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
441+
fake_supported_devices = [fake_device]
442+
assert active_ports_on_supported_devices(fake_supported_devices, False) == {'/dev/cu.usbmodem53230051441', '/dev/cu.wchusbserial53230051441'}
443+
mock_platform.assert_called()
444+
mock_sp.assert_called()
445+
446+
447+
@pytest.mark.unit
448+
@patch('subprocess.getstatusoutput')
449+
@patch('platform.system', return_value='Darwin')
450+
def test_active_ports_on_supported_devices_mac_duplicates_check(mock_platform, mock_sp):
451+
"""Test active_ports_on_supported_devices()"""
452+
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
453+
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
454+
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
455+
fake_supported_devices = [fake_device]
456+
assert active_ports_on_supported_devices(fake_supported_devices, True) == {'/dev/cu.wchusbserial53230051441'}
457+
mock_platform.assert_called()
458+
mock_sp.assert_called()

0 commit comments

Comments
 (0)