diff --git a/tools/commissioner_thci/commissioner.py b/tools/commissioner_thci/commissioner.py index fecd24dc..c9b1c192 100644 --- a/tools/commissioner_thci/commissioner.py +++ b/tools/commissioner_thci/commissioner.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (c) 2019, The OpenThread Commissioner Authors. diff --git a/tools/commissioner_thci/commissioner_ctl.py b/tools/commissioner_thci/commissioner_ctl.py index f7e7b038..8a4e0cb6 100755 --- a/tools/commissioner_thci/commissioner_ctl.py +++ b/tools/commissioner_thci/commissioner_ctl.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (c) 2019, The OpenThread Commissioner Authors. @@ -30,8 +30,6 @@ Commissioner daemon controller """ -from __future__ import print_function - import os import argparse import multiprocessing.connection diff --git a/tools/commissioner_thci/commissioner_impl.py b/tools/commissioner_thci/commissioner_impl.py index 38479e39..10cc7a64 100644 --- a/tools/commissioner_thci/commissioner_impl.py +++ b/tools/commissioner_thci/commissioner_impl.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (c) 2019, The OpenThread Commissioner Authors. @@ -142,16 +142,15 @@ def __init__(self, config, handler, simulator=None): self._handler = handler self._lines = [] - self._command('stty cols {}'.format(TTY_COLS)) + self._command(f'stty cols {TTY_COLS}') - config_path = '/tmp/commissioner.{}.json'.format(uuid.uuid4()) + config_path = f'/tmp/commissioner.{uuid.uuid4()}.json' self._write_config(config_path=config_path, config=config) - response = self._command('{} init "{}"'.format(COMMISSIONER_CTL, - config_path)) + response = self._command(f'{COMMISSIONER_CTL} init "{config_path}"') if self._command('echo $?')[0] != '0': - raise commissioner.Error('Failed to init, error:\n{}'.format( - '\n'.join(response))) + raise commissioner.Error( + f"Failed to init, error:\n{'\n'.join(response)}") @staticmethod def makeLocalCommissioner(config, simulator): @@ -166,12 +165,9 @@ def makeHarnessCommissioner(config, serial_handler): return OTCommissioner(config, serial_handler) def start(self, borderAgentAddr, borderAgentPort): - self._command('sudo rm {}'.format(self.log_file)) - self._command('sudo touch {}'.format(self.log_file)) - self._execute_and_check('start {} {}'.format( - borderAgentAddr, - borderAgentPort, - )) + self._command(f'sudo rm {self.log_file}') + self._command(f'sudo touch {self.log_file}') + self._execute_and_check(f'start {borderAgentAddr} {borderAgentPort}') def stop(self): self._execute_and_check('stop') @@ -183,8 +179,7 @@ def isActive(self): elif 'false' in response[0]: return False else: - raise commissioner.Error('Unrecognized result "{}"'.format( - response[0])) + raise commissioner.Error(f'Unrecognized result "{response[0]}"') def getSessionId(self): response = self._execute_and_check('sessionid') @@ -194,7 +189,7 @@ def getSessionId(self): raise_(commissioner.Error, repr(e), sys.exc_info()[2]) def MGMT_COMMISSIONER_GET(self, tlvTypes): - types = ' '.join(map(lambda x: TLV_TYPE_TO_STRING[x], tlvTypes)) + types = ' '.join([TLV_TYPE_TO_STRING[x] for x in tlvTypes]) command = 'commdataset get ' + types response = self._execute_and_check(command) @@ -226,7 +221,7 @@ def MGMT_COMMISSIONER_SET(self, commDataset): TLV_TYPE_TO_STRING[key]: commDataset[key] for key in commDataset } data = json.dumps(dataset) - self._execute_and_check("commdataset set '{}'".format(data)) + self._execute_and_check(f"commdataset set '{data}'") def enableJoiner(self, joinerType, eui64=None, password=None): command = ['joiner', 'enable', JOINER_TYPE_TO_STRING[joinerType]] @@ -252,9 +247,8 @@ def disableJoiner(self, joinerType, eui64=None): self._execute_and_check(' '.join(command)) def MGMT_ACTIVE_GET(self, tlvTypes): - types = ' '.join(map(lambda x: TLV_TYPE_TO_STRING[x], tlvTypes)) - result = self._execute_and_check( - 'opdataset get active {}'.format(types)) + types = ' '.join([TLV_TYPE_TO_STRING[x] for x in tlvTypes]) + result = self._execute_and_check(f'opdataset get active {types}') try: return OTCommissioner._active_op_dataset_from_json(' '.join( @@ -263,13 +257,13 @@ def MGMT_ACTIVE_GET(self, tlvTypes): raise_(commissioner.Error, repr(e), sys.exc_info()[2]) def MGMT_ACTIVE_SET(self, activeOpDataset): - self._execute_and_check("opdataset set active '{}'".format( - OTCommissioner._active_op_dataset_to_json(activeOpDataset))) + self._execute_and_check( + f"opdataset set active '{OTCommissioner._active_op_dataset_to_json(activeOpDataset)}'" + ) def MGMT_PENDING_GET(self, tlvTypes): - types = ' '.join(map(lambda x: TLV_TYPE_TO_STRING[x], tlvTypes)) - result = self._execute_and_check( - 'opdataset get pending {}'.format(types)) + types = ' '.join([TLV_TYPE_TO_STRING[x] for x in tlvTypes]) + result = self._execute_and_check(f'opdataset get pending {types}') try: return OTCommissioner._pending_op_dataset_from_json(' '.join( @@ -278,12 +272,13 @@ def MGMT_PENDING_GET(self, tlvTypes): raise_(commissioner.Error, repr(e), sys.exc_info()[2]) def MGMT_PENDING_SET(self, pendingOpDataset): - self._execute_and_check("opdataset set pending '{}'".format( - OTCommissioner._pending_op_dataset_to_json(pendingOpDataset))) + self._execute_and_check( + f"opdataset set pending '{OTCommissioner._pending_op_dataset_to_json(pendingOpDataset)}'" + ) def MGMT_BBR_GET(self, tlvTypes): - types = ' '.join(map(lambda x: TLV_TYPE_TO_STRING[x], tlvTypes)) - result = self._execute_and_check('bbrdataset get {}'.format(types)) + types = ' '.join([TLV_TYPE_TO_STRING[x] for x in tlvTypes]) + result = self._execute_and_check(f'bbrdataset get {types}') try: result = json.loads(' '.join(result[:-1])) @@ -296,49 +291,33 @@ def MGMT_BBR_SET(self, bbrDataset): TLV_TYPE_TO_STRING[key]: bbrDataset[key] for key in bbrDataset } dataset = json.dumps(dataset) - self._execute_and_check("bbrdataset set '{}'".format(dataset)) + self._execute_and_check(f"bbrdataset set '{dataset}'") def MLR(self, multicastAddrs, timeout): - self._execute_and_check('mlr {} {}'.format( - ' '.join(multicastAddrs), - timeout, - ), + self._execute_and_check(f"mlr {' '.join(multicastAddrs)} {timeout}", check=False) def MGMT_ANNOUNCE_BEGIN(self, channelMask, count, period, dstAddr): - self._execute_and_check('announce {} {} {} {}'.format( - channelMask, - count, - period, - dstAddr, - )) + self._execute_and_check( + f'announce {channelMask} {count} {period} {dstAddr}') def MGMT_PANID_QUERY(self, channelMask, panId, dstAddr, timeout): - self._execute_and_check('panid query {} {} {}'.format( - channelMask, - panId, - dstAddr, - )) + self._execute_and_check(f'panid query {channelMask} {panId} {dstAddr}') self._sleep(timeout) - result = self._execute_and_check('panid conflict {}'.format(panId)) + result = self._execute_and_check(f'panid conflict {panId}') result = int(result[0]) return False if result == 0 else True def MGMT_ED_SCAN(self, channelMask, count, period, scanDuration, dstAddr, timeout): - self._execute_and_check('energy scan {} {} {} {} {}'.format( - channelMask, - count, - period, - scanDuration, - dstAddr, - )) + self._execute_and_check( + f'energy scan {channelMask} {count} {period} {scanDuration} {dstAddr}' + ) self._sleep(timeout) - result = self._execute_and_check('energy report {}'.format(dstAddr)) + result = self._execute_and_check(f'energy report {dstAddr}') if result[0] == 'null': - raise commissioner.Error( - 'No energy report found for {}'.format(dstAddr)) + raise commissioner.Error(f'No energy report found for {dstAddr}') try: result = json.loads(' '.join(result[:-1])) @@ -352,31 +331,26 @@ def MGMT_ED_SCAN(self, channelMask, count, period, scanDuration, dstAddr, raise_(commissioner.Error, repr(e), sys.exc_info()[2]) def MGMT_REENROLL(self, dstAddr): - self._execute_and_check('reenroll {}'.format(dstAddr)) + self._execute_and_check(f'reenroll {dstAddr}') def MGMT_DOMAIN_RESET(self, dstAddr): - self._execute_and_check('domainreset {}'.format(dstAddr)) + self._execute_and_check(f'domainreset {dstAddr}') def MGMT_NET_MIGRATE(self, dstAddr, designatedNetwork): - self._execute_and_check('migrate {} {}'.format( - dstAddr, - designatedNetwork, - )) + self._execute_and_check(f'migrate {dstAddr} {designatedNetwork}') def requestCOM_TOK(self, registrarAddr, registrarPort): - self._execute_and_check('token request {} {}'.format( - registrarAddr, - registrarPort, - )) + self._execute_and_check( + f'token request {registrarAddr} {registrarPort}') def setCOM_TOK(self, signedCOM_TOK): - path_token = '/tmp/commissioner.token.{}'.format(uuid.uuid4()) + path_token = f'/tmp/commissioner.token.{uuid.uuid4()}' step = 40 for i in range(0, len(signedCOM_TOK), step): data = self._bytes_to_hex(signedCOM_TOK[i:i + step]) - self._command('echo {} >> "{}"'.format(data, path_token)) + self._command(f'echo {data} >> "{path_token}"') - self._execute_and_check('token set {}'.format(path_token)) + self._execute_and_check(f'token set {path_token}') def getCOM_TOK(self): result = self._execute_and_check('token print') @@ -422,13 +396,12 @@ def getMlrLogs(self): return processed_logs def _getThciLogs(self): - return self._command("grep \"\\[ thci \\]\" {}".format(self.log_file)) + return self._command(f"grep \"\\[ thci \\]\" {self.log_file}") def _execute_and_check(self, command, check=True): # Escape quotes for bash command = command.replace('"', r'"\""') - response = self._command('{} execute "{}"'.format( - COMMISSIONER_CTL, command)) + response = self._command(f'{COMMISSIONER_CTL} execute "{command}"') if check: response = OTCommissioner._check_response(response) return response @@ -457,29 +430,29 @@ def _write_config(self, config_path, config): if config.isCcmMode: if config.privateKey: - path = '/tmp/commissioner.private_key.{}'.format(uuid.uuid4()) + path = f'/tmp/commissioner.private_key.{uuid.uuid4()}' self._send_file(local_path=config.privateKey, remote_path=path) data['PrivateKeyFile'] = path if config.cert: - path = '/tmp/commissioner.cert.{}'.format(uuid.uuid4()) + path = f'/tmp/commissioner.cert.{uuid.uuid4()}' self._send_file(local_path=config.cert, remote_path=path) data['CertificateFile'] = path if config.trustAnchor: - path = '/tmp/commissioner.trush_anchor.{}'.format(uuid.uuid4()) + path = f'/tmp/commissioner.trush_anchor.{uuid.uuid4()}' self._send_file(local_path=config.trustAnchor, remote_path=path) data['TrustAnchorFile'] = path - self._command("echo '{}' >> '{}'".format(json.dumps(data), config_path)) + self._command(f"echo '{json.dumps(data)}' >> '{config_path}'") def _send_file(self, local_path, remote_path): with open(local_path, 'rb') as f: b64 = base64.b64encode(f.read()).decode() - self._command('echo "{}" | base64 -d - > "{}"'.format(b64, remote_path)) + self._command(f'echo "{b64}" | base64 -d - > "{remote_path}"') @staticmethod def _check_response(response): if response[-1] != '[done]': - raise commissioner.Error('Error message:\n{!r}'.format(response)) + raise commissioner.Error(f'Error message:\n{response!r}') return response @staticmethod diff --git a/tools/commissioner_thci/commissionerd.py b/tools/commissioner_thci/commissionerd.py index 7571a800..3f09f38c 100755 --- a/tools/commissioner_thci/commissionerd.py +++ b/tools/commissioner_thci/commissionerd.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (c) 2019, The OpenThread Commissioner Authors. @@ -30,8 +30,6 @@ Commissioner daemon process """ -from __future__ import print_function - import os import multiprocessing.connection import sys @@ -169,7 +167,7 @@ def _execute_command(self, command, wait_for_result=True): if not self._process: raise self._AppException('CLI not started yet') - print('Executing: "{}"'.format(command)) + print(f'Executing: "{command}"') self._process.sendline(command) if wait_for_result: try: @@ -180,8 +178,7 @@ def _execute_command(self, command, wait_for_result=True): self._process.kill(sig=signal.SIGINT) try: self._process.expect(r'> $', timeout=1) - return 'Timed out executing "{}"\n{}'.format( - command, self._process.before.decode()) + return f'Timed out executing "{command}"\n{self._process.before.decode()}' except pexpect.exceptions.TIMEOUT as e: raise self._AppException(e) @@ -203,8 +200,7 @@ def _exit(self): def _discard_process_with_error(self): message = self._process.before.decode() self._reset_states() - raise self._AppException( - 'CLI process exited with error:\n{}'.format(message)) + raise self._AppException(f'CLI process exited with error:\n{message}') def _reset_states(self): self._process = None diff --git a/tools/commissioner_thci/example_send_mlr.py b/tools/commissioner_thci/example_send_mlr.py index 938473ac..f6c1d009 100644 --- a/tools/commissioner_thci/example_send_mlr.py +++ b/tools/commissioner_thci/example_send_mlr.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (c) 2019, The OpenThread Commissioner Authors. @@ -71,8 +71,7 @@ def test_mlr(): comm.start(BORDER_AGENT_ADDR, BORDER_AGENT_PORT) assert comm.isActive() - print("commissioner connected, session ID = {}".format( - comm.getSessionId())) + print(f"commissioner connected, session ID = {comm.getSessionId()}") ## Send MLR.req comm.MLR([MA1, MA2], 60)