Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[thci] convert Python2 script to Python3 #310

Merged
merged 2 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tools/commissioner_thci/commissioner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2019, The OpenThread Commissioner Authors.
Expand Down
4 changes: 1 addition & 3 deletions tools/commissioner_thci/commissioner_ctl.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2019, The OpenThread Commissioner Authors.
Expand Down Expand Up @@ -30,8 +30,6 @@
Commissioner daemon controller
"""

from __future__ import print_function

import os
import argparse
import multiprocessing.connection
Expand Down
129 changes: 51 additions & 78 deletions tools/commissioner_thci/commissioner_impl.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2019, The OpenThread Commissioner Authors.
Expand Down Expand Up @@ -142,16 +142,15 @@ def __init__(self, config, handler, simulator=None):
self._handler = handler
self._lines = []

self._command('stty cols {}'.format(TTY_COLS))
self._command(f'stty cols {TTY_COLS}')

config_path = '/tmp/commissioner.{}.json'.format(uuid.uuid4())
config_path = f'/tmp/commissioner.{uuid.uuid4()}.json'
self._write_config(config_path=config_path, config=config)

response = self._command('{} init "{}"'.format(COMMISSIONER_CTL,
config_path))
response = self._command(f'{COMMISSIONER_CTL} init "{config_path}"')
if self._command('echo $?')[0] != '0':
raise commissioner.Error('Failed to init, error:\n{}'.format(
'\n'.join(response)))
raise commissioner.Error(
f"Failed to init, error:\n{'\n'.join(response)}")

@staticmethod
def makeLocalCommissioner(config, simulator):
Expand All @@ -166,12 +165,9 @@ def makeHarnessCommissioner(config, serial_handler):
return OTCommissioner(config, serial_handler)

def start(self, borderAgentAddr, borderAgentPort):
self._command('sudo rm {}'.format(self.log_file))
self._command('sudo touch {}'.format(self.log_file))
self._execute_and_check('start {} {}'.format(
borderAgentAddr,
borderAgentPort,
))
self._command(f'sudo rm {self.log_file}')
self._command(f'sudo touch {self.log_file}')
self._execute_and_check(f'start {borderAgentAddr} {borderAgentPort}')

def stop(self):
self._execute_and_check('stop')
Expand All @@ -183,8 +179,7 @@ def isActive(self):
elif 'false' in response[0]:
return False
else:
raise commissioner.Error('Unrecognized result "{}"'.format(
response[0]))
raise commissioner.Error(f'Unrecognized result "{response[0]}"')

def getSessionId(self):
response = self._execute_and_check('sessionid')
Expand All @@ -194,7 +189,7 @@ def getSessionId(self):
raise_(commissioner.Error, repr(e), sys.exc_info()[2])

def MGMT_COMMISSIONER_GET(self, tlvTypes):
types = ' '.join(map(lambda x: TLV_TYPE_TO_STRING[x], tlvTypes))
types = ' '.join([TLV_TYPE_TO_STRING[x] for x in tlvTypes])
command = 'commdataset get ' + types
response = self._execute_and_check(command)

Expand Down Expand Up @@ -226,7 +221,7 @@ def MGMT_COMMISSIONER_SET(self, commDataset):
TLV_TYPE_TO_STRING[key]: commDataset[key] for key in commDataset
}
data = json.dumps(dataset)
self._execute_and_check("commdataset set '{}'".format(data))
self._execute_and_check(f"commdataset set '{data}'")

def enableJoiner(self, joinerType, eui64=None, password=None):
command = ['joiner', 'enable', JOINER_TYPE_TO_STRING[joinerType]]
Expand All @@ -252,9 +247,8 @@ def disableJoiner(self, joinerType, eui64=None):
self._execute_and_check(' '.join(command))

def MGMT_ACTIVE_GET(self, tlvTypes):
types = ' '.join(map(lambda x: TLV_TYPE_TO_STRING[x], tlvTypes))
result = self._execute_and_check(
'opdataset get active {}'.format(types))
types = ' '.join([TLV_TYPE_TO_STRING[x] for x in tlvTypes])
result = self._execute_and_check(f'opdataset get active {types}')

try:
return OTCommissioner._active_op_dataset_from_json(' '.join(
Expand All @@ -263,13 +257,13 @@ def MGMT_ACTIVE_GET(self, tlvTypes):
raise_(commissioner.Error, repr(e), sys.exc_info()[2])

def MGMT_ACTIVE_SET(self, activeOpDataset):
self._execute_and_check("opdataset set active '{}'".format(
OTCommissioner._active_op_dataset_to_json(activeOpDataset)))
self._execute_and_check(
f"opdataset set active '{OTCommissioner._active_op_dataset_to_json(activeOpDataset)}'"
)

def MGMT_PENDING_GET(self, tlvTypes):
types = ' '.join(map(lambda x: TLV_TYPE_TO_STRING[x], tlvTypes))
result = self._execute_and_check(
'opdataset get pending {}'.format(types))
types = ' '.join([TLV_TYPE_TO_STRING[x] for x in tlvTypes])
result = self._execute_and_check(f'opdataset get pending {types}')

try:
return OTCommissioner._pending_op_dataset_from_json(' '.join(
Expand All @@ -278,12 +272,13 @@ def MGMT_PENDING_GET(self, tlvTypes):
raise_(commissioner.Error, repr(e), sys.exc_info()[2])

def MGMT_PENDING_SET(self, pendingOpDataset):
self._execute_and_check("opdataset set pending '{}'".format(
OTCommissioner._pending_op_dataset_to_json(pendingOpDataset)))
self._execute_and_check(
f"opdataset set pending '{OTCommissioner._pending_op_dataset_to_json(pendingOpDataset)}'"
)

def MGMT_BBR_GET(self, tlvTypes):
types = ' '.join(map(lambda x: TLV_TYPE_TO_STRING[x], tlvTypes))
result = self._execute_and_check('bbrdataset get {}'.format(types))
types = ' '.join([TLV_TYPE_TO_STRING[x] for x in tlvTypes])
result = self._execute_and_check(f'bbrdataset get {types}')

try:
result = json.loads(' '.join(result[:-1]))
Expand All @@ -296,49 +291,33 @@ def MGMT_BBR_SET(self, bbrDataset):
TLV_TYPE_TO_STRING[key]: bbrDataset[key] for key in bbrDataset
}
dataset = json.dumps(dataset)
self._execute_and_check("bbrdataset set '{}'".format(dataset))
self._execute_and_check(f"bbrdataset set '{dataset}'")

def MLR(self, multicastAddrs, timeout):
self._execute_and_check('mlr {} {}'.format(
' '.join(multicastAddrs),
timeout,
),
self._execute_and_check(f"mlr {' '.join(multicastAddrs)} {timeout}",
check=False)

def MGMT_ANNOUNCE_BEGIN(self, channelMask, count, period, dstAddr):
self._execute_and_check('announce {} {} {} {}'.format(
channelMask,
count,
period,
dstAddr,
))
self._execute_and_check(
f'announce {channelMask} {count} {period} {dstAddr}')

def MGMT_PANID_QUERY(self, channelMask, panId, dstAddr, timeout):
self._execute_and_check('panid query {} {} {}'.format(
channelMask,
panId,
dstAddr,
))
self._execute_and_check(f'panid query {channelMask} {panId} {dstAddr}')
self._sleep(timeout)
result = self._execute_and_check('panid conflict {}'.format(panId))
result = self._execute_and_check(f'panid conflict {panId}')
result = int(result[0])
return False if result == 0 else True

def MGMT_ED_SCAN(self, channelMask, count, period, scanDuration, dstAddr,
timeout):
self._execute_and_check('energy scan {} {} {} {} {}'.format(
channelMask,
count,
period,
scanDuration,
dstAddr,
))
self._execute_and_check(
f'energy scan {channelMask} {count} {period} {scanDuration} {dstAddr}'
)

self._sleep(timeout)
result = self._execute_and_check('energy report {}'.format(dstAddr))
result = self._execute_and_check(f'energy report {dstAddr}')
if result[0] == 'null':
raise commissioner.Error(
'No energy report found for {}'.format(dstAddr))
raise commissioner.Error(f'No energy report found for {dstAddr}')

try:
result = json.loads(' '.join(result[:-1]))
Expand All @@ -352,31 +331,26 @@ def MGMT_ED_SCAN(self, channelMask, count, period, scanDuration, dstAddr,
raise_(commissioner.Error, repr(e), sys.exc_info()[2])

def MGMT_REENROLL(self, dstAddr):
self._execute_and_check('reenroll {}'.format(dstAddr))
self._execute_and_check(f'reenroll {dstAddr}')

def MGMT_DOMAIN_RESET(self, dstAddr):
self._execute_and_check('domainreset {}'.format(dstAddr))
self._execute_and_check(f'domainreset {dstAddr}')

def MGMT_NET_MIGRATE(self, dstAddr, designatedNetwork):
self._execute_and_check('migrate {} {}'.format(
dstAddr,
designatedNetwork,
))
self._execute_and_check(f'migrate {dstAddr} {designatedNetwork}')

def requestCOM_TOK(self, registrarAddr, registrarPort):
self._execute_and_check('token request {} {}'.format(
registrarAddr,
registrarPort,
))
self._execute_and_check(
f'token request {registrarAddr} {registrarPort}')

def setCOM_TOK(self, signedCOM_TOK):
path_token = '/tmp/commissioner.token.{}'.format(uuid.uuid4())
path_token = f'/tmp/commissioner.token.{uuid.uuid4()}'
step = 40
for i in range(0, len(signedCOM_TOK), step):
data = self._bytes_to_hex(signedCOM_TOK[i:i + step])
self._command('echo {} >> "{}"'.format(data, path_token))
self._command(f'echo {data} >> "{path_token}"')

self._execute_and_check('token set {}'.format(path_token))
self._execute_and_check(f'token set {path_token}')

def getCOM_TOK(self):
result = self._execute_and_check('token print')
Expand Down Expand Up @@ -422,13 +396,12 @@ def getMlrLogs(self):
return processed_logs

def _getThciLogs(self):
return self._command("grep \"\\[ thci \\]\" {}".format(self.log_file))
return self._command(f"grep \"\\[ thci \\]\" {self.log_file}")

def _execute_and_check(self, command, check=True):
# Escape quotes for bash
command = command.replace('"', r'"\""')
response = self._command('{} execute "{}"'.format(
COMMISSIONER_CTL, command))
response = self._command(f'{COMMISSIONER_CTL} execute "{command}"')
if check:
response = OTCommissioner._check_response(response)
return response
Expand Down Expand Up @@ -457,29 +430,29 @@ def _write_config(self, config_path, config):

if config.isCcmMode:
if config.privateKey:
path = '/tmp/commissioner.private_key.{}'.format(uuid.uuid4())
path = f'/tmp/commissioner.private_key.{uuid.uuid4()}'
self._send_file(local_path=config.privateKey, remote_path=path)
data['PrivateKeyFile'] = path
if config.cert:
path = '/tmp/commissioner.cert.{}'.format(uuid.uuid4())
path = f'/tmp/commissioner.cert.{uuid.uuid4()}'
self._send_file(local_path=config.cert, remote_path=path)
data['CertificateFile'] = path
if config.trustAnchor:
path = '/tmp/commissioner.trush_anchor.{}'.format(uuid.uuid4())
path = f'/tmp/commissioner.trush_anchor.{uuid.uuid4()}'
self._send_file(local_path=config.trustAnchor, remote_path=path)
data['TrustAnchorFile'] = path

self._command("echo '{}' >> '{}'".format(json.dumps(data), config_path))
self._command(f"echo '{json.dumps(data)}' >> '{config_path}'")

def _send_file(self, local_path, remote_path):
with open(local_path, 'rb') as f:
b64 = base64.b64encode(f.read()).decode()
self._command('echo "{}" | base64 -d - > "{}"'.format(b64, remote_path))
self._command(f'echo "{b64}" | base64 -d - > "{remote_path}"')

@staticmethod
def _check_response(response):
if response[-1] != '[done]':
raise commissioner.Error('Error message:\n{!r}'.format(response))
raise commissioner.Error(f'Error message:\n{response!r}')
return response

@staticmethod
Expand Down
12 changes: 4 additions & 8 deletions tools/commissioner_thci/commissionerd.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2019, The OpenThread Commissioner Authors.
Expand Down Expand Up @@ -30,8 +30,6 @@
Commissioner daemon process
"""

from __future__ import print_function

import os
import multiprocessing.connection
import sys
Expand Down Expand Up @@ -169,7 +167,7 @@ def _execute_command(self, command, wait_for_result=True):
if not self._process:
raise self._AppException('CLI not started yet')

print('Executing: "{}"'.format(command))
print(f'Executing: "{command}"')
self._process.sendline(command)
if wait_for_result:
try:
Expand All @@ -180,8 +178,7 @@ def _execute_command(self, command, wait_for_result=True):
self._process.kill(sig=signal.SIGINT)
try:
self._process.expect(r'> $', timeout=1)
return 'Timed out executing "{}"\n{}'.format(
command, self._process.before.decode())
return f'Timed out executing "{command}"\n{self._process.before.decode()}'
except pexpect.exceptions.TIMEOUT as e:
raise self._AppException(e)

Expand All @@ -203,8 +200,7 @@ def _exit(self):
def _discard_process_with_error(self):
message = self._process.before.decode()
self._reset_states()
raise self._AppException(
'CLI process exited with error:\n{}'.format(message))
raise self._AppException(f'CLI process exited with error:\n{message}')

def _reset_states(self):
self._process = None
Expand Down
5 changes: 2 additions & 3 deletions tools/commissioner_thci/example_send_mlr.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2019, The OpenThread Commissioner Authors.
Expand Down Expand Up @@ -71,8 +71,7 @@ def test_mlr():
comm.start(BORDER_AGENT_ADDR, BORDER_AGENT_PORT)
assert comm.isActive()

print("commissioner connected, session ID = {}".format(
comm.getSessionId()))
print(f"commissioner connected, session ID = {comm.getSessionId()}")

## Send MLR.req
comm.MLR([MA1, MA2], 60)
Expand Down
Loading