-
Notifications
You must be signed in to change notification settings - Fork 0
/
local_fs.py
82 lines (63 loc) · 1.99 KB
/
local_fs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
import shutil
from dynamo_fs import concatPath
from array import array
# DynamoFS look-alike that just forwards every call to the underlying
# operating system.
class LocalFS:
# root is the local path to use for this object's root.
def __init__(self, root):
# shutil.rmtree(root)
try:
os.mkdir(root)
except OSError, e:
pass
self.root = root
def _fromRoot(self, path):
return concatPath(self.root, path)
# mode can be 'r' or 'w'
def open(self, path, mode):
path = self._fromRoot(path)
return LocalFile(path, mode)
def isdir(self, path):
path = self._fromRoot(path)
return os.path.isdir(path)
def rm(self, path):
path = self._fromRoot(path)
if os.path.isdir(path):
os.rmdir(path)
else:
os.unlink(path)
def mkdir(self, path, newName):
path = self._fromRoot(path)
os.mkdir(concatPath(path, newName))
def ls(self, path):
path = self._fromRoot(path)
return os.listdir(path)
def mv(self, old_name, new_name):
old_name = self._fromRoot(old_name)
new_name = self._fromRoot(new_name)
os.rename(old_name, new_name)
def flush(self):
pass # No-op.
def __str__(self):
return "LocalFS"
# Object returned by LocalFS.open().
class LocalFile:
def __init__(self, filename, mode):
self.f = open(filename, mode)
def write_array(self, data):
self.f.write("".join(map(chr, data)))
def read_array(self, max_len):
return map(ord, self.f.read(max_len))
def write(self, data):
self.f.write(data)
def read(self, max_len):
return self.f.read(max_len)
def seek(self, offset, whence):
self.f.seek(offset, whence)
def close(self):
self.f.close()
del self.f
# This class is optimized for reading and writing strings.
stringOptimized = True