Skip to content

Commit

Permalink
[work-pool] Add work-pool subproject
Browse files Browse the repository at this point in the history
* This is an extraction from my mutant-manager project.
  • Loading branch information
mbj committed Jun 2, 2024
1 parent e4ec728 commit 9270b5b
Show file tree
Hide file tree
Showing 9 changed files with 574 additions and 0 deletions.
3 changes: 3 additions & 0 deletions stack-9.4.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ flags:
development: true
tasty-mgolden:
development: true
work-pool:
development: true
xray:
development: true
packages:
Expand All @@ -97,4 +99,5 @@ packages:
- source-constraints
- stack-deploy
- tasty-mgolden
- work-pool
- xray
3 changes: 3 additions & 0 deletions stack-9.6.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ flags:
development: true
tasty-mgolden:
development: true
work-pool:
development: true
xray:
development: true
packages:
Expand All @@ -89,4 +91,5 @@ packages:
- source-constraints
- stack-deploy
- tasty-mgolden
- work-pool
- xray
3 changes: 3 additions & 0 deletions work-pool/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# work-pool

Minimal fixed max size, fixed worker count work pool with per worker boot and dynamic job production.
25 changes: 25 additions & 0 deletions work-pool/package.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
_common/package: !include "../common/package.yaml"

name: work-pool
synopsis: Simple work pool with max queue size, dynamic resupply and explicit worker boot.
homepage: https://github.com/mbj/mhs#readme
github: mbj/mhs
version: 0.0.1

<<: *defaults

dependencies:
- base
- mprelude
- text
- unliftio

tests:
test:
<<: *test
dependencies:
- containers
- devtools
- tasty
- tasty-hunit
- work-pool
78 changes: 78 additions & 0 deletions work-pool/src/WorkPool.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
module WorkPool
( Config(..)
, Pool
, pushJob
, run
)
where

import MPrelude
import Prelude (succ)

import qualified UnliftIO.Async as UnliftIO
import qualified UnliftIO.STM as UnliftIO

-- | Worker pool configuration
data Config a = Config
{ produceJobs :: forall m . MonadUnliftIO m => Pool a -> m ()
-- ^ function called from the main thread producing work, use `pushJob` to
-- create workable jobs.
, queueSize :: Natural
-- ^ maximum size of the jobs queued
, workerCount :: Natural
-- ^ number of workers to boot
, workerRun :: forall m . MonadUnliftIO m => Natural -> m (a -> m ())
-- ^ function called when a worker is booted, argument is the worker index,
-- returns an action to be called per job assigned to this worker.
}

-- Internal queue event, supplying job or quitting the worker
data Event a = Quit | Job a

-- | Running pool
newtype Pool a = Pool
{ queue :: UnliftIO.TBQueue (Event a)
}

-- | Add (dynamically) created a job to the pool
--
-- This function will block if the max queue size would be overflown.
-- As the workers create space in the queue this function will unblock.
pushJob :: MonadIO m => Pool a -> a -> m ()
pushJob Pool{..} item
= UnliftIO.atomically
$ UnliftIO.writeTBQueue queue (Job item)

-- | Run worker pool with specified config
--
-- The function will return if either:
-- * the `produceJobs` function returns
-- * a worker or throws an error
-- * the producer throws an error.
--
-- Care is taken to not leak threads via the use if `withAsync` from the `async` package.
run :: forall a m . MonadUnliftIO m => Config a -> m ()
run Config{..} = do
pool@Pool{..} <- UnliftIO.atomically $ do
queue <- UnliftIO.newTBQueue queueSize
pure Pool{..}

boot pool $ \handlers -> do
produceJobs pool -- supply jobs
UnliftIO.atomically $ UnliftIO.writeTBQueue queue Quit -- signal workers to gracefully exit
traverse_ UnliftIO.wait handlers -- wait for workers to gracefully exit
where
boot Pool{..} withAsyncHandlers = go 0 []
where
go index asyncHandlers =
if index == workerCount
then withAsyncHandlers asyncHandlers
else
UnliftIO.withAsync
(workLoop =<< workerRun index)
(\async -> go (succ index) (async:asyncHandlers))

workLoop action =
UnliftIO.atomically (UnliftIO.readTBQueue queue) >>= \case
(Job item) -> action item >> workLoop action
Quit -> UnliftIO.atomically $ UnliftIO.writeTBQueue queue Quit
86 changes: 86 additions & 0 deletions work-pool/test/Test.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import Control.Arrow (left)
import Control.Monad (when)
import MPrelude
import Test.Tasty
import Test.Tasty.HUnit

import qualified Data.List as List
import qualified Data.Set as Set
import qualified Data.String as String
import qualified Devtools
import qualified UnliftIO.Concurrent as UnliftIO
import qualified UnliftIO.Exception as UnliftIO
import qualified WorkPool

main :: IO ()
main
= defaultMain
$ testGroup "work-pool"
[ Devtools.testTree $$(Devtools.readDependencies [Devtools.Target "work-pool"])
, mkSuccess 1 1
, mkSuccess 1 100
, mkSuccess 100 1
, mkSuccess 100 100
, mkSuccess 1000 1000
, producerFailure
, workerFailure
]
where
mkSuccess :: Natural -> Natural -> TestTree
mkSuccess queueSize workerCount =
testCase ("queue size: " <> show queueSize <> ", workerCount: " <> show workerCount) $ do
output <- UnliftIO.newMVar []
WorkPool.run $ config output
assertEqual "" (Set.fromList values) =<< UnliftIO.readMVar output
where
config output = WorkPool.Config{..}
where
workerRun :: MonadUnliftIO m => Natural -> m (Natural -> m ())
workerRun _index = pure $ \value -> do
void $ UnliftIO.modifyMVar output $ \set -> pure (Set.insert value set, ())

workerFailure :: TestTree
workerFailure = testCase "worker failure" $ do
result <- UnliftIO.try (WorkPool.run config)
assertEqual "" (Left "intentional error\n") (left formatException result)
where
config = WorkPool.Config{queueSize = 100, workerCount = 100, ..}

workerRun :: MonadIO m => Natural -> m (Natural -> m ())
workerRun _index = pure $ \value ->
when (value == 100) $ UnliftIO.throwString "intentional error"

producerFailure :: TestTree
producerFailure = testCase "producer failure" $ do
result <- UnliftIO.try (WorkPool.run config)
assertEqual "" (Left "intentional error\n") (left formatException result)
where
config
= WorkPool.Config
{ produceJobs = produceJobsFailing
, queueSize = 100
, workerCount = 100
, ..
}

produceJobsFailing :: MonadIO m => WorkPool.Pool Natural -> m ()
produceJobsFailing pool = do
WorkPool.pushJob pool 1
UnliftIO.throwString "intentional error"

workerRun :: MonadIO m => Natural -> m (Natural -> m ())
workerRun _index = pure . const $ pure ()

formatException :: UnliftIO.SomeException -> String
formatException
= String.unlines
. List.drop 2
. List.take 3
. String.lines
. UnliftIO.displayException

produceJobs :: MonadIO m => WorkPool.Pool Natural -> m ()
produceJobs pool = traverse_ (WorkPool.pushJob pool) values

values :: [Natural]
values = [0..1000]
122 changes: 122 additions & 0 deletions work-pool/test/stack-9.4-dependencies.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
Diff 0.4.1
OneTuple 0.4.1.1
QuickCheck 2.14.3
StateVar 1.2.2
aeson 2.1.2.1
alex 3.3.0.0
ansi-terminal 0.11.5
ansi-terminal-types 0.11.5
ansi-wl-pprint 0.6.9
array 0.5.4.0
assoc 1.1
async 2.2.5
attoparsec 0.14.4
base 4.17.2.1
base-compat 0.12.3
base-compat-batteries 0.12.3
base-orphans 0.9.1
bifunctors 5.5.15
binary 0.8.9.1
bitvec 1.1.5.0
bytestring 0.11.5.3
call-stack 0.4.0
clock 0.8.4
cmdargs 0.10.22
colour 2.3.6
comonad 5.0.8
conduit 1.3.5
containers 0.6.7
contravariant 1.5.5
cpphs 1.20.9.1
data-default 0.7.1.1
data-default-class 0.1.2.0
data-default-instances-containers 0.0.1
data-default-instances-dlist 0.0.1
data-default-instances-old-locale 0.0.1
data-fix 0.3.2
deepseq 1.4.8.0
deriving-aeson 0.2.9
devtools 0.2.0
directory 1.3.7.1
distributive 0.6.2.1
dlist 1.0
exceptions 0.10.5
extra 1.7.14
file-embed 0.0.15.0
filepath 1.4.2.2
filepattern 0.1.3
foldable1-classes-compat 0.1
generically 0.1.1
ghc 9.4.8
ghc-bignum 1.3
ghc-boot 9.4.8
ghc-boot-th 9.4.8
ghc-heap 9.4.8
ghc-lib-parser 9.4.8.20231111
ghc-lib-parser-ex 9.4.0.0
ghc-prim 0.9.1
ghci 9.4.8
happy 1.20.1.1
hashable 1.4.3.0
hlint 3.5
hpc 0.6.1.0
hscolour 1.24.4
indexed-traversable 0.1.3
indexed-traversable-instances 0.1.1.2
integer-logarithms 1.0.3.1
libyaml 0.1.2
mono-traversable 1.0.15.3
mprelude 0.2.3
mtl 2.2.2
old-locale 1.0.0.7
optparse-applicative 0.17.1.0
parsec 3.1.16.1
polyparse 1.13
pretty 1.1.3.6
primitive 0.8.0.0
process 1.6.18.0
random 1.2.1.1
refact 0.3.0.2
resourcet 1.2.6
rts 1.0.2
safe-exceptions 0.1.7.4
scientific 0.3.7.0
semialign 1.3
semigroupoids 5.3.7
source-constraints 0.0.5
split 0.2.3.5
splitmix 0.1.0.5
stm 2.5.1.0
strict 0.5
syb 0.7.2.4
tagged 0.8.7
tasty 1.4.3
tasty-expected-failure 0.12.3
tasty-hunit 0.10.1
tasty-mgolden 0.0.2
template-haskell 2.19.0.0
terminfo 0.4.1.5
text 2.0.2
text-short 0.1.5
th-abstraction 0.4.5.0
th-lift 0.8.4
these 1.2
time 1.12.2
time-compat 1.9.6.1
transformers 0.5.6.2
transformers-compat 0.7.2
typed-process 0.2.11.1
unbounded-delays 0.1.1.1
uniplate 1.6.13
unix 2.7.3
unliftio 0.2.25.0
unliftio-core 0.2.1.0
unordered-containers 0.2.19.1
utf8-string 1.0.2
uuid-types 1.0.5.1
vector 0.13.1.0
vector-algorithms 0.9.0.1
vector-stream 0.1.0.0
witherable 0.4.2
work-pool 0.0.1
yaml 0.11.11.2
Loading

0 comments on commit 9270b5b

Please sign in to comment.