From 90eea876c38d0ff679706c0bd367f930776c4061 Mon Sep 17 00:00:00 2001 From: Vignesh Rao Date: Thu, 28 Nov 2024 11:16:40 -0600 Subject: [PATCH] Add draft scripts for macOS drive split --- pyninja/macOS/diskutil.py | 61 +++++++++++++++++++++++++++++++++++ pyninja/macOS/draft.py | 61 +++++++++++++++++++++++++++++++++++ pyninja/macOS/sys_profiler.py | 27 ++++++++++++++++ 3 files changed, 149 insertions(+) create mode 100644 pyninja/macOS/diskutil.py create mode 100644 pyninja/macOS/draft.py create mode 100644 pyninja/macOS/sys_profiler.py diff --git a/pyninja/macOS/diskutil.py b/pyninja/macOS/diskutil.py new file mode 100644 index 0000000..43940be --- /dev/null +++ b/pyninja/macOS/diskutil.py @@ -0,0 +1,61 @@ +import json +import re +import subprocess + + +def size_it(input_string): + match = re.search(r"\((\d+) Bytes\)", input_string) + if match: + return int(match.group(1)) + + +def get_mountpoints(disk_node): + result = subprocess.run( + [f"diskutil info -plist {disk_node}"], + shell=True, + capture_output=True, + text=True, + ) + import plistlib + + if mountpoint := plistlib.loads(result.stdout.encode())["MountPoint"]: + return mountpoint + else: + print("No mountpoint found!!") + return "/" + + +def diskutil_all(): + result = subprocess.run( + "diskutil info -all", shell=True, capture_output=True, text=True + ) + disks = [] + data_dict = {} + for line in result.stdout.splitlines(): + line = line.strip() + if not line: + continue + if line == "**********": + disks.append(data_dict) + data_dict = {} + else: + key, value = line.split(":", 1) + data_dict[key.strip()] = value.strip() + data = [] + for disk in disks: + if disk.get("Virtual") == "No": + new_dict = { + "Name": disk["Device / Media Name"], + "Size": size_it(disk["Disk Size"]), + "DeviceID": disk["Device Identifier"], + "Node": disk["Device Node"], + "Mountpoints": get_mountpoints(disk["Device Node"]), + } + new_dict["total"] = new_dict["Size"] + data.append(new_dict) + return data + + +if __name__ == "__main__": + data = diskutil_all() + print(json.dumps(data, indent=2)) diff --git a/pyninja/macOS/draft.py b/pyninja/macOS/draft.py new file mode 100644 index 0000000..d33f2dd --- /dev/null +++ b/pyninja/macOS/draft.py @@ -0,0 +1,61 @@ +import json +import re +import subprocess + + +def size_it(input_string): + match = re.search(r"\((\d+) Bytes\)", input_string) + if match: + return int(match.group(1)) + + +def update_mountpoints(disks, device_ids): + for disk in disks: + if disk.get("Mount Point"): + if disk.get("Part of Whole") in device_ids.keys(): + device_ids[disk["Part of Whole"]].append(disk.get("Mount Point")) + for device_id in device_ids: + if disk.get("APFS Physical Store", "").startswith(device_id): + device_ids[device_id].append(disk.get("Mount Point")) + return device_ids + + +def diskutil_all(): + result = subprocess.run( + "diskutil info -all", shell=True, capture_output=True, text=True + ) + disks = [] + data_dict = {} + for line in result.stdout.splitlines(): + line = line.strip() + if not line: + continue + if line == "**********": + disks.append(data_dict) + data_dict = {} + else: + key, value = line.split(":", 1) + data_dict[key.strip()] = value.strip() + data = [] + device_ids = {} + for disk in disks: + if disk.get("Virtual") == "No": + new_dict = { + **{ + "Name": disk["Device / Media Name"], + "Size": size_it(disk["Disk Size"]), + "DeviceID": disk["Device Identifier"], + "Node": disk["Device Node"], + }, + } + data.append(new_dict) + device_ids[disk["Device Identifier"]] = [] + mountpoints = update_mountpoints(disks, device_ids) + for device in data: + device["Mountpoints"] = mountpoints[device["DeviceID"]] + return data + + +if __name__ == "__main__": + dump = diskutil_all() + print(json.dumps(dump, indent=2)) diff --git a/pyninja/macOS/sys_profiler.py b/pyninja/macOS/sys_profiler.py new file mode 100644 index 0000000..8f8c335 --- /dev/null +++ b/pyninja/macOS/sys_profiler.py @@ -0,0 +1,27 @@ +import json +import subprocess + + +def sys_profiler(): + result = subprocess.run( + ["/usr/sbin/system_profiler", "SPStorageDataType", "-json"], + capture_output=True, + text=True, + ) + data = json.loads(result.stdout) + usable_volumes = [] + for volume in data["SPStorageDataType"]: + if volume.get("writable") == "yes": + usable_volumes.append(volume) + for mlist in usable_volumes: + yield { + "DeviceID": mlist["_name"], + "Size": mlist["size_in_bytes"], + "Name": mlist["physical_drive"]["device_name"], + "Mountpoints": mlist["mount_point"], + } + + +if __name__ == "__main__": + data = list(sys_profiler()) + print(json.dumps(data, indent=2))