Skip to content

Commit 7889254

Browse files
committed
sqliteutil: add Buffer object
1 parent b4c1089 commit 7889254

File tree

2 files changed

+432
-0
lines changed

2 files changed

+432
-0
lines changed

sqliteutil/buffer.go

Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
// Copyright (c) 2018 David Crawshaw <[email protected]>
2+
//
3+
// Permission to use, copy, modify, and distribute this software for any
4+
// purpose with or without fee is hereby granted, provided that the above
5+
// copyright notice and this permission notice appear in all copies.
6+
//
7+
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
8+
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
9+
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
10+
// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
11+
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
12+
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
13+
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
14+
15+
package sqliteutil
16+
17+
import (
18+
"errors"
19+
"io"
20+
"sqlite"
21+
)
22+
23+
// A Buffer is a variable-sized bytes buffer backed by SQLite blobs.
24+
//
25+
// The bytes are broken into pages, with the first and last pages
26+
// stored in memory, and intermediate pages loaded into blobs.
27+
// Unlike a single SQLite blob, a Buffer can grow beyond its initial size.
28+
// The blobs are allocated in a temporary table.
29+
//
30+
// A Buffer is very similar to a bytes.Buffer.
31+
type Buffer struct {
32+
io.Reader
33+
io.Writer
34+
io.ByteScanner
35+
36+
err error
37+
conn *sqlite.Conn
38+
39+
// cap(rbuf) == cap(wbuf) == blobs[N].Size()
40+
41+
rbuf []byte // read buffer
42+
roff int // read head position in roff
43+
blobs []tblob // blobs storing data between rbuf and wbuf
44+
wbuf []byte // write buffer
45+
46+
freelist []tblob
47+
}
48+
49+
type tblob struct {
50+
blob *sqlite.Blob
51+
rowid int64
52+
}
53+
54+
// NewBuffer creates a Buffer with 16KB pages.
55+
func NewBuffer(conn *sqlite.Conn) (*Buffer, error) {
56+
return NewBufferSize(conn, 16*1024)
57+
}
58+
59+
// NewBufferSize creates a Buffer with a specified page size.
60+
func NewBufferSize(conn *sqlite.Conn, pageSize int) (*Buffer, error) {
61+
bb := &Buffer{
62+
conn: conn,
63+
rbuf: make([]byte, 0, pageSize),
64+
wbuf: make([]byte, 0, pageSize),
65+
}
66+
stmt := conn.Prep("CREATE TEMP TABLE IF NOT EXISTS BlobBuffer (blob BLOB);")
67+
if _, err := stmt.Step(); err != nil {
68+
return nil, err
69+
}
70+
return bb, nil
71+
}
72+
73+
func (bb *Buffer) alloc() (tblob, error) {
74+
if len(bb.freelist) > 0 {
75+
b := bb.freelist[len(bb.freelist)-1]
76+
bb.freelist = bb.freelist[:len(bb.freelist)-1]
77+
return b, nil
78+
}
79+
80+
stmt := bb.conn.Prep("INSERT INTO BlobBuffer (blob) VALUES ($blob);")
81+
stmt.SetZeroBlob("$blob", int64(len(bb.rbuf)))
82+
if _, err := stmt.Step(); err != nil {
83+
return tblob{}, err
84+
}
85+
rowid := bb.conn.LastInsertRowID()
86+
blob, err := bb.conn.OpenBlob("temp", "BlobBuffer", "blob", rowid, true)
87+
if err != nil {
88+
return tblob{}, err
89+
}
90+
return tblob{
91+
blob: blob,
92+
rowid: rowid,
93+
}, nil
94+
}
95+
96+
func (bb *Buffer) free(b tblob) {
97+
bb.freelist = append(bb.freelist, b)
98+
}
99+
100+
func (bb *Buffer) wbufEnsureSpace() error {
101+
if len(bb.wbuf) < cap(bb.wbuf) {
102+
return nil
103+
}
104+
105+
// Flush the write buffer.
106+
if len(bb.blobs) == 0 && bb.roff == len(bb.rbuf) {
107+
// Short cut. The write buffer is full, but
108+
// there are no on-disk blobs and the read
109+
// buffer is empty. So push these bytes
110+
// directly to the front of the Buffer.
111+
bb.rbuf, bb.wbuf = bb.wbuf, bb.rbuf[:0]
112+
bb.roff = 0
113+
} else {
114+
tblob, err := bb.alloc()
115+
if err != nil {
116+
bb.err = err
117+
return err
118+
}
119+
if _, err := tblob.blob.WriteAt(bb.wbuf, 0); err != nil {
120+
bb.err = err
121+
return err
122+
}
123+
bb.blobs = append(bb.blobs, tblob)
124+
bb.wbuf = bb.wbuf[:0]
125+
}
126+
127+
return nil
128+
}
129+
130+
// WriteByte appends a byte to the buffer, growing it as needed.
131+
func (bb *Buffer) WriteByte(c byte) error {
132+
if bb.err != nil {
133+
return bb.err
134+
}
135+
if err := bb.wbufEnsureSpace(); err != nil {
136+
return err
137+
}
138+
bb.wbuf = append(bb.wbuf, c)
139+
return nil
140+
}
141+
142+
func (bb *Buffer) UnreadByte() error {
143+
if bb.err != nil {
144+
return bb.err
145+
}
146+
if bb.roff == 0 {
147+
return errors.New("sqliteutil.Buffer: UnreadByte: no byte to unread")
148+
}
149+
bb.roff--
150+
return nil
151+
}
152+
153+
func (bb *Buffer) Write(p []byte) (n int, err error) {
154+
if bb.err != nil {
155+
return 0, bb.err
156+
}
157+
158+
for len(p) > 0 {
159+
if err := bb.wbufEnsureSpace(); err != nil {
160+
return n, err
161+
}
162+
163+
// TODO: shortcut for writing large p directly into a new blob
164+
165+
nn := len(p)
166+
if rem := cap(bb.wbuf) - len(bb.wbuf); nn > rem {
167+
nn = rem
168+
}
169+
bb.wbuf = append(bb.wbuf, p[:nn]...) // never grows wbuf
170+
n += nn
171+
p = p[nn:]
172+
}
173+
174+
return n, nil
175+
}
176+
177+
func (bb *Buffer) WriteString(p string) (n int, err error) {
178+
if bb.err != nil {
179+
return 0, bb.err
180+
}
181+
182+
for len(p) > 0 {
183+
if err := bb.wbufEnsureSpace(); err != nil {
184+
return n, err
185+
}
186+
187+
// TODO: shortcut for writing large p directly into a new blob
188+
189+
nn := len(p)
190+
if rem := cap(bb.wbuf) - len(bb.wbuf); nn > rem {
191+
nn = rem
192+
}
193+
bb.wbuf = append(bb.wbuf, p[:nn]...) // never grows wbuf
194+
n += nn
195+
p = p[nn:]
196+
}
197+
198+
return n, nil
199+
}
200+
201+
func (bb *Buffer) rbufFill() error {
202+
if bb.roff < len(bb.rbuf) {
203+
return nil
204+
}
205+
206+
// Read buffer is empty. Fill it.
207+
if len(bb.blobs) > 0 {
208+
// Read the first blob entirely into the read buffer.
209+
// TODO: shortcut for if len(p) >= blob.Size()
210+
bb.roff = 0
211+
bb.rbuf = bb.rbuf[:cap(bb.rbuf)]
212+
213+
tblob := bb.blobs[0]
214+
bb.blobs = bb.blobs[1:]
215+
if nn, err := tblob.blob.ReadAt(bb.rbuf, 0); err != nil {
216+
bb.err = err
217+
return err
218+
} else if nn != len(bb.rbuf) {
219+
panic("sqliteutil.Buffer: short read from blob")
220+
}
221+
bb.free(tblob)
222+
return nil
223+
}
224+
if len(bb.wbuf) > 0 {
225+
// No blobs. Swap the write buffer bytes here directly.
226+
bb.rbuf, bb.wbuf = bb.wbuf, bb.rbuf[:0]
227+
bb.roff = 0
228+
}
229+
230+
if bb.roff == len(bb.rbuf) {
231+
return io.EOF
232+
}
233+
return nil
234+
}
235+
236+
func (bb *Buffer) ReadByte() (byte, error) {
237+
if bb.err != nil {
238+
return 0, bb.err
239+
}
240+
if err := bb.rbufFill(); err != nil {
241+
return 0, err
242+
}
243+
c := bb.rbuf[bb.roff]
244+
bb.roff++
245+
return c, nil
246+
}
247+
248+
func (bb *Buffer) Read(p []byte) (n int, err error) {
249+
if bb.err != nil {
250+
return 0, bb.err
251+
}
252+
if err := bb.rbufFill(); err != nil {
253+
return 0, err
254+
}
255+
if bb.roff == len(bb.rbuf) {
256+
return 0, io.EOF
257+
}
258+
259+
n = copy(p, bb.rbuf[bb.roff:])
260+
bb.roff += n
261+
return n, nil
262+
}
263+
264+
func (bb *Buffer) Len() (n int64) {
265+
n = int64(len(bb.rbuf) - bb.roff)
266+
n += int64(cap(bb.rbuf) * len(bb.blobs))
267+
n += int64(len(bb.wbuf))
268+
return n
269+
}
270+
271+
func (bb *Buffer) Cap() (n int64) {
272+
pageSize := int64(cap(bb.rbuf))
273+
return (2 + int64(len(bb.blobs)+len(bb.freelist))) * pageSize
274+
}
275+
276+
func (bb *Buffer) Reset() {
277+
bb.rbuf = bb.rbuf[:0]
278+
bb.wbuf = bb.wbuf[:0]
279+
bb.roff = 0
280+
bb.freelist = append(bb.freelist, bb.blobs...)
281+
bb.blobs = nil
282+
}
283+
284+
func (bb *Buffer) Close() error {
285+
stmt := bb.conn.Prep("DELETE FROM BlobBuffer WHERE rowid = $rowid;")
286+
del := func(tblob tblob) {
287+
err := tblob.blob.Close()
288+
if bb.err == nil {
289+
bb.err = err
290+
}
291+
stmt.Reset()
292+
stmt.SetInt64("$rowid", tblob.rowid)
293+
if _, err := stmt.Step(); err != nil && bb.err == nil {
294+
bb.err = err
295+
}
296+
}
297+
298+
for _, tblob := range bb.blobs {
299+
del(tblob)
300+
}
301+
for _, tblob := range bb.freelist {
302+
del(tblob)
303+
}
304+
bb.blobs = nil
305+
bb.freelist = nil
306+
307+
return bb.err
308+
}

0 commit comments

Comments
 (0)