Skip to content

Commit

Permalink
Proposal for #236
Browse files Browse the repository at this point in the history
  • Loading branch information
kingofpayne committed Jul 30, 2021
1 parent 96875a8 commit 83e3e2d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
46 changes: 29 additions & 17 deletions speculos/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,31 @@ def check_status_code(response: requests.Response, url: str) -> None:

class ApduResponse:
def __init__(self, response: requests.Response) -> None:
self.response = response
self.__response = response

def receive(self) -> bytes:
check_status_code(self.response, "/apdu")
data, status = split_apdu(bytes.fromhex(self.response.json()["data"]))
if status != 0x9000:
raise ApduException(status, data)
return data
def __receive(self):
""" Get the response if not yet received and store it in `__data` and `__status` attributes """
if self.__response is not None:
check_status_code(self.__response, "/apdu")
apdu_message = bytes.fromhex(self.__response.json()["data"])
self.__data, self.__status = split_apdu(apdu_message)
self.__response = None

def data(self, expected_sw: int = 0x9000) -> bytes:
"""
:return: Received data (status word stripped)
:param expected_sw: Expected status word value.
:raises ApduException: If received status word is different from `expected_sw` parameter.
"""
self.__receive()
if self.__status != expected_sw:
raise ApduException(self.__status, self.__data)
return self.__data

def sw(self) -> int:
""" :return: Received status word """
self.__receive()
return self.__status


def split_apdu(data: bytes) -> Tuple[bytes, int]:
Expand Down Expand Up @@ -118,13 +135,8 @@ def get_screenshot(self) -> bytes:
check_status_code(response, "/screenshot")
return response.content

def _apdu_exchange(self, data: bytes) -> bytes:
with self.session.post(f"{self.api_url}/apdu", json={"data": data.hex()}) as response:
apdu_response = ApduResponse(response)
return apdu_response.receive()

def _apdu_exchange_nowait(self, data: bytes) -> requests.Response:
return self.session.post(f"{self.api_url}/apdu", json={"data": data.hex()}, stream=True)
def _apdu_exchange(self, data: bytes, stream: bool = False) -> requests.Response:
return self.session.post(f"{self.api_url}/apdu", json={"data": data.hex()}, stream=stream)

def set_automation_rules(self, rules: dict) -> None:
with self.session.post(f"{self.api_url}/automation", json=rules) as response:
Expand Down Expand Up @@ -195,9 +207,9 @@ def __init__(self, app: str, args: List[str] = [], api_url: str = "http://127.0.
SpeculosInstance.start(self)
Api.__init__(self, api_url)

def apdu_exchange(self, cla: int, ins: int, data: bytes = b"", p1: int = 0, p2: int = 0) -> bytes:
def apdu_exchange(self, cla: int, ins: int, data: bytes = b"", p1: int = 0, p2: int = 0) -> ApduResponse:
apdu = bytes([cla, ins, p1, p2, len(data)]) + data
return Api._apdu_exchange(self, apdu)
return ApduResponse(Api._apdu_exchange(self, apdu))

@contextmanager
def apdu_exchange_nowait(
Expand All @@ -206,7 +218,7 @@ def apdu_exchange_nowait(
apdu = bytes([cla, ins, p1, p2, len(data)]) + data
response = None
try:
response = Api._apdu_exchange_nowait(self, apdu)
response = Api._apdu_exchange(self, apdu, stream=True)
yield ApduResponse(response)
finally:
if response:
Expand Down
2 changes: 1 addition & 1 deletion tests/apps/test_btc.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_btc_get_public_key_with_user_approval(client, app):
client.press_and_release("both")

with pytest.raises(speculos.client.ApduException) as e:
response.receive()
response.data()
assert e.value.sw == 0x6985


Expand Down

0 comments on commit 83e3e2d

Please sign in to comment.