-
Notifications
You must be signed in to change notification settings - Fork 22
/
tx.go
205 lines (172 loc) · 4.69 KB
/
tx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package LibraDB
// tx is a single transaction against a DB. It collects the node writes, page
// deletions and page allocations made while it is open; Commit pushes them to
// the database and Rollback discards them.
//
// NOTE: newTx fills this struct with a positional literal, so the order of the
// fields below must stay in sync with it.
type tx struct {
// nodes registered through writeNode, keyed by page number. They are written
// to the database by Commit.
dirtyNodes map[pgnum]*Node
// page numbers queued by deleteNode. They are handed to db.deleteNode by Commit.
pagesToDelete []pgnum
// new pages allocated during the transaction. They will be released if rollback is called.
allocatedPageNums []pgnum
// true for a write transaction. Decides whether Commit/Rollback release the
// write lock (Unlock) or the read lock (RUnlock) of db.rwlock.
write bool
// the database this transaction operates on.
db *DB
}
// newTx returns a transaction bound to db. write selects between a write
// transaction (true) and a read-only one (false). The dirty-node map and both
// page-number lists start out empty.
func newTx(db *DB, write bool) *tx {
	return &tx{
		dirtyNodes:        make(map[pgnum]*Node),
		pagesToDelete:     make([]pgnum, 0),
		allocatedPageNums: make([]pgnum, 0),
		write:             write,
		db:                db,
	}
}
// newNode creates a node holding the given items and child page numbers,
// assigns it the next page number obtained from the database and attaches it
// to this transaction. The page number is remembered in allocatedPageNums so
// that Rollback can release it.
func (tx *tx) newNode(items []*Item, childNodes []pgnum) *Node {
	n := NewEmptyNode()
	n.items, n.childNodes = items, childNodes
	n.pageNum = tx.db.getNextPage()
	n.tx = tx

	// n.tx is this transaction, so record the allocation on tx directly.
	tx.allocatedPageNums = append(tx.allocatedPageNums, n.pageNum)
	return n
}
// getNode returns the node stored at pageNum. A node already registered as
// dirty in this transaction is returned as-is; otherwise the node is fetched
// from the database and attached to this transaction before being returned.
// Any error from the database lookup is returned with a nil node.
func (tx *tx) getNode(pageNum pgnum) (*Node, error) {
	n, dirty := tx.dirtyNodes[pageNum]
	if !dirty {
		var err error
		if n, err = tx.db.getNode(pageNum); err != nil {
			return nil, err
		}
		n.tx = tx
	}
	return n, nil
}
// writeNode registers node as dirty under its page number and attaches it to
// this transaction. Nothing is written to the database here; Commit writes the
// dirty nodes. The same node is returned for convenience.
func (tx *tx) writeNode(node *Node) *Node {
	pageNum := node.pageNum
	tx.dirtyNodes[pageNum] = node
	node.tx = tx
	return node
}
// deleteNode queues the page of node for deletion. The page is only passed to
// the database's deleteNode when the transaction is committed.
func (tx *tx) deleteNode(node *Node) {
	doomed := node.pageNum
	tx.pagesToDelete = append(tx.pagesToDelete, doomed)
}
// Rollback ends the transaction without applying its changes.
//
// A read transaction only releases the read lock. A write transaction drops
// its dirty nodes and pending deletions, releases every page it allocated back
// to the freelist, and then releases the write lock.
func (tx *tx) Rollback() {
	if tx.write {
		tx.dirtyNodes, tx.pagesToDelete = nil, nil

		// Hand back the pages that were allocated while the transaction was open.
		for i := range tx.allocatedPageNums {
			tx.db.freelist.releasePage(tx.allocatedPageNums[i])
		}
		tx.allocatedPageNums = nil

		tx.db.rwlock.Unlock()
		return
	}

	tx.db.rwlock.RUnlock()
}
// Commit ends the transaction and applies its changes.
//
// A read transaction only releases the read lock and returns nil. A write
// transaction writes every dirty node to the database, deletes the queued
// pages, writes the freelist, clears its bookkeeping and releases the write
// lock.
//
// NOTE(review): when writing a node or the freelist fails, the error is
// returned immediately; the transaction state is left untouched and the write
// lock is NOT released on that path. Confirm that callers are expected to call
// Rollback after a failed Commit.
func (tx *tx) Commit() error {
	db := tx.db

	if !tx.write {
		db.rwlock.RUnlock()
		return nil
	}

	for _, dirty := range tx.dirtyNodes {
		if _, err := db.writeNode(dirty); err != nil {
			return err
		}
	}

	for i := range tx.pagesToDelete {
		db.deleteNode(tx.pagesToDelete[i])
	}

	if _, err := db.writeFreelist(); err != nil {
		return err
	}

	tx.dirtyNodes, tx.pagesToDelete, tx.allocatedPageNums = nil, nil, nil
	db.rwlock.Unlock()
	return nil
}
// This will be used for implementing COW (copy-on-write). The idea is to mark all the dirty collections, then for each
// collection, traverse its dirty nodes in post-order and commit the child pages. Then take the new page numbers, assign
// them to the parent, and save it as well, and so on. After all collections are written, rewrite the root node with the
// new collection roots. Rewrite the freelist with the newly allocated pages. Finally, rewrite the meta page, so the new
// root node will take effect.
// COW will give us atomicity as new pages cannot be seen until the root node is written. This way, in case of a failure
// or a rollback no harm will be done as nothing was committed to the database.
//func (tx *tx) commitNode(node *Node) error {
// oldPageNum := node.num
// node.num = 0
//
// newNode, err := tx.db.writeNode(node)
// if err != nil {
// return err
// }
// tx.committedNodes[oldPageNum] = newNode.num
// tx.deleteNode(node)
// return nil
//}
//
//// saveDirtyNodes saves the tree in post-order. Post-order is used because child pages must be written to the disk
//// and given their new page IDs before the parent node can be updated with the new page of each child node.
//func (tx *tx) saveDirtyNodes(node *Node) error {
// if len(node.childNodes) == 0 {
// return tx.commitNode(node)
// }
//
// for i, childNodePgid := range node.childNodes {
// if childNode, ok := tx.dirtyNodes[childNodePgid]; ok {
// err := tx.saveDirtyNodes(childNode)
// if err != nil {
// return err
// }
// }
// node.childNodes[i] = tx.committedNodes[childNodePgid]
// }
//
// return tx.commitNode(node)
//}
// getRootCollection returns a fresh collection whose root is the database's
// root page and which is bound to this transaction. The named collections are
// looked up, stored and removed through it.
func (tx *tx) getRootCollection() *Collection {
	root := newEmptyCollection()
	root.root, root.tx = tx.db.root, tx
	return root
}
// GetCollection looks up the collection called name in the root collection.
// It returns (nil, nil) when no such entry exists and (nil, err) when the
// lookup fails. On success the stored item is deserialized into a new
// collection bound to this transaction.
func (tx *tx) GetCollection(name []byte) (*Collection, error) {
	item, err := tx.getRootCollection().Find(name)
	switch {
	case err != nil:
		return nil, err
	case item == nil:
		// Not found is not an error.
		return nil, nil
	}

	found := newEmptyCollection()
	found.deserialize(item)
	found.tx = tx
	return found, nil
}
// CreateCollection creates a collection called name and stores it in the root
// collection. It returns writeInsideReadTxErr when called on a read
// transaction. An empty node is written to the database to serve as the new
// collection's root page.
//
// NOTE(review): the root node is written via tx.db.writeNode directly rather
// than through tx.writeNode/tx.newNode, so its page is not recorded in the
// transaction's bookkeeping — confirm this is intended.
func (tx *tx) CreateCollection(name []byte) (*Collection, error) {
	if !tx.write {
		return nil, writeInsideReadTxErr
	}

	rootNode, err := tx.db.writeNode(NewEmptyNode())
	if err != nil {
		return nil, err
	}

	created := newEmptyCollection()
	created.name, created.root = name, rootNode.pageNum
	return tx.createCollection(created)
}
// DeleteCollection removes the entry called name from the root collection.
// It returns writeInsideReadTxErr when called on a read transaction;
// otherwise it returns whatever the removal returns.
func (tx *tx) DeleteCollection(name []byte) error {
	if tx.write {
		return tx.getRootCollection().Remove(name)
	}
	return writeInsideReadTxErr
}
// createCollection binds collection to this transaction, serializes it and
// stores the serialized value in the root collection under the collection's
// name. It returns the same collection on success, or nil and the error from
// the store operation.
func (tx *tx) createCollection(collection *Collection) (*Collection, error) {
	collection.tx = tx

	serialized := collection.serialize()
	if err := tx.getRootCollection().Put(collection.name, serialized.value); err != nil {
		return nil, err
	}
	return collection, nil
}