Skip to content

Commit aa1dcb7

Browse files
authored
Merge pull request #298 from mkinney/master
refactor code to util; add duplicate check
2 parents e28f0d5 + 1a2519d commit aa1dcb7

File tree

4 files changed

+192
-111
lines changed

4 files changed

+192
-111
lines changed

examples/scan_for_devices.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
"""
44

55
import sys
6-
from meshtastic.supported_device import get_unique_vendor_ids, active_ports_on_supported_devices
7-
from meshtastic.util import detect_supported_devices
6+
from meshtastic.util import detect_supported_devices, get_unique_vendor_ids, active_ports_on_supported_devices
87

98
# simple arg check
109
if len(sys.argv) != 1:

meshtastic/supported_device.py

Lines changed: 0 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,6 @@
22
It is used for auto detection as to which device might be connected.
33
"""
44

5-
import platform
6-
import subprocess
7-
import re
8-
95
# Goal is to detect which device and port to use from the supported devices
106
# without installing any libraries that are not currently in the python meshtastic library
117

@@ -91,106 +87,3 @@ def __init__(self, name, version=None, for_firmware=None, device_class="esp32",
9187
heltec_v1, heltec_v2_0, heltec_v2_1,
9288
meshtastic_diy_v1, techo_1, rak4631_5005, rak4631_19003,
9389
rak11200]
94-
95-
96-
def get_unique_vendor_ids():
97-
"""Return a set of unique vendor ids"""
98-
vids = set()
99-
for d in supported_devices:
100-
if d.usb_vendor_id_in_hex:
101-
vids.add(d.usb_vendor_id_in_hex)
102-
return vids
103-
104-
def get_devices_with_vendor_id(vid):
105-
"""Return a set of unique devices with the vendor id"""
106-
sd = set()
107-
for d in supported_devices:
108-
if d.usb_vendor_id_in_hex == vid:
109-
sd.add(d)
110-
return sd
111-
112-
def active_ports_on_supported_devices(sds):
113-
"""Return a set of active ports based on the supplied supported devices"""
114-
ports = set()
115-
baseports = set()
116-
system = platform.system()
117-
118-
# figure out what possible base ports there are
119-
for d in sds:
120-
if system == "Linux":
121-
baseports.add(d.baseport_on_linux)
122-
elif system == "Darwin":
123-
baseports.add(d.baseport_on_mac)
124-
elif system == "Windows":
125-
baseports.add(d.baseport_on_windows)
126-
127-
for bp in baseports:
128-
if system == "Linux":
129-
# see if we have any devices (ignoring any stderr output)
130-
command = f'ls -al /dev/{bp}* 2> /dev/null'
131-
#print(f'command:{command}')
132-
_, ls_output = subprocess.getstatusoutput(command)
133-
#print(f'ls_output:{ls_output}')
134-
# if we got output, there are ports
135-
if len(ls_output) > 0:
136-
#print('got output')
137-
# for each line of output
138-
lines = ls_output.split('\n')
139-
#print(f'lines:{lines}')
140-
for line in lines:
141-
parts = line.split(' ')
142-
#print(f'parts:{parts}')
143-
port = parts[-1]
144-
#print(f'port:{port}')
145-
ports.add(port)
146-
elif system == "Darwin":
147-
# see if we have any devices (ignoring any stderr output)
148-
command = f'ls -al /dev/{bp}* 2> /dev/null'
149-
#print(f'command:{command}')
150-
_, ls_output = subprocess.getstatusoutput(command)
151-
#print(f'ls_output:{ls_output}')
152-
# if we got output, there are ports
153-
if len(ls_output) > 0:
154-
#print('got output')
155-
# for each line of output
156-
lines = ls_output.split('\n')
157-
#print(f'lines:{lines}')
158-
for line in lines:
159-
parts = line.split(' ')
160-
#print(f'parts:{parts}')
161-
port = parts[-1]
162-
#print(f'port:{port}')
163-
ports.add(port)
164-
elif system == "Windows":
165-
# for each device in supported devices found
166-
for d in sds:
167-
# find the port(s)
168-
com_ports = detect_windows_port(d)
169-
#print(f'com_ports:{com_ports}')
170-
# add all ports
171-
for com_port in com_ports:
172-
ports.add(com_port)
173-
return ports
174-
175-
176-
def detect_windows_port(sd):
177-
"""detect if Windows port"""
178-
ports = set()
179-
180-
if sd:
181-
system = platform.system()
182-
183-
if system == "Windows":
184-
command = ('powershell.exe "[Console]::OutputEncoding = [Text.UTF8Encoding]::UTF8;'
185-
'Get-PnpDevice -PresentOnly | Where-Object{ ($_.DeviceId -like ')
186-
command += f"'*{sd.usb_vendor_id_in_hex.upper()}*'"
187-
command += ')} | Format-List"'
188-
189-
#print(f'command:{command}')
190-
_, sp_output = subprocess.getstatusoutput(command)
191-
#print(f'sp_output:{sp_output}')
192-
p = re.compile(r'\(COM(.*)\)')
193-
for x in p.findall(sp_output):
194-
#print(f'x:{x}')
195-
ports.add(f'COM{x}')
196-
return ports

meshtastic/tests/test_util.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
remove_keys_from_dict, Timeout, hexstr,
1313
ipstr, readnet_u16, findPorts, convert_mac_addr,
1414
snake_to_camel, camel_to_snake, eliminate_duplicate_port,
15-
is_windows11)
15+
is_windows11, active_ports_on_supported_devices)
16+
17+
from meshtastic.supported_device import SupportedDevice
1618

1719

1820
@pytest.mark.unit
@@ -336,6 +338,8 @@ def test_eliminate_duplicate_port():
336338
assert eliminate_duplicate_port(['/dev/cu.usbserial-0001', '/dev/cu.SLAB_USBtoUART']) == ['/dev/cu.usbserial-0001']
337339
assert eliminate_duplicate_port(['/dev/cu.usbmodem11301', '/dev/cu.wchusbserial11301']) == ['/dev/cu.wchusbserial11301']
338340
assert eliminate_duplicate_port(['/dev/cu.wchusbserial11301', '/dev/cu.usbmodem11301']) == ['/dev/cu.wchusbserial11301']
341+
assert eliminate_duplicate_port(['/dev/cu.usbmodem53230051441', '/dev/cu.wchusbserial53230051441']) == ['/dev/cu.wchusbserial53230051441']
342+
assert eliminate_duplicate_port(['/dev/cu.wchusbserial53230051441', '/dev/cu.usbmodem53230051441']) == ['/dev/cu.wchusbserial53230051441']
339343

340344
@patch('platform.version', return_value='10.0.22000.194')
341345
@patch('platform.release', return_value='10')
@@ -377,3 +381,78 @@ def test_is_windows11_false_win8_1(patched_platform, patched_release):
377381
assert is_windows11() is False
378382
patched_platform.assert_called()
379383
patched_release.assert_called()
384+
385+
386+
@pytest.mark.unit
387+
@patch('platform.system', return_value='Linux')
388+
def test_active_ports_on_supported_devices_empty(mock_platform):
389+
"""Test active_ports_on_supported_devices()"""
390+
sds = set()
391+
assert active_ports_on_supported_devices(sds) == set()
392+
mock_platform.assert_called()
393+
394+
395+
@pytest.mark.unit
396+
@patch('subprocess.getstatusoutput')
397+
@patch('platform.system', return_value='Linux')
398+
def test_active_ports_on_supported_devices_linux(mock_platform, mock_sp):
399+
"""Test active_ports_on_supported_devices()"""
400+
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/ttyUSBfake')
401+
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='ttyUSB')
402+
fake_supported_devices = [fake_device]
403+
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/ttyUSBfake'}
404+
mock_platform.assert_called()
405+
mock_sp.assert_called()
406+
407+
408+
@pytest.mark.unit
409+
@patch('subprocess.getstatusoutput')
410+
@patch('platform.system', return_value='Darwin')
411+
def test_active_ports_on_supported_devices_mac(mock_platform, mock_sp):
412+
"""Test active_ports_on_supported_devices()"""
413+
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/cu.usbserial-foo')
414+
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='cu.usbserial-')
415+
fake_supported_devices = [fake_device]
416+
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/cu.usbserial-foo'}
417+
mock_platform.assert_called()
418+
mock_sp.assert_called()
419+
420+
421+
@pytest.mark.unit
422+
@patch('meshtastic.util.detect_windows_port', return_value={'COM2'})
423+
@patch('platform.system', return_value='Windows')
424+
def test_active_ports_on_supported_devices_win(mock_platform, mock_dwp):
425+
"""Test active_ports_on_supported_devices()"""
426+
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1')
427+
fake_supported_devices = [fake_device]
428+
assert active_ports_on_supported_devices(fake_supported_devices) == {'COM2'}
429+
mock_platform.assert_called()
430+
mock_dwp.assert_called()
431+
432+
433+
@pytest.mark.unit
434+
@patch('subprocess.getstatusoutput')
435+
@patch('platform.system', return_value='Darwin')
436+
def test_active_ports_on_supported_devices_mac_no_duplicates_check(mock_platform, mock_sp):
437+
"""Test active_ports_on_supported_devices()"""
438+
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
439+
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
440+
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
441+
fake_supported_devices = [fake_device]
442+
assert active_ports_on_supported_devices(fake_supported_devices, False) == {'/dev/cu.usbmodem53230051441', '/dev/cu.wchusbserial53230051441'}
443+
mock_platform.assert_called()
444+
mock_sp.assert_called()
445+
446+
447+
@pytest.mark.unit
448+
@patch('subprocess.getstatusoutput')
449+
@patch('platform.system', return_value='Darwin')
450+
def test_active_ports_on_supported_devices_mac_duplicates_check(mock_platform, mock_sp):
451+
"""Test active_ports_on_supported_devices()"""
452+
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
453+
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
454+
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
455+
fake_supported_devices = [fake_device]
456+
assert active_ports_on_supported_devices(fake_supported_devices, True) == {'/dev/cu.wchusbserial53230051441'}
457+
mock_platform.assert_called()
458+
mock_sp.assert_called()

meshtastic/util.py

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
import serial
1515
import serial.tools.list_ports
1616
import pkg_resources
17-
from meshtastic.supported_device import get_unique_vendor_ids, get_devices_with_vendor_id
17+
18+
from meshtastic.supported_device import supported_devices
1819

1920
"""Some devices such as a seger jlink we never want to accidentally open"""
2021
blacklistVids = dict.fromkeys([0x1366])
@@ -433,3 +434,112 @@ def is_windows11():
433434
except Exception as e:
434435
print(f'problem detecting win11 e:{e}')
435436
return is_win11
437+
438+
439+
def get_unique_vendor_ids():
440+
"""Return a set of unique vendor ids"""
441+
vids = set()
442+
for d in supported_devices:
443+
if d.usb_vendor_id_in_hex:
444+
vids.add(d.usb_vendor_id_in_hex)
445+
return vids
446+
447+
448+
def get_devices_with_vendor_id(vid):
449+
"""Return a set of unique devices with the vendor id"""
450+
sd = set()
451+
for d in supported_devices:
452+
if d.usb_vendor_id_in_hex == vid:
453+
sd.add(d)
454+
return sd
455+
456+
457+
def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
458+
"""Return a set of active ports based on the supplied supported devices"""
459+
ports = set()
460+
baseports = set()
461+
system = platform.system()
462+
463+
# figure out what possible base ports there are
464+
for d in sds:
465+
if system == "Linux":
466+
baseports.add(d.baseport_on_linux)
467+
elif system == "Darwin":
468+
baseports.add(d.baseport_on_mac)
469+
elif system == "Windows":
470+
baseports.add(d.baseport_on_windows)
471+
472+
for bp in baseports:
473+
if system == "Linux":
474+
# see if we have any devices (ignoring any stderr output)
475+
command = f'ls -al /dev/{bp}* 2> /dev/null'
476+
#print(f'command:{command}')
477+
_, ls_output = subprocess.getstatusoutput(command)
478+
#print(f'ls_output:{ls_output}')
479+
# if we got output, there are ports
480+
if len(ls_output) > 0:
481+
#print('got output')
482+
# for each line of output
483+
lines = ls_output.split('\n')
484+
#print(f'lines:{lines}')
485+
for line in lines:
486+
parts = line.split(' ')
487+
#print(f'parts:{parts}')
488+
port = parts[-1]
489+
#print(f'port:{port}')
490+
ports.add(port)
491+
elif system == "Darwin":
492+
# see if we have any devices (ignoring any stderr output)
493+
command = f'ls -al /dev/{bp}* 2> /dev/null'
494+
#print(f'command:{command}')
495+
_, ls_output = subprocess.getstatusoutput(command)
496+
#print(f'ls_output:{ls_output}')
497+
# if we got output, there are ports
498+
if len(ls_output) > 0:
499+
#print('got output')
500+
# for each line of output
501+
lines = ls_output.split('\n')
502+
#print(f'lines:{lines}')
503+
for line in lines:
504+
parts = line.split(' ')
505+
#print(f'parts:{parts}')
506+
port = parts[-1]
507+
#print(f'port:{port}')
508+
ports.add(port)
509+
elif system == "Windows":
510+
# for each device in supported devices found
511+
for d in sds:
512+
# find the port(s)
513+
com_ports = detect_windows_port(d)
514+
#print(f'com_ports:{com_ports}')
515+
# add all ports
516+
for com_port in com_ports:
517+
ports.add(com_port)
518+
if eliminate_duplicates:
519+
ports = eliminate_duplicate_port(list(ports))
520+
ports.sort()
521+
ports = set(ports)
522+
return ports
523+
524+
525+
def detect_windows_port(sd):
526+
"""detect if Windows port"""
527+
ports = set()
528+
529+
if sd:
530+
system = platform.system()
531+
532+
if system == "Windows":
533+
command = ('powershell.exe "[Console]::OutputEncoding = [Text.UTF8Encoding]::UTF8;'
534+
'Get-PnpDevice -PresentOnly | Where-Object{ ($_.DeviceId -like ')
535+
command += f"'*{sd.usb_vendor_id_in_hex.upper()}*'"
536+
command += ')} | Format-List"'
537+
538+
#print(f'command:{command}')
539+
_, sp_output = subprocess.getstatusoutput(command)
540+
#print(f'sp_output:{sp_output}')
541+
p = re.compile(r'\(COM(.*)\)')
542+
for x in p.findall(sp_output):
543+
#print(f'x:{x}')
544+
ports.add(f'COM{x}')
545+
return ports

0 commit comments

Comments
 (0)