Skip to content

Commit 20c6597

Browse files
committed
refactor code to util
1 parent 96e42ac commit 20c6597

File tree

6 files changed

+191
-199
lines changed

6 files changed

+191
-199
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ dist
66
log_*
77
.eggs
88
nanopb-0.4.4
9+
nanopb-0.4.5
910
.*swp
1011
.coverage
1112
*.py-E

examples/scan_for_devices.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
"""
44

55
import sys
6-
from meshtastic.supported_device import get_unique_vendor_ids, active_ports_on_supported_devices
7-
from meshtastic.util import detect_supported_devices
6+
from meshtastic.util import detect_supported_devices, get_unique_vendor_ids, active_ports_on_supported_devices
87

98
# simple arg check
109
if len(sys.argv) != 1:

meshtastic/supported_device.py

Lines changed: 0 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,6 @@
22
It is used for auto detection as to which device might be connected.
33
"""
44

5-
import platform
6-
import subprocess
7-
import re
8-
9-
import meshtastic.util
10-
115
# Goal is to detect which device and port to use from the supported devices
126
# without installing any libraries that are not currently in the python meshtastic library
137

@@ -93,110 +87,3 @@ def __init__(self, name, version=None, for_firmware=None, device_class="esp32",
9387
heltec_v1, heltec_v2_0, heltec_v2_1,
9488
meshtastic_diy_v1, techo_1, rak4631_5005, rak4631_19003,
9589
rak11200]
96-
97-
98-
def get_unique_vendor_ids():
99-
"""Return a set of unique vendor ids"""
100-
vids = set()
101-
for d in supported_devices:
102-
if d.usb_vendor_id_in_hex:
103-
vids.add(d.usb_vendor_id_in_hex)
104-
return vids
105-
106-
def get_devices_with_vendor_id(vid):
107-
"""Return a set of unique devices with the vendor id"""
108-
sd = set()
109-
for d in supported_devices:
110-
if d.usb_vendor_id_in_hex == vid:
111-
sd.add(d)
112-
return sd
113-
114-
def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
115-
"""Return a set of active ports based on the supplied supported devices"""
116-
ports = set()
117-
baseports = set()
118-
system = platform.system()
119-
120-
# figure out what possible base ports there are
121-
for d in sds:
122-
if system == "Linux":
123-
baseports.add(d.baseport_on_linux)
124-
elif system == "Darwin":
125-
baseports.add(d.baseport_on_mac)
126-
elif system == "Windows":
127-
baseports.add(d.baseport_on_windows)
128-
129-
for bp in baseports:
130-
if system == "Linux":
131-
# see if we have any devices (ignoring any stderr output)
132-
command = f'ls -al /dev/{bp}* 2> /dev/null'
133-
#print(f'command:{command}')
134-
_, ls_output = subprocess.getstatusoutput(command)
135-
#print(f'ls_output:{ls_output}')
136-
# if we got output, there are ports
137-
if len(ls_output) > 0:
138-
#print('got output')
139-
# for each line of output
140-
lines = ls_output.split('\n')
141-
#print(f'lines:{lines}')
142-
for line in lines:
143-
parts = line.split(' ')
144-
#print(f'parts:{parts}')
145-
port = parts[-1]
146-
#print(f'port:{port}')
147-
ports.add(port)
148-
elif system == "Darwin":
149-
# see if we have any devices (ignoring any stderr output)
150-
command = f'ls -al /dev/{bp}* 2> /dev/null'
151-
#print(f'command:{command}')
152-
_, ls_output = subprocess.getstatusoutput(command)
153-
#print(f'ls_output:{ls_output}')
154-
# if we got output, there are ports
155-
if len(ls_output) > 0:
156-
#print('got output')
157-
# for each line of output
158-
lines = ls_output.split('\n')
159-
#print(f'lines:{lines}')
160-
for line in lines:
161-
parts = line.split(' ')
162-
#print(f'parts:{parts}')
163-
port = parts[-1]
164-
#print(f'port:{port}')
165-
ports.add(port)
166-
elif system == "Windows":
167-
# for each device in supported devices found
168-
for d in sds:
169-
# find the port(s)
170-
com_ports = detect_windows_port(d)
171-
#print(f'com_ports:{com_ports}')
172-
# add all ports
173-
for com_port in com_ports:
174-
ports.add(com_port)
175-
if eliminate_duplicates:
176-
ports = meshtastic.util.eliminate_duplicate_port(list(ports))
177-
ports.sort()
178-
ports = set(ports)
179-
return ports
180-
181-
182-
def detect_windows_port(sd):
183-
"""detect if Windows port"""
184-
ports = set()
185-
186-
if sd:
187-
system = platform.system()
188-
189-
if system == "Windows":
190-
command = ('powershell.exe "[Console]::OutputEncoding = [Text.UTF8Encoding]::UTF8;'
191-
'Get-PnpDevice -PresentOnly | Where-Object{ ($_.DeviceId -like ')
192-
command += f"'*{sd.usb_vendor_id_in_hex.upper()}*'"
193-
command += ')} | Format-List"'
194-
195-
#print(f'command:{command}')
196-
_, sp_output = subprocess.getstatusoutput(command)
197-
#print(f'sp_output:{sp_output}')
198-
p = re.compile(r'\(COM(.*)\)')
199-
for x in p.findall(sp_output):
200-
#print(f'x:{x}')
201-
ports.add(f'COM{x}')
202-
return ports

meshtastic/tests/test_supported_device.py

Lines changed: 0 additions & 82 deletions
This file was deleted.

meshtastic/tests/test_util.py

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
remove_keys_from_dict, Timeout, hexstr,
1313
ipstr, readnet_u16, findPorts, convert_mac_addr,
1414
snake_to_camel, camel_to_snake, eliminate_duplicate_port,
15-
is_windows11)
15+
is_windows11, active_ports_on_supported_devices)
16+
17+
from meshtastic.supported_device import SupportedDevice
1618

1719

1820
@pytest.mark.unit
@@ -361,3 +363,78 @@ def test_is_windows11_false_win8_1(patched_platform, patched_release):
361363
assert is_windows11() is False
362364
patched_platform.assert_called()
363365
patched_release.assert_called()
366+
367+
368+
@pytest.mark.unit
369+
@patch('platform.system', return_value='Linux')
370+
def test_active_ports_on_supported_devices_empty(mock_platform):
371+
"""Test active_ports_on_supported_devices()"""
372+
sds = set()
373+
assert active_ports_on_supported_devices(sds) == set()
374+
mock_platform.assert_called()
375+
376+
377+
@pytest.mark.unit
378+
@patch('subprocess.getstatusoutput')
379+
@patch('platform.system', return_value='Linux')
380+
def test_active_ports_on_supported_devices_linux(mock_platform, mock_sp):
381+
"""Test active_ports_on_supported_devices()"""
382+
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/ttyUSBfake')
383+
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='ttyUSB')
384+
fake_supported_devices = [fake_device]
385+
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/ttyUSBfake'}
386+
mock_platform.assert_called()
387+
mock_sp.assert_called()
388+
389+
390+
@pytest.mark.unit
391+
@patch('subprocess.getstatusoutput')
392+
@patch('platform.system', return_value='Darwin')
393+
def test_active_ports_on_supported_devices_mac(mock_platform, mock_sp):
394+
"""Test active_ports_on_supported_devices()"""
395+
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/cu.usbserial-foo')
396+
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='cu.usbserial-')
397+
fake_supported_devices = [fake_device]
398+
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/cu.usbserial-foo'}
399+
mock_platform.assert_called()
400+
mock_sp.assert_called()
401+
402+
403+
@pytest.mark.unit
404+
@patch('meshtastic.util.detect_windows_port', return_value={'COM2'})
405+
@patch('platform.system', return_value='Windows')
406+
def test_active_ports_on_supported_devices_win(mock_platform, mock_dwp):
407+
"""Test active_ports_on_supported_devices()"""
408+
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1')
409+
fake_supported_devices = [fake_device]
410+
assert active_ports_on_supported_devices(fake_supported_devices) == {'COM2'}
411+
mock_platform.assert_called()
412+
mock_dwp.assert_called()
413+
414+
415+
@pytest.mark.unit
416+
@patch('subprocess.getstatusoutput')
417+
@patch('platform.system', return_value='Darwin')
418+
def test_active_ports_on_supported_devices_mac_no_duplicates_check(mock_platform, mock_sp):
419+
"""Test active_ports_on_supported_devices()"""
420+
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
421+
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
422+
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
423+
fake_supported_devices = [fake_device]
424+
assert active_ports_on_supported_devices(fake_supported_devices, False) == {'/dev/cu.usbmodem53230051441', '/dev/cu.wchusbserial53230051441'}
425+
mock_platform.assert_called()
426+
mock_sp.assert_called()
427+
428+
429+
@pytest.mark.unit
430+
@patch('subprocess.getstatusoutput')
431+
@patch('platform.system', return_value='Darwin')
432+
def test_active_ports_on_supported_devices_mac_duplicates_check(mock_platform, mock_sp):
433+
"""Test active_ports_on_supported_devices()"""
434+
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
435+
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
436+
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
437+
fake_supported_devices = [fake_device]
438+
assert active_ports_on_supported_devices(fake_supported_devices, True) == {'/dev/cu.wchusbserial53230051441'}
439+
mock_platform.assert_called()
440+
mock_sp.assert_called()

0 commit comments

Comments
 (0)