-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdblist.py
76 lines (48 loc) · 1.48 KB
/
dblist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/python
# -*- coding: utf-8 -*-
import collections
import cPickle
import bsddb3
def _dumps(o):
return cPickle.dumps(o, -1).encode('zlib')
def _loads(o):
return cPickle.loads(o.decode('zlib'))
def open(dbenv, filename, dbname=None, flags=bsddb3.db.DB_CREATE, mode=0660, txn=None):
d = DBList(dbenv)
d.open(filename, dbname, bsddb3.db.DB_RECNO, flags, mode, txn)
return d
class DBList(collections.MutableSequence):
def __init__(self, dbenv):
self.db = bsddb3.db.DB(dbenv)
self._closed = True
def __del__(self):
self.close()
def __repr__(self):
if self._closed:
return '<DBList @ 0x%x - closed>' % (id(self))
else:
return repr(list(self))
def __getitem__(self, index):
try:
data = self.db[index+1]
return _loads(data)
except:
raise IndexError
def __setitem__(self, index, value):
data = _dumps(value)
self.db[index+1] = data
def __delitem__(self, index):
del self.db[index+1]
def __len__(self):
return len(self.db)
def insert(self, index, value):
pass
def append(self, value, txn=None):
data = _dumps(value)
return self.db.append(data, txn)
def open(self, *args, **kwargs):
self.db.open(*args, **kwargs)
self._closed = False
def close(self, *args, **kwargs):
self.db.close(*args, **kwargs)
self._closed = True