Skip to content

Commit

Permalink
More match-statements
Browse files Browse the repository at this point in the history
  • Loading branch information
dainnilsson committed Oct 7, 2024
1 parent d932a42 commit f8b4eff
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 44 deletions.
34 changes: 19 additions & 15 deletions helper/helper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,21 @@ def _handle_incoming(event, recv, error, cmd_queue):
if not request:
break
try:
kind = request["kind"]
if kind == "signal":
# Cancel signals are handled here, the rest forwarded
if request["status"] == "cancel":
event.set()
else:
# Ignore other signals
logger.error("Unhandled signal: %r", request)
elif kind == "command":
cmd_queue.join() # Wait for existing command to complete
event.clear() # Reset event for next command
cmd_queue.put(request)
else:
error("invalid-command", "Unsupported request type")
match request["kind"]:
case "signal":
# Cancel signals are handled here, the rest forwarded
if request["status"] == "cancel":
logger.debug("Got cancel signal!")
event.set()
else:
# Ignore other signals
logger.error("Unhandled signal: %r", request)
case "command":
cmd_queue.join() # Wait for existing command to complete
event.clear() # Reset event for next command
cmd_queue.put(request)
case _:
error("invalid-command", "Unsupported request type")
except KeyError as e:
error("invalid-command", str(e))
except RpcException as e:
Expand Down Expand Up @@ -171,7 +172,10 @@ def send(data):
def recv():
line = b""
while not line.endswith(b"\n"):
chunk = sock.recv(1024)
try:
chunk = sock.recv(1024)
except ConnectionError:
return None
if not chunk:
return None
line += chunk
Expand Down
48 changes: 19 additions & 29 deletions helper/helper/yubiotp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
from yubikit.yubiotp import (
YubiOtpSession,
SLOT,
SlotConfiguration,
UpdateConfiguration,
HmacSha1SlotConfiguration,
HotpSlotConfiguration,
StaticPasswordSlotConfiguration,
YubiOtpSlotConfiguration,
StaticTicketSlotConfiguration,
)
from ykman.otp import generate_static_pw, format_csv
from yubikit.oath import parse_b32_key
Expand Down Expand Up @@ -108,15 +108,6 @@ def format_yubiotp_csv(
)


_CONFIG_TYPES = dict(
hmac_sha1=HmacSha1SlotConfiguration,
hotp=HotpSlotConfiguration,
static_password=StaticPasswordSlotConfiguration,
yubiotp=YubiOtpSlotConfiguration,
static_ticket=StaticTicketSlotConfiguration,
)


class SlotNode(RpcNode):
def __init__(self, session, slot):
super().__init__()
Expand Down Expand Up @@ -169,7 +160,8 @@ def calculate(self, event, challenge: str):
)
return dict(response=response)

def _apply_options(self, config, options):
@staticmethod
def _apply_options(config, options) -> None:
for option in (
"serial_api_visible",
"serial_usb_visible",
Expand Down Expand Up @@ -199,31 +191,29 @@ def _apply_options(self, config, options):
token_id, *args = options.pop("token_id")
config.token_id(bytes.fromhex(token_id), *args)

return config

def _get_config(self, type, **kwargs):
config = None

if type in _CONFIG_TYPES:
if type == "hmac_sha1":
config = _CONFIG_TYPES[type](bytes.fromhex(kwargs["key"]))
elif type == "hotp":
config = _CONFIG_TYPES[type](parse_b32_key(kwargs["key"]))
elif type == "static_password":
config = _CONFIG_TYPES[type](
@staticmethod
def _get_config(cfg_type: str, **kwargs) -> SlotConfiguration:
match cfg_type:
case "hmac_sha1":
return HmacSha1SlotConfiguration(bytes.fromhex(kwargs["key"]))
case "hotp":
return HotpSlotConfiguration(parse_b32_key(kwargs["key"]))
case "static_password":
return StaticPasswordSlotConfiguration(
encode(
kwargs["password"], KEYBOARD_LAYOUT[kwargs["keyboard_layout"]]
)
)
elif type == "yubiotp":
config = _CONFIG_TYPES[type](
case "yubiotp":
return YubiOtpSlotConfiguration(
fixed=modhex_decode(kwargs["public_id"]),
uid=bytes.fromhex(kwargs["private_id"]),
key=bytes.fromhex(kwargs["key"]),
)
else:
raise ValueError("No supported configuration type provided.")
return config
case unsupported:
raise ValueError(
f"Unsupported configuration type provided: {unsupported}"
)

@action
def put(
Expand Down Expand Up @@ -252,7 +242,7 @@ def update(
params,
acc_code: str | None = None,
curr_acc_code: str | None = None,
**kwargs
**kwargs,
):
config = UpdateConfiguration()
self._apply_options(config, kwargs)
Expand Down

0 comments on commit f8b4eff

Please sign in to comment.