# libcfbf.py
# This file is part of the libcfbf distribution (https://git.bobc.io/bobc/libcfbf).
# Copyright (c) 2020 Bob Carroll ([email protected])
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import math
from itertools import chain, takewhile
from io import BytesIO, SEEK_CUR, SEEK_END
class CfbfDocument(object):
    """
    An implementation of Microsoft's Compound File Binary Format.

    The document wraps an open binary file and lazily builds the Sector
    Allocation Table (SAT) and the root directory entry on first access.
    It can be used as a context manager; the file is closed on exit.

    :param file: an open file resource
    :param header: a CFBF header structure
    """

    LITTLE_ENDIAN = b'\xfe\xff'

    # Special (negative) sector IDs as they appear in the allocation tables.
    FREE_SEC_ID = -1
    END_OF_CHAIN_ID = -2
    SAT_SEC_ID = -3
    MSAT_SEC_ID = -4

    def __init__(self, file, header):
        self.header = header
        self._file = file
        # The header stores the sector size as a power-of-two exponent.
        self._sector_size = int(math.pow(2, header.sector_size))
        self._sector_at = None
        self._file_size = None
        self._root_dir = None

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    @property
    def sector_size(self):
        """
        :returns: the width of a sector in bytes
        """
        return self._sector_size

    @property
    def file_size(self):
        """
        :returns: the byte size of the file
        """
        if self._file_size is None:
            self._file.seek(0, SEEK_END)
            self._file_size = self._file.tell()

        return self._file_size

    @property
    def sector_alloc_table(self):
        """
        :returns: the Sector Allocation Table
        """
        # Compare against None so that an empty table is cached as well
        # instead of being rebuilt on every access.
        if self._sector_at is None:
            self._sector_at = list(self.build_sector_alloc_table())

        return self._sector_at

    @property
    def root_entry(self):
        """
        :returns: the root directory entry
        """
        if self._root_dir is None:
            self._root_dir = DirectoryEntry.open(self)

        return self._root_dir

    @classmethod
    def open(cls, path):
        """
        Opens a CFBF file for reading.

        :param path: document file path
        :returns: a document object
        :raises CfbfError: if the header is invalid or not little-endian
        """
        file = open(path, 'rb')

        try:
            header = CfbfHeader.read(file)
            if header.byte_order != cls.LITTLE_ENDIAN:
                raise CfbfError('Only little-endian encoding is supported')

            return cls(file, header)
        except BaseException:
            # Ownership of the file was never handed to a document object,
            # so release it here before propagating the error.
            file.close()
            raise

    def close(self):
        """
        Closes the file stream. Safe to call more than once.
        """
        # getattr() keeps __del__ quiet on objects whose __init__ never ran.
        file = getattr(self, '_file', None)
        self._file = None

        if file is None:
            return

        if not getattr(file, 'closed', False):
            file.flush()

        file.close()

    def compute_sector_offset(self, sec_id):
        """
        Compute a sector's byte offset in the file.

        The header occupies the whole first sector of the file, so when the
        sector size is larger than the parsed header (4096-byte sectors) the
        first data sector begins one full sector into the file.

        :param sec_id: sector ID
        :returns: a byte offset
        """
        base = max(self.header.size, self.sector_size)
        return base + (sec_id * self.sector_size)

    def read_sector(self, sec_id):
        """
        Reads sector data from the file.

        :param sec_id: sector ID
        :returns: sector data
        :raises CfbfError: if the ID is negative or lies beyond the file end
        """
        if sec_id < 0:
            raise CfbfError('Bad sector ID')

        offset = self.compute_sector_offset(sec_id)
        if offset >= self.file_size:
            raise CfbfError('End of file')

        self._file.seek(offset)
        return self._file.read(self.sector_size)

    def partition_sectors(self, data):
        """
        Splits the buffer into sector IDs.

        :param data: buffer to read
        :returns: a list of sector IDs
        :raises CfbfError: if the buffer is not a whole number of sector IDs
        """
        n = self.header.SEC_ID_SIZE
        if len(data) % n != 0:
            raise CfbfError('Buffer length is not divisible by sector ID size')

        return [btoi(data[i:i + n]) for i in range(0, len(data), n)]

    def walk_msat_sectors(self, sec_id, results, chain=None):
        """
        Reads the extended MSAT from sectors.

        Every MSAT sector holds SAT sector IDs followed by, in its last
        slot, the ID of the next MSAT sector (or the end-of-chain marker).

        :param sec_id: the sector ID to read
        :param results: running list of results (extended in place)
        :param chain: running set of sector IDs visited
        :returns: a set of MSAT sector IDs
        :raises CfbfError: if the chain of MSAT sectors loops back on itself
        """
        # Work on a copy so the caller's set is never mutated.
        visited = set() if chain is None else set(chain)

        # Iterative walk: a long chain cannot exhaust the recursion limit.
        while True:
            visited.add(sec_id)
            sector = self.partition_sectors(self.read_sector(sec_id))
            results.extend(
                takewhile(lambda x: x != self.FREE_SEC_ID, sector[:-1]))

            next_id = sector[-1]
            if next_id == self.END_OF_CHAIN_ID:
                return visited
            elif next_id in visited:
                raise CfbfError(
                    'Extended Master Sector Allocation Table is corrupted')

            sec_id = next_id

    def read_msat_sectors(self):
        """
        Reads the Master Sector Allocation Table and generates a list of sectors.

        :return: a tuple of a list of SAT sector IDs and a set of MSAT
                 sector IDs (empty when the table is not extended)
        """
        self._file.seek(self.header.master_sat_offset)
        data = self._file.read(self.header.MSAT_SIZE)
        sat_ids = self.partition_sectors(data)[:self.header.sat_sectors]

        if self.header.master_sat_first != self.END_OF_CHAIN_ID:
            msat_ids = self.walk_msat_sectors(
                self.header.master_sat_first, sat_ids)
        else:
            # Same container type as the extended case.
            msat_ids = set()

        return (sat_ids, msat_ids)

    def check_msat_integrity(self, msat, ids):
        """
        Perform integrity checking on the Master Sector Allocation Table.

        :param msat: the MSAT
        :param ids: set of sector IDs for the extended table
        :returns: True if clean, False otherwise
        """
        # An empty table cannot describe a usable document.
        if not msat or msat[0] < 0:
            return False
        elif len(ids) != self.header.master_sat_count:
            return False

        visited = set()

        for sec_id in takewhile(lambda x: x != self.FREE_SEC_ID, msat):
            if sec_id < 0:
                # Any other special marker is not a readable SAT sector.
                return False
            elif self.compute_sector_offset(sec_id) >= self.file_size:
                return False
            elif sec_id in visited:
                return False

            visited.add(sec_id)

        return True

    def build_sector_alloc_table(self):
        """
        Builds the Sector Allocation Table as an array of sector ID chains.

        :returns: a lazy iterable over the SAT entries
        :raises CfbfError: if the MSAT fails its integrity check; entries
                           that contradict the MSAT raise during iteration
        """
        def raise_():
            raise CfbfError('Master Sector Allocation Table is corrupted')

        msat, msat_ids = self.read_msat_sectors()
        if not self.check_msat_integrity(msat, msat_ids):
            raise_()

        # Membership is tested once per SAT entry; use a set for O(1) lookups.
        sat_ids = frozenset(msat)

        def check(x):
            index, value = x

            # Sectors that hold the SAT or MSAT must be flagged as such
            # within the table itself.
            if index in sat_ids and value != self.SAT_SEC_ID:
                raise_()
            if index in msat_ids and value != self.MSAT_SEC_ID:
                raise_()

            return value

        entries = chain.from_iterable(
            self.partition_sectors(self.read_sector(x)) for x in msat)
        return map(check, enumerate(entries))

    def walk_sector_chain(self, start):
        """
        Generator for discovering sector IDs from the Sector Allocation Table.

        :param start: entry point in the SAT where the chain begins
        :returns: an iterator that yields sector IDs
        :raises CfbfError: if the chain leaves the table, hits a special
                           marker other than end-of-chain, or loops
        """
        index = start
        visited = set()

        def raise_():
            raise CfbfError('Sector Allocation Table is corrupted')

        while index != self.END_OF_CHAIN_ID:
            if index < 0 or index in visited:
                raise_()

            yield index
            visited.add(index)

            try:
                index = self.sector_alloc_table[index]
            except IndexError:
                raise_()

    def concat_sectors(self, start):
        """
        Creates a concatenated byte string of sectors.

        :param start: the starting sector ID
        :returns: a byte string
        """
        # join() avoids the quadratic cost of repeated bytes concatenation.
        return b''.join(
            self.read_sector(x) for x in self.walk_sector_chain(start))
class CfbfHeader(object):
    """
    Compound Document Format header structure.

    The constructor expects the stream to be positioned immediately after
    the 8-byte magic number; use :meth:`read` to validate the magic number
    and parse the header in one step.

    :param file: an open file resource positioned after the magic number
    """

    CFBF_MAGIC_NUMBER = b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1'
    SEC_ID_SIZE = 4
    MSAT_SIZE = 436

    def __init__(self, file):
        self.uid = file.read(16)
        self.revision = btoi(file.read(2))
        self.version = btoi(file.read(2))
        self.byte_order = file.read(2)
        self.sector_size = btoi(file.read(2))
        self.short_sector_size = btoi(file.read(2))
        file.seek(10, SEEK_CUR)  # skip bytes this implementation ignores
        self.sat_sectors = btoi(file.read(4))
        self.directory_sector = btoi(file.read(4))
        file.seek(4, SEEK_CUR)  # skip bytes this implementation ignores
        self.min_stream_size = btoi(file.read(4))
        self.short_sat_first = btoi(file.read(4))
        self.short_sat_count = btoi(file.read(4))
        self.master_sat_first = btoi(file.read(4))
        self.master_sat_count = btoi(file.read(4))

        self._master_sat_offset = file.tell()
        self.master_sat = file.read(self.MSAT_SIZE)
        self._header_size = file.tell()

        # The fields are read back to back, so a short final read means the
        # file ended somewhere inside the header and the values above may
        # have been decoded from missing bytes.
        if len(self.master_sat) != self.MSAT_SIZE:
            raise CfbfError('Truncated header')

    @property
    def master_sat_offset(self):
        """
        :returns: the offset of the start of the MSAT
        """
        return self._master_sat_offset

    @property
    def size(self):
        """
        :returns: the width of the header in bytes
        """
        return self._header_size

    @classmethod
    def read(cls, file):
        """
        Reads the CFBF file header into a structure.

        :param file: an open file resource
        :returns: a CFBF header structure
        :raises CfbfError: if the magic number does not match or the header
                           is truncated
        """
        file.seek(0)

        if file.read(8) != cls.CFBF_MAGIC_NUMBER:
            raise CfbfError('Bad magic number')

        return cls(file)
class CfbfError(Exception):
    """
    General-purpose exception raised for CFBF read errors.
    """
class DirectoryEntry(object):
    """
    Compound Document Format storage container.

    An entry is parsed from the 128 bytes at the current position of the
    directory stream. Child entries are discovered lazily on first access
    to :attr:`children`.

    :param document: an open CFBF document
    :param stream: directory entry byte stream
    """

    DIR_ENTRY_SIZE = 128
    DIR_ENTRY_NULL = -1

    TYPE_EMPTY = 0x0
    TYPE_USER_STORAGE = 0x1
    TYPE_USER_STREAM = 0x2
    TYPE_LOCK_BYTES = 0x3
    TYPE_PROPERTY = 0x4
    TYPE_ROOT_STORAGE = 0x5

    def __init__(self, document, stream):
        self._document = document
        self._stream = stream
        self._children = None

        name = stream.read(64)
        self.entry_name_sz = btoi(stream.read(2))
        self.entry_type = btoi(stream.read(1))
        self.node_color = btoi(stream.read(1))
        self.node_left = btoi(stream.read(4))
        self.node_right = btoi(stream.read(4))
        self.node_root = btoi(stream.read(4))
        self.uid = stream.read(16)
        self.user_flags = stream.read(4)
        self.created_at = stream.read(8)
        self.modified_at = stream.read(8)
        self.stream_sector = btoi(stream.read(4))
        # A size is never negative; decode it unsigned so large streams do
        # not wrap around to a negative length.
        self.stream_size = btoi(stream.read(4), signed=False)
        stream.seek(4, SEEK_CUR)  # skip the trailing 4 bytes of the entry

        # The stored size includes the 2-byte terminator. Clamp at zero so
        # an entry with no name does not slice with a negative bound.
        name_len = max(self.entry_name_sz - 2, 0)
        self.entry_name = str(name[:name_len], 'utf-16-le')

    def __getitem__(self, key):
        """
        Looks up children by name. Key can be a path separated by /.

        :param key: child entry name (or /-separated path) to find
        :returns: the matching entry
        :raises KeyError: if any component of the path does not exist
        """
        parts = key.split('/') if isinstance(key, str) else [key]
        ent = self

        for part in parts:
            ent = next((x for x in ent.children if x.entry_name == part), None)
            if ent is None:
                raise KeyError(key)

        return ent

    @property
    def storage_type(self):
        """
        :returns: True if this entry is a storage type, False otherwise
        """
        return self.entry_type in [self.TYPE_USER_STORAGE, self.TYPE_ROOT_STORAGE]

    @property
    def stream_type(self):
        """
        :returns: True if this entry is a user stream, False otherwise
        """
        return self.entry_type == self.TYPE_USER_STREAM

    @property
    def children(self):
        """
        :returns: a list of all direct child entries
        """
        if self._children is None:
            self._children = self.find_children()

        return self._children

    def compute_offset(self, id_):
        """
        Computes the byte offset of the given directory entry.

        :param id_: directory entry ID
        :return: stream offset where entry begins
        """
        return id_ * self.DIR_ENTRY_SIZE

    @classmethod
    def open(cls, document):
        """
        Opens the root directory.

        :param document: a CFBF document
        :returns: the root directory entry
        :raises CfbfError: if the directory sector lies outside the SAT
        """
        sec_id = document.header.directory_sector
        # Either bound being violated makes the ID invalid; testing with
        # "and" could never be true and let bad IDs through.
        if sec_id < 0 or sec_id >= len(document.sector_alloc_table):
            raise CfbfError('Bad directory sector')

        buf = document.concat_sectors(sec_id)
        return cls(document, BytesIO(buf))

    def stream(self):
        """
        Opens a stream for reading.

        :returns: a byte stream object holding exactly stream_size bytes
        :raises CfbfError: if the entry is not a stream, or is small enough
                           to live in the (unsupported) short-stream area
        """
        if not self.stream_type:
            raise CfbfError('Entry is not a stream')
        elif self.stream_size < self._document.header.min_stream_size:
            raise CfbfError('Short streams are not implemented')

        buf = self._document.concat_sectors(self.stream_sector)
        # Sectors are fixed-width, so the final one is padded; trim the
        # padding so readers only see the stream's own bytes.
        return BytesIO(buf[:self.stream_size])

    def _descend(self, id_, results, visited=None):
        """
        Recursively descends into the given node and finds siblings.

        :param id_: directory entry ID to descend to
        :param results: resulting entries collected during descent
        :param visited: set of entry IDs already seen during this walk
        :raises CfbfError: if the ID is negative or was already visited
        """
        if visited is None:
            visited = set()

        # A repeated ID means the tree links form a cycle; following it
        # would recurse without end.
        if id_ < 0 or id_ in visited:
            raise CfbfError('Directory tree is corrupted')

        visited.add(id_)

        self._stream.seek(self.compute_offset(id_))
        ent = DirectoryEntry(self._document, self._stream)
        results.append(ent)
        results.extend(ent.find_siblings(visited))

    def find_children(self):
        """
        Finds all direct child entries of this entry.

        :returns: a list of child entries.
        """
        results = []

        if self.storage_type and self.node_root != self.DIR_ENTRY_NULL:
            self._descend(self.node_root, results, set())

        return results

    def find_siblings(self, visited=None):
        """
        Walks the red-black tree looking for nodes that are sibling entries.

        :param visited: set of entry IDs already seen during this walk
        :returns: a list of sibling entries
        """
        if visited is None:
            visited = set()

        results = []

        if self.node_left != self.DIR_ENTRY_NULL:
            self._descend(self.node_left, results, visited)

        if self.node_right != self.DIR_ENTRY_NULL:
            self._descend(self.node_right, results, visited)

        return results
def btoi(bstr, signed=True):
    """
    Decodes a little-endian byte string into an integer.

    :param bstr: the byte string to decode
    :param signed: when True (the default) the bytes are interpreted as a
                   two's complement value, otherwise as an unsigned value
    :returns: the decoded integer
    """
    return int.from_bytes(bstr, 'little', signed=signed)