Skip to content

Commit

Permalink
Time it on final draft for macOS drives
Browse files Browse the repository at this point in the history
  • Loading branch information
dormant-user committed Nov 28, 2024
1 parent fdf4bf7 commit a02de25
Showing 1 changed file with 42 additions and 6 deletions.
48 changes: 42 additions & 6 deletions pyninja/macOS/draft.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,59 @@
import json
import logging
import re
import subprocess
import time
from collections import defaultdict

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
HANDLER = logging.StreamHandler()
DEFAULT_FORMATTER = logging.Formatter(
datefmt="%b-%d-%Y %I:%M:%S %p",
fmt="%(asctime)s - %(levelname)s - [%(module)s:%(lineno)d] - %(funcName)s - %(message)s",
)
HANDLER.setFormatter(DEFAULT_FORMATTER)
LOGGER.addHandler(HANDLER)


def time_it(func):
"""Decorator to measure the execution time of a function."""

def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
LOGGER.info(f"Function '{func.__name__}' executed in {end - start:.4f} seconds.")
return result

return wrapper


@time_it
def parse_size(input_string):
"""Extracts size in bytes from a string like '(12345 Bytes)'."""
match = re.search(r'\((\d+) Bytes\)', input_string)
return int(match.group(1)) if match else 0
return int(match.group(1)) if match else None


@time_it
def update_mountpoints(disks, device_ids):
"""Updates mount points for physical devices based on diskutil data."""
for disk in disks:
part_of_whole = disk.get("Part of Whole")
apfs_store = disk.get("APFS Physical Store", "")
if mount_point := disk.get("Mount Point"):
mount_point = disk.get("Mount Point")
if mount_point and not mount_point.startswith("/System/Volumes/"):
if part_of_whole in device_ids:
device_ids[part_of_whole].append(mount_point)
for device_id in device_ids:
if apfs_store.startswith(device_id):
device_ids[device_id].append(mount_point)
else:
for device_id in device_ids:
if apfs_store.startswith(device_id):
device_ids[device_id].append(mount_point)
return device_ids


@time_it
def parse_diskutil_output(output):
"""Parses `diskutil info -all` output into structured data."""
disks = []
Expand All @@ -41,9 +71,15 @@ def parse_diskutil_output(output):
return disks


@time_it
def extract_raw_data() -> subprocess.CompletedProcess[str]:
"""Extract all diskutil info."""
return subprocess.run("diskutil info -all", shell=True, capture_output=True, text=True)


def diskutil_all():
"""Fetches disk information using `diskutil info -all`."""
result = subprocess.run("diskutil info -all", shell=True, capture_output=True, text=True)
result = extract_raw_data()
disks = parse_diskutil_output(result.stdout)
device_ids = defaultdict(list)
physical_disks = []
Expand Down

0 comments on commit a02de25

Please sign in to comment.