-
Notifications
You must be signed in to change notification settings - Fork 4
/
demo-1-udisks-dbus
executable file
·168 lines (131 loc) · 6.6 KB
/
demo-1-udisks-dbus
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/python3
import sys
import os
import dbus
SERVICE = 'org.freedesktop.UDisks2'
IFACE_PREFIX = 'org.freedesktop.UDisks2'
PATH_PREFIX = '/org/freedesktop/UDisks2'
NO_OPTIONS = dbus.Dictionary(signature="sv")
MAX_SWAP_SIZE = 1024 ** 3 # 1 GiB
MAX_SWAP_PORTION = 0.1 # 10 %
SWAP_LV_NAME = "swap"
SWAP_LABEL = "demoswap"
DATA_LV_NAME = "data"
DATA_LABEL = "demodata"
PASSPHRASE = "myshinylittlepassphrase"
def _get_block_dev_for_lv(bus, lv_obj_path):
new_blk_obj = None
obj_manager = bus.get_object(SERVICE, PATH_PREFIX)
managed_objects = obj_manager.GetManagedObjects(dbus_interface="org.freedesktop.DBus.ObjectManager").keys()
for obj_path in managed_objects:
# we are looking for a block device
if str(obj_path).startswith(PATH_PREFIX + "/block_devices"):
obj = bus.get_object(SERVICE, obj_path)
try:
cur_lv_obj_path = obj.Get(IFACE_PREFIX + ".Block.LVM2", "LogicalVolume", dbus_interface=dbus.PROPERTIES_IFACE)
except:
# not all block devices have such an interface (and property)
pass
else:
if cur_lv_obj_path == lv_obj_path:
return obj
def _create_and_format_lv(bus, vg_obj, lv_name, size, fmt, label=None, enc_passphrase=None):
lv_obj_path = vg_obj.CreatePlainVolume(lv_name, dbus.UInt64(size), NO_OPTIONS,
dbus_interface=IFACE_PREFIX + '.VolumeGroup')
# we need the block device DBus object for the LV
# BUG: there should be a property to get it easily!
new_blk_obj = _get_block_dev_for_lv(bus, lv_obj_path)
if new_blk_obj is None:
raise RuntimeError("Failed to get the block device object for a newly created LV!")
# the block device object can now be formatted with the desired parameters
opts = dbus.Dictionary(signature='sv')
if label:
opts["label"] = label
if enc_passphrase:
opts["encrypt.passphrase"] = enc_passphrase
new_blk_obj.Format(fmt, opts, dbus_interface=IFACE_PREFIX + '.Block')
def create_setup(bus, disk1, disk2, demo_name=sys.argv[0]):
# get only the device names (like "sda", "loop0",...)
if disk1.startswith("/dev/"):
disk1 = disk1[5:]
if disk2.startswith("/dev/"):
disk2 = disk2[5:]
# get the DBus objects for the devices
disk1_obj = bus.get_object(SERVICE, PATH_PREFIX + "/block_devices/" + disk1)
disk2_obj = bus.get_object(SERVICE, PATH_PREFIX + "/block_devices/" + disk2)
# wipe the existing metadata (if any)
disk1_obj.Format("empty", NO_OPTIONS, dbus_interface=IFACE_PREFIX + '.Block')
disk2_obj.Format("empty", NO_OPTIONS, dbus_interface=IFACE_PREFIX + '.Block')
# make sure modules are loaded, because we need LVM which is implemented in
# a module
manager = bus.get_object(SERVICE, PATH_PREFIX + "/Manager")
manager.EnableModules(True, dbus_interface=IFACE_PREFIX + '.Manager')
# create a new VG using disk1 and disk2 with the name based on this demo
# script's name
demo_name = os.path.basename(demo_name).rsplit(".")[0]
vg_name = demo_name.replace("-", "_")
manager = bus.get_object(SERVICE, PATH_PREFIX + "/Manager")
vg_path = manager.VolumeGroupCreate(vg_name, [disk1_obj, disk2_obj], NO_OPTIONS,
dbus_interface=IFACE_PREFIX + '.Manager.LVM2')
# get the VG's free space so that we know how much space with have for the LVs
vg_obj = bus.get_object(SERVICE, vg_path)
vg_free_size = vg_obj.Get(IFACE_PREFIX + ".VolumeGroup", "FreeSize", dbus_interface=dbus.PROPERTIES_IFACE)
# determine the size of the swap LV and create it
swap_size = min(MAX_SWAP_SIZE, vg_free_size * MAX_SWAP_PORTION)
_create_and_format_lv(bus, vg_obj, SWAP_LV_NAME, swap_size, "swap", label=SWAP_LABEL)
# determine the size of the data LV (the rest of the VG's space) and create it
vg_free_size = vg_obj.Get(IFACE_PREFIX + ".VolumeGroup", "FreeSize", dbus_interface=dbus.PROPERTIES_IFACE)
data_size = vg_free_size
_create_and_format_lv(bus, vg_obj, DATA_LV_NAME, data_size, "xfs", label=DATA_LABEL, enc_passphrase=PASSPHRASE)
def clean_setup(bus, disk1, disk2, demo_name=sys.argv[0]):
# we need to go from leaf devices to the disks
# so first we need to close the LUKS device on the 'data' LV so that it can be removed
# get the name of the VG
demo_name = os.path.basename(demo_name).rsplit(".")[0]
vg_name = demo_name.replace("-", "_")
# we also know the name of the LV so we can construct its object path
lv_obj_path = PATH_PREFIX + "/lvm/" + vg_name + "/" + DATA_LV_NAME
# but we need the block device
blk_obj = _get_block_dev_for_lv(bus, lv_obj_path)
if blk_obj is None:
raise RuntimeError("Failed to get the block device object for the %s LV!" % DATA_LV_NAME)
# now we can lock the LUKS
blk_obj.Lock(NO_OPTIONS, dbus_interface=IFACE_PREFIX + '.Encrypted')
lv_obj = bus.get_object(SERVICE, lv_obj_path)
lv_obj.Delete(NO_OPTIONS, dbus_interface=IFACE_PREFIX + '.LogicalVolume')
# now the same for the swap LV
lv_obj_path = PATH_PREFIX + "/lvm/" + vg_name + "/" + SWAP_LV_NAME
lv_obj = bus.get_object(SERVICE, lv_obj_path)
lv_obj.Delete(NO_OPTIONS, dbus_interface=IFACE_PREFIX + '.LogicalVolume')
# we also want to remove the VG (we could have done this even without
# removing the LVs first)
vg_obj_path = PATH_PREFIX + "/lvm/" + vg_name
vg_obj = bus.get_object(SERVICE, vg_obj_path)
# the first argument is 'wipe' -- whether to wipe metadata from the PVs
vg_obj.Delete(True, NO_OPTIONS, dbus_interface=IFACE_PREFIX + '.VolumeGroup')
if __name__ == "__main__":
if os.geteuid() != 0:
print("Requires to be run as root!", file=sys.stderr)
exit(1)
if len(sys.argv) < 3:
print("Requires at least two arguments!", file=sys.stderr)
print("Usage: %s [--cleanup] DISK1 DISK2", sys.argv[0], file=sys.stderr)
exit(1)
cleanup = False
if "--cleanup" in sys.argv:
if len(sys.argv) < 4:
print("--cleanup requires disks to clean!", file=sys.stderr)
print("Usage: %s [--cleanup] DISK1 DISK2", sys.argv[0], file=sys.stderr)
exit(1)
cleanup = True
sys.argv.remove("--cleanup")
disk1 = sys.argv[1]
disk2 = sys.argv[2]
bus = dbus.SystemBus()
if cleanup:
clean_setup(bus, disk1, disk2)
else:
ans = input("About to remove all existing metadata from %s and %s. Is this okay? [y/N] " % (disk1, disk2))
if ans not in ("y", "Y", "YES", "yes", "Yes"):
exit(0)
create_setup(bus, disk1, disk2)