Skip to content

Commit

Permalink
Merge pull request #111 from input-output-hk/matthias/run-serialisation
Browse files Browse the repository at this point in the history
Implement run (de)serialisation
  • Loading branch information
mheinzel authored Mar 13, 2024
2 parents 8e198aa + 1155043 commit 0ef734e
Show file tree
Hide file tree
Showing 17 changed files with 458 additions and 304 deletions.
22 changes: 13 additions & 9 deletions bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
module Bench.Database.LSMTree.Internal.Lookup (benchmarks, analysis) where

import Bench.Database.LSMTree.Internal.Run.BloomFilter (elems)
import Bench.Database.LSMTree.Internal.Run.Index.Compact (searches)
import Bench.Database.LSMTree.Internal.Run.Index.Compact
(constructCompactIndex, searches)
import Control.DeepSeq (NFData)
import Control.Monad
import Criterion.Main (Benchmark, bench, bgroup, env, nf, whnf)
Expand All @@ -22,13 +23,15 @@ import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NonEmpty
import Data.Maybe (fromMaybe)
import Data.Proxy (Proxy (..))
import Database.LSMTree.Generators (RFPrecision (..), UTxOKey)
import Database.LSMTree.Generators (ChunkSize (..), RFPrecision (..),
UTxOKey)
import Database.LSMTree.Internal.Lookup (prepLookups)
import Database.LSMTree.Internal.Run.BloomFilter (Bloom)
import qualified Database.LSMTree.Internal.Run.BloomFilter as Bloom
import Database.LSMTree.Internal.Run.Index.Compact (Append (..),
CompactIndex)
import Database.LSMTree.Internal.Run.Index.Compact (CompactIndex)
import qualified Database.LSMTree.Internal.Run.Index.Compact as Index
import Database.LSMTree.Internal.Run.Index.Compact.Construction
(Append (..))
import Database.LSMTree.Internal.Serialise (SerialiseKey,
SerialisedKey, keyTopBits16, serialiseKey)
import Database.LSMTree.Util.Orphans ()
Expand Down Expand Up @@ -120,7 +123,7 @@ data Config = Config {
-- | If 'Nothing', use 'suggestRangeFinderPrecision'.
, rfprecDef :: !(Maybe Int)
-- | Chunk size for compact index construction
, csize :: !Int
, csize :: !ChunkSize
-- | Number of pages in total
--
-- Note: the actual number of pages can be higher, because of the
Expand All @@ -139,7 +142,7 @@ defaultConfig :: Config
defaultConfig = Config {
name = "default"
, rfprecDef = Nothing
, csize = 100
, csize = ChunkSize 100
, npages = 50_000
, npageEntries = 40
, npos = 10_000
Expand All @@ -157,14 +160,15 @@ prepLookupsEnv ::
prepLookupsEnv _ Config {..} = do
(storedKeys, lookupKeys) <- lookupsEnv @k (mkStdGen 17) totalEntries npos nneg
let b = Bloom.fromList fpr $ fmap serialiseKey storedKeys
ps = mkPages (RFPrecision rfprec) $ NonEmpty.fromList storedKeys
ps = mkPages rfprec $ NonEmpty.fromList storedKeys
ps' = fmap serialiseKey ps
ps'' = fromPage <$> getPages ps'
ci = Index.fromList rfprec csize ps''
ci = constructCompactIndex csize (rfprec, ps'')
pure (b, ci, fmap serialiseKey lookupKeys)
where
totalEntries = npages * npageEntries
rfprec = fromMaybe (Index.suggestRangeFinderPrecision npages) rfprecDef
rfprec = RFPrecision $
fromMaybe (Index.suggestRangeFinderPrecision npages) rfprecDef

-- | Generate keys to store and keys to lookup
lookupsEnv ::
Expand Down
11 changes: 8 additions & 3 deletions bench/micro/Bench/Database/LSMTree/Internal/Run/Index/Compact.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ module Bench.Database.LSMTree.Internal.Run.Index.Compact (
benchmarks
-- * Benchmarked functions
, searches
, constructCompactIndex
) where

import Control.DeepSeq (deepseq)
import Control.Monad.ST (runST)
import Criterion.Main
import Data.Foldable (Foldable (..))
import Database.LSMTree.Generators
import Database.LSMTree.Internal.Run.Index.Compact
import Database.LSMTree.Internal.Run.Index.Compact.Construction
import Database.LSMTree.Internal.Serialise (SerialisedKey,
serialiseKey)
import System.Random
Expand Down Expand Up @@ -71,6 +74,8 @@ constructCompactIndex ::
ChunkSize
-> (RFPrecision, [Append]) -- ^ Pages to add in succession
-> CompactIndex
constructCompactIndex (ChunkSize csize) (RFPrecision rfprec, ps) =
-- under the hood, 'fromList' uses the incremental construction interface
fromList rfprec csize ps
constructCompactIndex (ChunkSize csize) (RFPrecision rfprec, apps) = runST $ do
mci <- new rfprec csize
mapM_ (`append` mci) apps
(_, index) <- unsafeEnd mci
pure index
55 changes: 52 additions & 3 deletions src/Database/LSMTree/Internal/ByteString.hs
Original file line number Diff line number Diff line change
@@ -1,17 +1,66 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}
-- | @bytestring@ extras
module Database.LSMTree.Internal.ByteString (
tryCheapToShort,
tryGetByteArray,
shortByteStringFromTo,
byteArrayFromTo,
) where

import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Builder.Internal as BB
import qualified Data.ByteString.Internal as BS.Internal
import Data.ByteString.Short (ShortByteString (SBS))
import qualified Data.ByteString.Short.Internal as SBS
import Data.Primitive.ByteArray (ByteArray (..))
import Data.Primitive.ByteArray (ByteArray (..), emptyByteArray,
sizeofByteArray)
import Foreign.Ptr (minusPtr, plusPtr)
import GHC.Exts (eqAddr#, mutableByteArrayContents#, realWorld#,
unsafeFreezeByteArray#)
import qualified GHC.ForeignPtr as Foreign

-- | \( O(1) \) conversion, if possible.
--
-- In addition to the conditions explained for 'tryGetByteArray', the
-- bytestring must use the full length of the underlying byte array.
tryCheapToShort :: BS.ByteString -> Either String ShortByteString
tryCheapToShort bs =
tryGetByteArray bs >>= \(ba , n) ->
if n /= sizeofByteArray ba then
Left "ByteString does not use full ByteArray"
else
let !(ByteArray ba#) = ba in Right (SBS ba#)


-- | \( O(1) \) conversion from a strict 'BS.ByteString' to its underlying
-- pinned 'ByteArray', if possible.
--
-- Strict bytestrings are allocated using 'mallocPlainForeignPtrBytes', so we
-- are expecting a 'PlainPtr' (or 'FinalPtr' when the length is 0).
-- We also require that bytestrings referencing a byte array point point at the
-- beginning, without any offset.
tryGetByteArray :: BS.ByteString -> Either String (ByteArray, Int)
tryGetByteArray (BS.Internal.BS (Foreign.ForeignPtr addr# contents) n) =
case contents of
Foreign.PlainPtr mba# ->
case mutableByteArrayContents# mba# `eqAddr#` addr# of
0# -> Left "non-zero offset into ByteArray"
_ -> -- safe, ByteString's content is considered immutable
Right $ case unsafeFreezeByteArray# mba# realWorld# of
(# _, ba# #) -> (ByteArray ba#, n)
Foreign.FinalPtr | n == 0 ->
-- We can also handle empty bytestrings ('BS.empty' uses 'FinalPtr').
Right (emptyByteArray, 0)
Foreign.FinalPtr ->
Left ("unsupported FinalPtr (length " <> show n <> ")")
Foreign.MallocPtr {} ->
Left ("unsupported MallocPtr (length " <> show n <> ")")
Foreign.PlainForeignPtr {} ->
Left ("unsupported PlainForeignPtr (length " <> show n <> ")")

-- | Copy of 'SBS.shortByteString', but with bounds (unchecked).
--
Expand Down
24 changes: 23 additions & 1 deletion src/Database/LSMTree/Internal/CRC32C.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module Database.LSMTree.Internal.CRC32C (

-- * I\/O with checksum calculation
hGetSomeCRC32C,
hGetExactlyCRC32C,
hPutSomeCRC32C,
hPutAllCRC32C,
hPutAllChunksCRC32C,
Expand Down Expand Up @@ -66,14 +67,35 @@ updateCRC32C bs (CRC32C crc) = CRC32C (CRC.crc32c_update crc bs)

hGetSomeCRC32C :: Monad m
=> HasFS m h
-> Handle h -> Word64
-> Handle h
-> Word64
-> CRC32C -> m (BS.ByteString, CRC32C)
hGetSomeCRC32C fs h n crc = do
bs <- hGetSome fs h n
let !crc' = updateCRC32C bs crc
return (bs, crc')


-- | This function ensures that exactly the requested number of bytes is read.
-- If the file is too short, an 'FsError' of type 'FsReachedEOF' is thrown.
--
-- It attempts to read everything into a single strict chunk, which should
-- almost always succeed. If it doesn't, multiple chunks are produced.
--
-- TODO: To reliably return a strict bytestring without additional copying,
-- @fs-api@ needs to support directly reading into a buffer, which is currently
-- work in progress: <https://github.com/input-output-hk/fs-sim/pull/46>
hGetExactlyCRC32C :: MonadThrow m
=> HasFS m h
-> Handle h
-> Word64
-> CRC32C -> m (BSL.ByteString, CRC32C)
hGetExactlyCRC32C fs h n crc = do
lbs <- hGetExactly fs h n
let !crc' = BSL.foldlChunks (flip updateCRC32C) crc lbs
return (lbs, crc')


hPutSomeCRC32C :: Monad m
=> HasFS m h
-> Handle h
Expand Down
95 changes: 84 additions & 11 deletions src/Database/LSMTree/Internal/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,27 @@ module Database.LSMTree.Internal.Run (
module FsPaths
-- * Run
, RefCount
, Run
, Run (..)
, addReference
, removeReference
, fromWriteBuffer
, openFromDisk
) where

import Control.Exception (finally)
import Control.Exception (Exception, finally, throwIO)
import Control.Monad (when)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Short as SBS
import Data.Foldable (for_)
import Data.IORef
import Database.LSMTree.Internal.BloomFilter (bloomFilterFromSBS)
import Database.LSMTree.Internal.ByteString (tryCheapToShort)
import qualified Database.LSMTree.Internal.CRC32C as CRC
import Database.LSMTree.Internal.Entry (NumEntries (..))
import Database.LSMTree.Internal.Run.BloomFilter (Bloom)
import Database.LSMTree.Internal.Run.FsPaths as FsPaths
import Database.LSMTree.Internal.Run.Index.Compact (CompactIndex)
import qualified Database.LSMTree.Internal.Run.Index.Compact as Index
import Database.LSMTree.Internal.Run.Mutable
import Database.LSMTree.Internal.Serialise
import Database.LSMTree.Internal.WriteBuffer (WriteBuffer)
Expand All @@ -61,27 +69,28 @@ import System.FS.API (HasFS)
-- | The in-memory representation of a completed LSM run.
--
data Run fhandle = Run {
lsmRunNumEntries :: !NumEntries
-- | The reference count for the LSM run. This counts the
-- number of references from LSM handles to this run. When
-- this drops to zero the open files will be closed.
lsmRunRefCount :: !RefCount
, lsmRunRefCount :: !RefCount
-- | The file system paths for all the files used by the run.
, lsmRunFsPaths :: !RunFsPaths
, lsmRunFsPaths :: !RunFsPaths
-- | The bloom filter for the set of keys in this run.
, lsmRunFilter :: !(Bloom SerialisedKey)
, lsmRunFilter :: !(Bloom SerialisedKey)
-- | The in-memory index mapping keys to page numbers in the
-- Key\/Ops file. In future we may support alternative index
-- representations.
, lsmRunIndex :: !CompactIndex
, lsmRunIndex :: !CompactIndex
-- | The file handle for the Key\/Ops file. This file is opened
-- read-only and is accessed in a page-oriented way, i.e. only
-- reading whole pages, at page offsets. It will be opened with
-- 'O_DIRECT' on supported platforms.
, lsmRunKOpsFile :: !fhandle
, lsmRunKOpsFile :: !fhandle
-- | The file handle for the BLOBs file. This file is opened
-- read-only and is accessed in a normal style using buffered
-- I\/O, reading arbitrary file offset and length spans.
, lsmRunBlobFile :: !fhandle
, lsmRunBlobFile :: !fhandle
}

-- | Increase the reference count by one.
Expand Down Expand Up @@ -113,7 +122,7 @@ close fs Run {..} = do
-- | Create a run by finalising a mutable run.
fromMutable :: HasFS IO h -> MRun (FS.Handle h) -> IO (Run (FS.Handle h))
fromMutable fs mrun = do
(lsmRunRefCount, lsmRunFsPaths, lsmRunFilter, lsmRunIndex) <-
(lsmRunRefCount, lsmRunFsPaths, lsmRunFilter, lsmRunIndex, lsmRunNumEntries) <-
unsafeFinalise fs mrun
lsmRunKOpsFile <- FS.hOpen fs (runKOpsPath lsmRunFsPaths) FS.ReadMode
lsmRunBlobFile <- FS.hOpen fs (runBlobPath lsmRunFsPaths) FS.ReadMode
Expand All @@ -130,8 +139,9 @@ fromMutable fs mrun = do
-- This is currently done very crudely based on the assumption that a k\/op pair
-- requires approximately 100 bytes of disk space.
-- To do better, we would need information about key and value size.
fromWriteBuffer :: HasFS IO h -> RunFsPaths -> WriteBuffer k v b ->
IO (Run (FS.Handle h))
fromWriteBuffer ::
HasFS IO h -> RunFsPaths -> WriteBuffer k v b
-> IO (Run (FS.Handle h))
fromWriteBuffer fs fsPaths buffer = do
-- We just estimate the number of pages to be one, as the write buffer is
-- expected to be small enough not to benefit from more precise tuning.
Expand All @@ -141,3 +151,66 @@ fromWriteBuffer fs fsPaths buffer = do
for_ (WB.content buffer) $ \(k, e) ->
addFullKOp fs mrun k e
fromMutable fs mrun

data ChecksumError = ChecksumError FS.FsPath CRC.CRC32C CRC.CRC32C
deriving Show

instance Exception ChecksumError

data FileFormatError = FileFormatError FS.FsPath String
deriving Show

instance Exception FileFormatError

-- | Load a previously written run from disk, checking each file's checksum
-- against the checksum file.
--
-- Exceptions will be raised when any of the file's contents don't match their
-- checksum ('ChecksumError') or can't be parsed ('FileFormatError').
openFromDisk :: HasFS IO h -> RunFsPaths -> IO (Run (FS.Handle h))
openFromDisk fs lsmRunFsPaths = do
expectedChecksums <-
expectValidFile (runChecksumsPath lsmRunFsPaths) . fromChecksumsFile
=<< CRC.readChecksumsFile fs (runChecksumsPath lsmRunFsPaths)

-- verify checksums of files we don't read yet
let paths = runFsPaths lsmRunFsPaths
checkCRC (forRunKOps expectedChecksums) (forRunKOps paths)
checkCRC (forRunBlob expectedChecksums) (forRunBlob paths)

-- read and try parsing files
lsmRunFilter <-
expectValidFile (forRunFilter paths) . bloomFilterFromSBS
=<< readCRC (forRunFilter expectedChecksums) (forRunFilter paths)
(lsmRunNumEntries, lsmRunIndex) <-
expectValidFile (forRunIndex paths) . Index.fromSBS
=<< readCRC (forRunIndex expectedChecksums) (forRunIndex paths)

lsmRunKOpsFile <- FS.hOpen fs (runKOpsPath lsmRunFsPaths) FS.ReadMode
lsmRunBlobFile <- FS.hOpen fs (runBlobPath lsmRunFsPaths) FS.ReadMode
lsmRunRefCount <- newIORef 1
return Run {..}
where
checkCRC :: CRC.CRC32C -> FS.FsPath -> IO ()
checkCRC expected fp = do
checksum <- CRC.readFileCRC32C fs fp
expectChecksum fp expected checksum

readCRC :: CRC.CRC32C -> FS.FsPath -> IO SBS.ShortByteString
readCRC expected fp = FS.withFile fs fp FS.ReadMode $ \h -> do
-- Read the whole file, which should usually return a single chunk.
-- In this case, 'toStrict' is O(1).
size <- FS.hGetSize fs h
(lbs, crc) <- CRC.hGetExactlyCRC32C fs h size CRC.initialCRC32C
expectChecksum fp expected crc
let bs = LBS.toStrict lbs
return $ case tryCheapToShort bs of
Right sbs -> sbs
Left _err -> SBS.toShort bs -- Should not happen, but just in case

expectChecksum fp expected checksum =
when (expected /= checksum) $
throwIO $ ChecksumError fp expected checksum

expectValidFile _ (Right x) = pure x
expectValidFile fp (Left err) = throwIO $ FileFormatError fp err
Loading

0 comments on commit 0ef734e

Please sign in to comment.