From fdf4bf7e79790b85a487cda1debff20d247b015c Mon Sep 17 00:00:00 2001 From: Vignesh Rao Date: Thu, 28 Nov 2024 11:27:18 -0600 Subject: [PATCH] Simplify final draft before re-structure --- pyninja/macOS/draft.py | 77 +++++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 35 deletions(-) diff --git a/pyninja/macOS/draft.py b/pyninja/macOS/draft.py index d33f2dd..bba812c 100644 --- a/pyninja/macOS/draft.py +++ b/pyninja/macOS/draft.py @@ -1,61 +1,68 @@ import json import re import subprocess +from collections import defaultdict -def size_it(input_string): - match = re.search(r"\((\d+) Bytes\)", input_string) - if match: - return int(match.group(1)) +def parse_size(input_string): + """Extracts size in bytes from a string like '(12345 Bytes)'.""" + match = re.search(r'\((\d+) Bytes\)', input_string) + return int(match.group(1)) if match else 0 def update_mountpoints(disks, device_ids): + """Updates mount points for physical devices based on diskutil data.""" for disk in disks: - if disk.get("Mount Point"): - if disk.get("Part of Whole") in device_ids.keys(): - device_ids[disk["Part of Whole"]].append(disk.get("Mount Point")) + part_of_whole = disk.get("Part of Whole") + apfs_store = disk.get("APFS Physical Store", "") + if mount_point := disk.get("Mount Point"): + if part_of_whole in device_ids: + device_ids[part_of_whole].append(mount_point) for device_id in device_ids: - if disk.get("APFS Physical Store", "").startswith(device_id): - device_ids[device_id].append(disk.get("Mount Point")) + if apfs_store.startswith(device_id): + device_ids[device_id].append(mount_point) return device_ids -def diskutil_all(): - result = subprocess.run( - "diskutil info -all", shell=True, capture_output=True, text=True - ) +def parse_diskutil_output(output): + """Parses `diskutil info -all` output into structured data.""" disks = [] - data_dict = {} - for line in result.stdout.splitlines(): + disk_info = {} + for line in output.splitlines(): line = line.strip() if not line: continue if line == "**********": - disks.append(data_dict) - data_dict = {} + disks.append(disk_info) + disk_info = {} else: - key, value = line.split(":", 1) - data_dict[key.strip()] = value.strip() - data = [] - device_ids = {} + key, value = map(str.strip, line.split(":", 1)) + disk_info[key] = value + return disks + + +def diskutil_all(): + """Fetches disk information using `diskutil info -all`.""" + result = subprocess.run("diskutil info -all", shell=True, capture_output=True, text=True) + disks = parse_diskutil_output(result.stdout) + device_ids = defaultdict(list) + physical_disks = [] for disk in disks: if disk.get("Virtual") == "No": - new_dict = { - **{ - "Name": disk["Device / Media Name"], - "Size": size_it(disk["Disk Size"]), - "DeviceID": disk["Device Identifier"], - "Node": disk["Device Node"], - }, - } - data.append(new_dict) - device_ids[disk["Device Identifier"]] = [] + physical_disks.append({ + "Name": disk.get("Device / Media Name"), + "Size": parse_size(disk.get("Disk Size", "")), + "DeviceID": disk.get("Device Identifier"), + "Node": disk.get("Device Node"), + }) + # Instantiate default dict with keys as DeviceIDs and values as empty list + _ = device_ids[disk["Device Identifier"]] mountpoints = update_mountpoints(disks, device_ids) - for device in data: - device["Mountpoints"] = mountpoints[device["DeviceID"]] - return data + for disk in physical_disks: + disk["Mountpoints"] = mountpoints[disk["DeviceID"]] + return physical_disks -if __name__ == "__main__": +if __name__ == '__main__': dump = diskutil_all() print(json.dumps(dump, indent=2))