Skip to content

Commit

Permalink
Refactor DeviceInfo caching
Browse files Browse the repository at this point in the history
  • Loading branch information
dainnilsson committed Sep 13, 2024
1 parent 5a24f57 commit d8e9cf3
Showing 1 changed file with 25 additions and 28 deletions.
53 changes: 25 additions & 28 deletions helper/helper/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,13 @@ def __call__(self, *args, **kwargs):
try:
response = super().__call__(*args, **kwargs)
if "device_info" in response.flags:
# Clear DeviceInfo cache
self._info = None
self._data = None
old_info = self._info
# Refresh data, and close any open child
self._close_child()
self._data = self._refresh_data()
if old_info == self._info:
# No change to DeviceInfo, further propagation not needed.
response.flags.remove("device_info")

return response

Expand Down Expand Up @@ -318,9 +322,17 @@ def _supports_connection(self, conn_type):

def _create_connection(self, conn_type):
connection = self._device.open_connection(conn_type)
self._data = self._read_data(connection)
return ConnectionNode(self._device, connection, self._info)

def _refresh_data(self):
# Re-use existing connection if possible
if self._child and not self._child.closed:
# Make sure to close any open session
self._child._close_child()
return self._read_data(self._child._connection)

# New connection
for conn_type in (SmartCardConnection, OtpConnection, FidoConnection):
if self._supports_connection(conn_type):
try:
Expand Down Expand Up @@ -398,14 +410,17 @@ def get_data(self):
self._data = None
return super().get_data()

def _read_data(self, conn):
return dict(super()._read_data(conn), present=True)

def _refresh_data(self):
card = self._observer.card
if card is None:
return dict(present=False, status="no-card")
try:
with self._device.open_connection(SmartCardConnection) as conn:
try:
data = dict(self._read_data(conn), present=True)
data = self._read_data(conn)
except ValueError:
# Unknown device, maybe NFC restricted
try:
Expand Down Expand Up @@ -434,8 +449,8 @@ def get(self, params, event, signal):
def ccid(self):
try:
connection = self._device.open_connection(SmartCardConnection)
info = read_info(connection)
return ScpConnectionNode(self._device, connection, info)
self._data = self._read_data(connection)
return ScpConnectionNode(self._device, connection, self._info)
except (ValueError, SmartcardException, EstablishContextException) as e:
logger.warning("Error opening connection", exc_info=True)
raise ConnectionException(self._device.fingerprint, "ccid", e)
Expand All @@ -444,9 +459,9 @@ def ccid(self):
def fido(self):
try:
with self._device.open_connection(SmartCardConnection) as conn:
info = read_info(conn)
self._data = self._read_data(conn)
connection = self._device.open_connection(FidoConnection)
return ConnectionNode(self._device, connection, info)
return ConnectionNode(self._device, connection, self._info)
except (ValueError, SmartcardException, EstablishContextException) as e:
logger.warning("Error opening connection", exc_info=True)
raise ConnectionException(self._device.fingerprint, "fido", e)
Expand All @@ -458,24 +473,11 @@ def __init__(self, device, connection, info):
self._device = device
self._transport = device.transport
self._connection = connection
self._info = info or read_info(self._connection, device.pid)
self._info = info

def __call__(self, *args, **kwargs):
try:
response = super().__call__(*args, **kwargs)
if "device_info" in response.flags:
# Refresh DeviceInfo
info = read_info(self._connection, self._device.pid)
if self._info != info:
self._info = info
# Make sure any child node is re-opened after this,
# as enabled applications may have changed
self.close()
else:
# No change to DeviceInfo, further propagation not needed.
response.flags.remove("device_info")

return response
return super().__call__(*args, **kwargs)
except (SmartcardException, OSError) as e:
logger.exception("Connection error")
raise ChildResetException(f"{e}")
Expand Down Expand Up @@ -504,11 +506,6 @@ def close(self):
logger.warning("Error closing connection", exc_info=True)

def get_data(self):
if (
isinstance(self._connection, SmartCardConnection)
or self._transport == TRANSPORT.USB
):
self._info = read_info(self._connection, self._device.pid)
return dict(version=self._info.version, serial=self._info.serial)

def _init_child_node(self, child_cls, capability=CAPABILITY(0)):
Expand Down

0 comments on commit d8e9cf3

Please sign in to comment.