Skip to content

Commit

Permalink
[work-pool] Add work-pool subproject
Browse files Browse the repository at this point in the history
* This is an extraction from my mutant-manager project.
  • Loading branch information
mbj committed Jun 3, 2024
1 parent e4ec728 commit e24cbf3
Show file tree
Hide file tree
Showing 9 changed files with 587 additions and 0 deletions.
3 changes: 3 additions & 0 deletions stack-9.4.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ flags:
development: true
tasty-mgolden:
development: true
work-pool:
development: true
xray:
development: true
packages:
Expand All @@ -97,4 +99,5 @@ packages:
- source-constraints
- stack-deploy
- tasty-mgolden
- work-pool
- xray
3 changes: 3 additions & 0 deletions stack-9.6.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ flags:
development: true
tasty-mgolden:
development: true
work-pool:
development: true
xray:
development: true
packages:
Expand All @@ -89,4 +91,5 @@ packages:
- source-constraints
- stack-deploy
- tasty-mgolden
- work-pool
- xray
5 changes: 5 additions & 0 deletions work-pool/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# mio-log

Minimal mio-based logger.

See Test module for usage example
25 changes: 25 additions & 0 deletions work-pool/package.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
_common/package: !include "../common/package.yaml"

name: work-pool
synopsis: Introspectable work pool
homepage: https://github.com/mbj/mhs#readme
github: mbj/mhs
version: 0.0.1

<<: *defaults

dependencies:
- base
- mprelude
- text
- unliftio

tests:
test:
<<: *test
dependencies:
- containers
- devtools
- tasty
- tasty-hunit
- work-pool
89 changes: 89 additions & 0 deletions work-pool/src/WorkPool.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
module WorkPool
( Config(..)
, Pool
, pushJob
, runPool
)
where

import MPrelude
import Prelude (succ)

import qualified UnliftIO.Async as UnliftIO
import qualified UnliftIO.STM as UnliftIO

-- Worker pool configuration
data Config a = Config
{ produceJobs :: forall m . MonadUnliftIO m => Pool a -> m ()
-- ^ function called from the main thread producing work, use `pushJob` to
-- create workable jobs.
, queueSize :: Natural
-- ^ maximum size of the jobs queued
, workerCount :: Natural
-- ^ number of workers to boot
, workerRun :: forall m . MonadUnliftIO m => Natural -> m (a -> m ())
-- ^ function called when a worker is booted, argument is the worker index,
-- returns an action to be called per job assigned to this worker.
}

-- Running pool
newtype Pool a = Pool
{ queue :: UnliftIO.TBQueue (Event a)
}

-- Add (dynamically) created a job to the pool
--
-- This function will block if the max queue size would be overflown.
-- As the workers create space in the queue this function will unblock.
pushJob :: MonadIO m => Pool a -> a -> m ()
pushJob Pool{..} item
= UnliftIO.atomically
$ UnliftIO.writeTBQueue queue (Job item)

-- Run worker pool with specified config
--
-- The function will return if either the `produceJobs` function returns
-- or a worker or the producer throws an error.
runPool :: forall a m . MonadUnliftIO m => Config a -> m ()
runPool Config{..} = do
pool <- newPool queueSize

boot pool $ \handles -> do
produceJobs pool
pushQuit pool
traverse_ UnliftIO.wait handles
where
boot :: Pool a -> ([UnliftIO.Async ()] -> m ()) -> m ()
boot pool withAsyncHandlers = go 0 []
where
go :: Natural -> [UnliftIO.Async ()] -> m ()
go index asyncHandlers =
if index == workerCount
then withAsyncHandlers asyncHandlers
else
UnliftIO.withAsync
(workLoop =<< workerRun index)
(\async -> go (succ index) (async:asyncHandlers))

workLoop action =
pullJob pool >>= maybe (pure ()) (\item -> action item >> workLoop action)

pushQuit :: MonadIO m => Pool a -> m ()
pushQuit Pool{..}
= UnliftIO.atomically
$ UnliftIO.writeTBQueue queue Quit

pullJob :: MonadIO m => Pool a -> m (Maybe a)
pullJob Pool{..}
= UnliftIO.atomically
$ UnliftIO.readTBQueue queue >>= \case
(Job item) -> pure $ pure item
Quit -> UnliftIO.writeTBQueue queue Quit $> empty

newPool :: MonadIO m => Natural -> m (Pool a)
newPool queueSize =
UnliftIO.atomically $ do
queue <- UnliftIO.newTBQueue queueSize
pure Pool{..}

data Event a = Quit | Job a
86 changes: 86 additions & 0 deletions work-pool/test/Test.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import Control.Arrow (left)
import Control.Monad (when)
import MPrelude
import Test.Tasty
import Test.Tasty.HUnit

import qualified Data.List as List
import qualified Data.Set as Set
import qualified Data.String as String
import qualified Devtools
import qualified UnliftIO.Concurrent as UnliftIO
import qualified UnliftIO.Exception as UnliftIO
import qualified WorkPool

main :: IO ()
main
= defaultMain
$ testGroup "work-pool"
[ Devtools.testTree $$(Devtools.readDependencies [Devtools.Target "work-pool"])
, mkSuccess 1 1
, mkSuccess 1 100
, mkSuccess 100 1
, mkSuccess 100 100
, mkSuccess 1000 1000
, producerFailure
, workerFailure
]
where
mkSuccess :: Natural -> Natural -> TestTree
mkSuccess queueSize workerCount =
testCase ("queue size: " <> show queueSize <> ", workerCount: " <> show workerCount) $ do
output <- UnliftIO.newMVar []
WorkPool.runPool $ config output
assertEqual "" (Set.fromList values) =<< UnliftIO.readMVar output
where
config output = WorkPool.Config{..}
where
workerRun :: MonadUnliftIO m => Natural -> m (Natural -> m ())
workerRun _index = pure $ \value -> do
void $ UnliftIO.modifyMVar output $ \set -> pure (Set.insert value set, ())

workerFailure :: TestTree
workerFailure = testCase "worker failure" $ do
result <- UnliftIO.try (WorkPool.runPool config)
assertEqual "" (Left "intentional error\n") (left formatException result)
where
config = WorkPool.Config{queueSize = 100, workerCount = 100, ..}

workerRun :: MonadIO m => Natural -> m (Natural -> m ())
workerRun _index = pure $ \value ->
when (value == 100) $ UnliftIO.throwString "intentional error"

producerFailure :: TestTree
producerFailure = testCase "producer failure" $ do
result <- UnliftIO.try (WorkPool.runPool config)
assertEqual "" (Left "intentional error\n") (left formatException result)
where
config
= WorkPool.Config
{ produceJobs = produceJobsFailing
, queueSize = 100
, workerCount = 100
, ..
}

produceJobsFailing :: MonadIO m => WorkPool.Pool Natural -> m ()
produceJobsFailing pool = do
WorkPool.pushJob pool 1
UnliftIO.throwString "intentional error"

workerRun :: MonadIO m => Natural -> m (Natural -> m ())
workerRun _index = pure . const $ pure ()

formatException :: UnliftIO.SomeException -> String
formatException
= String.unlines
. List.drop 2
. List.take 3
. String.lines
. UnliftIO.displayException

produceJobs :: MonadIO m => WorkPool.Pool Natural -> m ()
produceJobs pool = traverse_ (WorkPool.pushJob pool) values

values :: [Natural]
values = [0..1000]
122 changes: 122 additions & 0 deletions work-pool/test/stack-9.4-dependencies.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
Diff 0.4.1
OneTuple 0.4.1.1
QuickCheck 2.14.3
StateVar 1.2.2
aeson 2.1.2.1
alex 3.3.0.0
ansi-terminal 0.11.5
ansi-terminal-types 0.11.5
ansi-wl-pprint 0.6.9
array 0.5.4.0
assoc 1.1
async 2.2.5
attoparsec 0.14.4
base 4.17.2.1
base-compat 0.12.3
base-compat-batteries 0.12.3
base-orphans 0.9.1
bifunctors 5.5.15
binary 0.8.9.1
bitvec 1.1.5.0
bytestring 0.11.5.3
call-stack 0.4.0
clock 0.8.4
cmdargs 0.10.22
colour 2.3.6
comonad 5.0.8
conduit 1.3.5
containers 0.6.7
contravariant 1.5.5
cpphs 1.20.9.1
data-default 0.7.1.1
data-default-class 0.1.2.0
data-default-instances-containers 0.0.1
data-default-instances-dlist 0.0.1
data-default-instances-old-locale 0.0.1
data-fix 0.3.2
deepseq 1.4.8.0
deriving-aeson 0.2.9
devtools 0.2.0
directory 1.3.7.1
distributive 0.6.2.1
dlist 1.0
exceptions 0.10.5
extra 1.7.14
file-embed 0.0.15.0
filepath 1.4.2.2
filepattern 0.1.3
foldable1-classes-compat 0.1
generically 0.1.1
ghc 9.4.8
ghc-bignum 1.3
ghc-boot 9.4.8
ghc-boot-th 9.4.8
ghc-heap 9.4.8
ghc-lib-parser 9.4.8.20231111
ghc-lib-parser-ex 9.4.0.0
ghc-prim 0.9.1
ghci 9.4.8
happy 1.20.1.1
hashable 1.4.3.0
hlint 3.5
hpc 0.6.1.0
hscolour 1.24.4
indexed-traversable 0.1.3
indexed-traversable-instances 0.1.1.2
integer-logarithms 1.0.3.1
libyaml 0.1.2
mono-traversable 1.0.15.3
mprelude 0.2.3
mtl 2.2.2
old-locale 1.0.0.7
optparse-applicative 0.17.1.0
parsec 3.1.16.1
polyparse 1.13
pretty 1.1.3.6
primitive 0.8.0.0
process 1.6.18.0
random 1.2.1.1
refact 0.3.0.2
resourcet 1.2.6
rts 1.0.2
safe-exceptions 0.1.7.4
scientific 0.3.7.0
semialign 1.3
semigroupoids 5.3.7
source-constraints 0.0.5
split 0.2.3.5
splitmix 0.1.0.5
stm 2.5.1.0
strict 0.5
syb 0.7.2.4
tagged 0.8.7
tasty 1.4.3
tasty-expected-failure 0.12.3
tasty-hunit 0.10.1
tasty-mgolden 0.0.2
template-haskell 2.19.0.0
terminfo 0.4.1.5
text 2.0.2
text-short 0.1.5
th-abstraction 0.4.5.0
th-lift 0.8.4
these 1.2
time 1.12.2
time-compat 1.9.6.1
transformers 0.5.6.2
transformers-compat 0.7.2
typed-process 0.2.11.1
unbounded-delays 0.1.1.1
uniplate 1.6.13
unix 2.7.3
unliftio 0.2.25.0
unliftio-core 0.2.1.0
unordered-containers 0.2.19.1
utf8-string 1.0.2
uuid-types 1.0.5.1
vector 0.13.1.0
vector-algorithms 0.9.0.1
vector-stream 0.1.0.0
witherable 0.4.2
work-pool 0.0.1
yaml 0.11.11.2
Loading

0 comments on commit e24cbf3

Please sign in to comment.