Skip to content

Commit

Permalink
Abstraction for blockio-style file operations
Browse files Browse the repository at this point in the history
Changes include:
* Add blockio-uring dependency
* Add an abstract API that captures the file
operations from `blockio-uring`.
* Implementations of this for three different
operating systems: Linux, MacOS, or Windows. The
Linux implementation uses `blockio-uring` and
benefits from async IO. MacOS and Windows use a
simple implementation that performs file I/O
sequentially instead of in asynchronous batches.
* Implement some basic tests for the API.
  • Loading branch information
jorisdral committed Mar 15, 2024
1 parent 0ef734e commit 083b2f3
Show file tree
Hide file tree
Showing 12 changed files with 511 additions and 7 deletions.
25 changes: 20 additions & 5 deletions .github/workflows/haskell.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ jobs:
cabal-version: ${{ matrix.cabal }}
cabal-update: true

- name: Install liburing (on Linux)
id: setup-liburing
if: matrix.os == 'ubuntu-latest'
run: |
sudo apt-get update
sudo apt-get -y install pkg-config
echo "PKG_CONFIG_PATH=$PKG_CONFIG_PATH"
mkdir tmp
cd tmp
git clone https://github.com/axboe/liburing.git
cd liburing
git checkout liburing-2.5
./configure --cc=gcc --cxx=g++
make -j$(nproc)
sudo make install
cd ../..
sudo rm -rf ./tmp
pkg-config --modversion liburing
- name: Configure the build
run: |
cabal configure --enable-tests --enable-benchmark --ghc-options="-Werror" --ghc-options="-fno-ignore-asserts"
Expand Down Expand Up @@ -131,11 +150,7 @@ jobs:
cache-name: cache-cabal-stylish
with:
path: ${{ steps.setup-haskell.outputs.cabal-store }}
key: ${{ runner.os }}-${{ matrix.ghc }}-${{ env.cache-name }}-${{ hashFiles('**/*.cabal') }}-${{ hashFiles('**/cabal.project') }}
restore-keys: |
${{ runner.os }}-${{ matrix.ghc }}-${{ env.cache-name }}-${{ hashFiles('**/*.cabal') }}-${{ hashFiles('**/cabal.project') }}
${{ runner.os }}-${{ matrix.ghc }}-${{ env.cache-name }}-${{ hashFiles('**/*.cabal') }}-
${{ runner.os }}-${{ matrix.ghc }}-${{ env.cache-name }}-
key: ${{ runner.os }}-${{ matrix.ghc }}-${{ env.cache-name }}

- name: Install stylish-haskell
run: cabal install --ignore-project stylish-haskell --constraint 'stylish-haskell == 0.14.6.0'
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ It has a number of custom features that are primarily tailored towards performan

## System requirements

This library only supports 64-bit, little-endian systems.
This library only supports 64-bit, little-endian systems.

Provide the -threaded flag to executables, test suites and benchmark suites if
you use this library on Linux systems.
15 changes: 15 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,18 @@ package lsm-tree
-- apply this to all components
-- relevant mostly only for development & testing
ghc-options: -fno-ignore-asserts

if(os(linux))
source-repository-package
type: git
location: https://github.com/well-typed/blockio-uring
tag: bbeb81130ec3eafd8ced81564cc8bd46d24aff08

-- fs-api with support for I/O using user-supplied buffers
source-repository-package
type: git
location: https://github.com/input-output-hk/fs-sim
tag: 6a4a456640dd1fed434ccb4cbb553482afe8e2d4
subdir:
fs-api
fs-sim
94 changes: 94 additions & 0 deletions fs-api-blockio/src-linux/System/FS/BlockIO/Async.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
{-# LANGUAGE NamedFieldPuns #-}

module System.FS.BlockIO.Async (
asyncHasBlockIO
) where

import Control.Exception
import qualified Control.Exception as E
import Control.Monad
import Foreign.C.Error
import GHC.IO.Exception
import GHC.Stack
import System.FS.API (BufferOffset (..), FsErrorPath, Handle (..),
HasFS (..), SomeHasFS (..), ioToFsError)
import qualified System.FS.BlockIO.API as API
import System.FS.BlockIO.API (IOOp (..), IOResult (..), ioopHandle)
import System.FS.IO (HandleIO)
import System.FS.IO.Internal.Handle
import qualified System.IO.BlockIO as I
import System.IO.Error (ioeSetErrorString, isResourceVanishedError)
import System.Posix.Types

-- | IO instantiation of 'HasBlockIO', using @blockio-uring@.
asyncHasBlockIO :: HasFS IO HandleIO -> Maybe API.IOCtxParams -> IO (API.HasBlockIO IO HandleIO)
asyncHasBlockIO hasFS ctxParams = do
ctx <- I.initIOCtx (maybe I.defaultIOCtxParams ctxParamsConv ctxParams)
pure $ API.HasBlockIO {
API.close = I.closeIOCtx ctx
, API.submitIO = submitIO hasFS ctx
}

ctxParamsConv :: API.IOCtxParams -> I.IOCtxParams
ctxParamsConv API.IOCtxParams{API.ioctxBatchSizeLimit, API.ioctxConcurrencyLimit} =
I.IOCtxParams {
I.ioctxBatchSizeLimit = ioctxBatchSizeLimit
, I.ioctxConcurrencyLimit = ioctxConcurrencyLimit
}

submitIO ::
HasFS IO HandleIO
-> I.IOCtx
-> [IOOp IO HandleIO]
-> IO [IOResult]
submitIO hasFS ioctx ioops = do
ioops' <- mapM ioopConv ioops
ress <- I.submitIO ioctx ioops' `catch` rethrowClosedError
zipWithM rethrowErrno ioops ress
where
rethrowClosedError :: IOError -> IO a
rethrowClosedError e@IOError{} =
-- Pattern matching on the error is brittle, because the structure of
-- the exception might change between versions of @blockio-uring@.
-- Nonetheless, it's better than nothing.
if isResourceVanishedError e && ioe_location e == "IOCtx closed"
then throwIO (API.mkClosedError (SomeHasFS hasFS) "submitIO")
else throwIO e

rethrowErrno ::
HasCallStack
=> IOOp IO HandleIO
-> I.IOResult
-> IO IOResult
rethrowErrno ioop res = do
case res of
I.IOResult c -> pure (IOResult c)
I.IOError e -> throwAsFsError e
where
throwAsFsError :: HasCallStack => Errno -> IO a
throwAsFsError errno = E.throwIO $ ioToFsError fep (fromErrno errno)

fep :: FsErrorPath
fep = mkFsErrorPath hasFS (handlePath (ioopHandle ioop))

fromErrno :: Errno -> IOError
fromErrno errno = ioeSetErrorString
(errnoToIOError "submitIO" errno Nothing Nothing)
("submitIO failed: " <> ioopType)

ioopType :: String
ioopType = case ioop of
IOOpRead{} -> "IOOpRead"
IOOpWrite{} -> "IOOpWrite"

ioopConv :: IOOp IO HandleIO -> IO (I.IOOp IO)
ioopConv (IOOpRead h off buf bufOff c) = handleFd h >>= \fd ->
pure (I.IOOpRead fd off buf (unBufferOffset bufOff) c)
ioopConv (IOOpWrite h off buf bufOff c) = handleFd h >>= \fd ->
pure (I.IOOpWrite fd off buf (unBufferOffset bufOff) c)

-- This only checks whether the handle is open when we convert to an Fd. After
-- that, the handle could be closed when we're still performing blockio
-- operations.
handleFd :: Handle HandleIO -> IO Fd
handleFd h = withOpenHandle "submitIO" (handleRaw h) pure
15 changes: 15 additions & 0 deletions fs-api-blockio/src-linux/System/FS/BlockIO/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module System.FS.BlockIO.Internal (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Async as I
import System.FS.IO (HandleIO)

ioHasBlockIO ::
HasFS IO HandleIO
-> HasBufFS IO HandleIO
-> Maybe IOCtxParams
-> IO (HasBlockIO IO HandleIO)
ioHasBlockIO hfs _bhfs = I.asyncHasBlockIO hfs
15 changes: 15 additions & 0 deletions fs-api-blockio/src-macos/System/FS/BlockIO/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module System.FS.BlockIO.Internal (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Serial as Serial
import System.FS.IO (HandleIO)

ioHasBlockIO ::
HasFS IO HandleIO
-> HasBufFS IO HandleIO
-> Maybe IOCtxParams
-> IO (HasBlockIO IO HandleIO)
ioHasBlockIO hasFS hasBufFS _ = Serial.serialHasBlockIO hasFS hasBufFS
15 changes: 15 additions & 0 deletions fs-api-blockio/src-windows/System/FS/BlockIO/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module System.FS.BlockIO.Internal (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Serial as Serial
import System.FS.IO (HandleIO)

ioHasBlockIO ::
HasFS IO HandleIO
-> HasBufFS IO HandleIO
-> Maybe IOCtxParams
-> IO (HasBlockIO IO HandleIO)
ioHasBlockIO hasFS hasBufFS _ = Serial.serialHasBlockIO hasFS hasBufFS
64 changes: 64 additions & 0 deletions fs-api-blockio/src/System/FS/BlockIO/API.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}

module System.FS.BlockIO.API (
HasBlockIO (..)
, IOCtxParams (..)
, mkClosedError
, IOOp (..)
, ioopHandle
, IOResult (..)
-- * Re-exports
, ByteCount
, FileOffset
) where

import Control.Monad.Primitive (PrimMonad (PrimState))
import Data.Primitive.ByteArray (MutableByteArray)
import GHC.IO.Exception (IOErrorType (ResourceVanished))
import System.FS.API
import System.IO.Error (ioeSetErrorString, mkIOError)
import System.Posix.Types (ByteCount, FileOffset)
import Util.CallStack

-- | Abstract interface for submitting large batches of I\/O operations.
data HasBlockIO m h = HasBlockIO {
-- | (Idempotent) close the interface.
--
-- Using 'submitIO' after 'close' should thrown an 'FsError' exception. See
-- 'mkClosedError'.
close :: HasCallStack => m ()
-- | Submit a batch of I\/O operations and wait for the result.
--
-- Results correspond to input 'IOOp's in a pair-wise manner, i.e., one can
-- match 'IOOp's with 'IOResult's by zipping the input and output list.
--
-- If any of the I\/O operations fails, an 'FsError' exception will be thrown.
, submitIO :: HasCallStack => [IOOp m h] -> m [IOResult]
}

-- | Concurrency parameters for initialising a 'HasBlockIO. Can be ignored by
-- serial implementations.
data IOCtxParams = IOCtxParams {
ioctxBatchSizeLimit :: !Int,
ioctxConcurrencyLimit :: !Int
}

mkClosedError :: HasCallStack => SomeHasFS m -> String -> FsError
mkClosedError (SomeHasFS hasFS) loc = ioToFsError (mkFsErrorPath hasFS (mkFsPath [])) ioerr
where ioerr =
ioeSetErrorString
(mkIOError ResourceVanished loc Nothing Nothing)
("HasBlockIO closed: " <> loc)


data IOOp m h =
IOOpRead !(Handle h) !FileOffset !(MutableByteArray (PrimState m)) !BufferOffset !ByteCount
| IOOpWrite !(Handle h) !FileOffset !(MutableByteArray (PrimState m)) !BufferOffset !ByteCount

ioopHandle :: IOOp m h -> Handle h
ioopHandle (IOOpRead h _ _ _ _) = h
ioopHandle (IOOpWrite h _ _ _ _) = h

-- | Number of read/written bytes.
newtype IOResult = IOResult ByteCount
16 changes: 16 additions & 0 deletions fs-api-blockio/src/System/FS/BlockIO/IO.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module System.FS.BlockIO.IO (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Internal as I
import System.FS.IO (HandleIO)

-- | Platform-dependent IO instantiation of 'HasBlockIO'.
ioHasBlockIO ::
HasFS IO HandleIO
-> HasBufFS IO HandleIO
-> Maybe IOCtxParams
-> IO (HasBlockIO IO HandleIO)
ioHasBlockIO = I.ioHasBlockIO
76 changes: 76 additions & 0 deletions fs-api-blockio/src/System/FS/BlockIO/Serial.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{-# LANGUAGE LambdaCase #-}

module System.FS.BlockIO.Serial (
serialHasBlockIO
) where

import Control.Concurrent.Class.MonadMVar
import Control.Monad.Class.MonadThrow
import System.FS.API
import qualified System.FS.BlockIO.API as API
import System.FS.BlockIO.API (IOOp (..), IOResult (..))

-- | IO instantiation of 'HasBlockIO', using serialised I\/O.
serialHasBlockIO ::
(MonadThrow m, MonadMVar m, Eq h)
=> HasFS m h
-> HasBufFS m h
-> m (API.HasBlockIO m h)
serialHasBlockIO hfs hbfs = do
ctx <- initIOCtx (SomeHasFS hfs)
pure $ API.HasBlockIO {
API.close = close ctx
, API.submitIO = submitIO hfs hbfs ctx
}

data IOCtx m = IOCtx { ctxFS :: SomeHasFS m, openVar :: MVar m CtxState }

data CtxState = Open Word | Closing Word | Closed

addSubmitter :: (MonadMVar m, MonadThrow m) => IOCtx m -> m ()
addSubmitter ctx = modifyMVar_ (openVar ctx) $ \case
Open n -> pure (Open (n+1))
Closing _ -> throwIO (API.mkClosedError (ctxFS ctx) "submitIO")
Closed -> throwIO (API.mkClosedError (ctxFS ctx) "submitIO")

removeSubmitter :: (MonadMVar m, MonadThrow m) => IOCtx m -> m ()
removeSubmitter ctx = modifyMVar_ (openVar ctx) $ \case
Open n -> pure (Open (n-1))
Closing n
| n - 1 == 0 -> pure Closed
| otherwise -> pure (Closing (n-1))
Closed -> throwIO (API.mkClosedError (ctxFS ctx) "submitIO")

initIOCtx :: MonadMVar m => SomeHasFS m -> m (IOCtx m)
initIOCtx someHasFS = IOCtx someHasFS <$> newMVar (Open 0)

close :: MonadMVar m => IOCtx m -> m ()
close ctx = modifyMVar_ (openVar ctx) $ \case
Open n -> pure (Closing n)
Closing n -> pure (Closing n)
Closed -> pure Closed

submitIO ::
(MonadMVar m, MonadThrow m)
=> HasFS m h
-> HasBufFS m h
-> IOCtx m
-> [IOOp m h]
-> m [IOResult]
submitIO hfs hbfs ctx ioops = do
addSubmitter ctx
ress <- mapM (ioop hfs hbfs) ioops
removeSubmitter ctx
pure ress

-- | Perform the IOOp using synchronous I\/O.
ioop ::
MonadThrow m
=> HasFS m h
-> HasBufFS m h
-> IOOp m h
-> m IOResult
ioop hfs hbfs (IOOpRead h off buf bufOff c) =
IOResult <$> hGetBufExactlyAt hfs hbfs h buf bufOff c (fromIntegral off)
ioop _hfs hbfs (IOOpWrite h off buf bufOff c) =
IOResult <$> hPutBufExactlyAt hbfs h buf bufOff c (fromIntegral off)
Loading

0 comments on commit 083b2f3

Please sign in to comment.