|
1 |
| --- Copyright 2022 Google LLC |
| 1 | +-- Copyright 2023 Google LLC |
2 | 2 | --
|
3 | 3 | -- Use of this source code is governed by a BSD-style
|
4 | 4 | -- license that can be found in the LICENSE file or at
|
5 | 5 | -- https://developers.google.com/open-source/licenses/bsd
|
6 | 6 |
|
7 |
| -module Actor (PChan, sendPChan, sendOnly, subChan, |
8 |
| - Actor, runActor, spawn, |
9 |
| - LogServerMsg (..), logServer) where |
| 7 | +{-# LANGUAGE UndecidableInstances #-} |
10 | 8 |
|
11 |
| -import Control.Concurrent (Chan, forkIO, newChan, readChan, ThreadId, writeChan) |
12 |
| -import Control.Monad.State.Strict |
| 9 | +module Actor ( |
| 10 | + ActorM, Actor (..), launchActor, send, selfMailbox, messageLoop, |
| 11 | + sliceMailbox, SubscribeMsg (..), IncServer, IncServerT, FileWatcher, |
| 12 | + StateServer, flushDiffs, handleSubscribeMsg, subscribe, subscribeIO, sendSync, |
| 13 | + runIncServerT, launchFileWatcher |
| 14 | + ) where |
13 | 15 |
|
14 |
| -import Util (onFst, onSnd) |
| 16 | +import Control.Concurrent |
| 17 | +import Control.Monad |
| 18 | +import Control.Monad.State.Strict hiding (get) |
| 19 | +import Control.Monad.Reader |
| 20 | +import qualified Data.ByteString as BS |
| 21 | +import Data.IORef |
| 22 | +import Data.Text.Encoding qualified as T |
| 23 | +import Data.Text (Text) |
| 24 | +import System.Directory (getModificationTime) |
| 25 | +import GHC.Generics |
15 | 26 |
|
16 |
| --- Micro-actors. |
| 27 | +import IncState |
| 28 | +import MonadUtil |
17 | 29 |
|
18 |
| --- In this model, an "actor" is just an IO computation (presumably |
19 |
| --- running on its own Haskell thread) that receives messages on a |
20 |
| --- Control.Concurrent.Chan channel. The idea is that the actor thread |
21 |
| --- only receives information (or synchronization) from other threads |
22 |
| --- through messages sent on that one channel, and no other thread |
23 |
| --- reads messages from that channel. |
| 30 | +-- === Actor implementation === |
24 | 31 |
|
25 |
| --- We start the actor with a two-way view of its input channel so it |
26 |
| --- can subscribe itself to message streams by passing (a send-only |
27 |
| --- view of) it to another actor. |
28 |
| -type Actor a = Chan a -> IO () |
| 32 | +newtype ActorM msg a = ActorM { runActorM :: ReaderT (Chan msg) IO a } |
| 33 | + deriving (Functor, Applicative, Monad, MonadIO) |
29 | 34 |
|
30 |
| --- We also define a send-only channel type, to help ourselves not |
31 |
| --- accidentally read from channels we aren't supposed to. |
32 |
| -newtype PChan a = PChan { sendPChan :: a -> IO () } |
| 35 | +newtype Mailbox a = Mailbox { sendToMailbox :: a -> IO () } |
33 | 36 |
|
34 |
| -sendOnly :: Chan a -> PChan a |
35 |
| -sendOnly chan = PChan $ \ !x -> writeChan chan x |
| 37 | +class (Show msg, MonadIO m) => Actor msg m | m -> msg where |
| 38 | + selfChan :: m (Chan msg) |
36 | 39 |
|
37 |
| -subChan :: (a -> b) -> PChan b -> PChan a |
38 |
| -subChan f chan = PChan (sendPChan chan . f) |
| 40 | +instance Show msg => Actor msg (ActorM msg) where |
| 41 | + selfChan = ActorM ask |
39 | 42 |
|
40 |
| --- Synchronously execute an actor. |
41 |
| -runActor :: Actor a -> IO () |
42 |
| -runActor actor = newChan >>= actor |
| 43 | +instance Actor msg m => Actor msg (ReaderT r m) where selfChan = lift $ selfChan |
| 44 | +instance Actor msg m => Actor msg (StateT s m) where selfChan = lift $ selfChan |
43 | 45 |
|
44 |
| --- Asynchronously launch an actor. Immediately returns permission to |
45 |
| --- kill that actor and to send it messages. |
46 |
| -spawn :: Actor a -> IO (ThreadId, PChan a) |
47 |
| -spawn actor = do |
| 46 | +send :: MonadIO m => Mailbox msg -> msg -> m () |
| 47 | +send chan msg = liftIO $ sendToMailbox chan msg |
| 48 | + |
| 49 | +selfMailbox :: Actor msg m => (a -> msg) -> m (Mailbox a) |
| 50 | +selfMailbox asSelfMessage = do |
| 51 | + chan <- selfChan |
| 52 | + return $ Mailbox \msg -> writeChan chan (asSelfMessage msg) |
| 53 | + |
| 54 | +launchActor :: MonadIO m => ActorM msg () -> m (Mailbox msg) |
| 55 | +launchActor m = liftIO do |
48 | 56 | chan <- newChan
|
49 |
| - tid <- forkIO $ actor chan |
50 |
| - return (tid, sendOnly chan) |
51 |
| - |
52 |
| --- A log server. Combines inputs monoidally and pushes incremental |
53 |
| --- updates to subscribers. |
54 |
| - |
55 |
| -data LogServerMsg a = Subscribe (PChan a) |
56 |
| - | Publish a |
57 |
| - |
58 |
| -logServer :: Monoid a => Actor (LogServerMsg a) |
59 |
| -logServer self = flip evalStateT (mempty, []) $ forever $ do |
60 |
| - msg <- liftIO $ readChan self |
61 |
| - case msg of |
62 |
| - Subscribe chan -> do |
63 |
| - curVal <- gets fst |
64 |
| - liftIO $ chan `sendPChan` curVal |
65 |
| - modify $ onSnd (chan:) |
66 |
| - Publish x -> do |
67 |
| - modify $ onFst (<> x) |
68 |
| - subscribers <- gets snd |
69 |
| - mapM_ (liftIO . (`sendPChan` x)) subscribers |
| 57 | + void $ forkIO $ runReaderT (runActorM m) chan |
| 58 | + return $ Mailbox \msg -> writeChan chan msg |
| 59 | + |
| 60 | +messageLoop :: Actor msg m => (msg -> m ()) -> m () |
| 61 | +messageLoop handleMessage = do |
| 62 | + forever do |
| 63 | + msg <- liftIO . readChan =<< selfChan |
| 64 | + handleMessage msg |
| 65 | + |
| 66 | +sliceMailbox :: (b -> a) -> Mailbox a -> Mailbox b |
| 67 | +sliceMailbox f (Mailbox sendMsg) = Mailbox $ sendMsg . f |
| 68 | + |
| 69 | +-- === Promises === |
| 70 | + |
| 71 | +newtype Promise a = Promise (MVar a) |
| 72 | +newtype PromiseSetter a = PromiseSetter (MVar a) |
| 73 | + |
| 74 | +newPromise :: MonadIO m => m (Promise a, PromiseSetter a) |
| 75 | +newPromise = do |
| 76 | + v <- liftIO $ newEmptyMVar |
| 77 | + return (Promise v, PromiseSetter v) |
| 78 | + |
| 79 | +waitForPromise :: MonadIO m => Promise a -> m a |
| 80 | +waitForPromise (Promise v) = liftIO $ readMVar v |
| 81 | + |
| 82 | +setPromise :: MonadIO m => PromiseSetter a -> a -> m () |
| 83 | +setPromise (PromiseSetter v) x = liftIO $ putMVar v x |
| 84 | + |
| 85 | +-- Message that expects a synchronous reponse |
| 86 | +data SyncMsg msg response = SyncMsg msg (PromiseSetter response) |
| 87 | + |
| 88 | +sendSync :: MonadIO m => Mailbox (SyncMsg msg response) -> msg -> m response |
| 89 | +sendSync mailbox msg = do |
| 90 | + (result, resultSetter) <- newPromise |
| 91 | + send mailbox (SyncMsg msg resultSetter) |
| 92 | + waitForPromise result |
| 93 | + |
| 94 | + |
| 95 | +-- === Diff server === |
| 96 | + |
| 97 | +data IncServerState s d = IncServerState |
| 98 | + { subscribers :: [Mailbox d] |
| 99 | + , bufferedUpdates :: d |
| 100 | + , curIncState :: s } |
| 101 | + deriving (Show, Generic) |
| 102 | + |
| 103 | +class (Monoid d, MonadIO m) => IncServer s d m | m -> s, m -> d where |
| 104 | + getIncServerStateRef :: m (IORef (IncServerState s d)) |
| 105 | + |
| 106 | +data SubscribeMsg s d = Subscribe (SyncMsg (Mailbox d) s) deriving (Show) |
| 107 | + |
| 108 | +getIncServerState :: IncServer s d m => m (IncServerState s d) |
| 109 | +getIncServerState = readRef =<< getIncServerStateRef |
| 110 | + |
| 111 | +updateIncServerState :: IncServer s d m => (IncServerState s d -> IncServerState s d) -> m () |
| 112 | +updateIncServerState f = do |
| 113 | + ref <- getIncServerStateRef |
| 114 | + prev <- readRef ref |
| 115 | + writeRef ref $ f prev |
| 116 | + |
| 117 | +handleSubscribeMsg :: IncServer s d m => SubscribeMsg s d -> m () |
| 118 | +handleSubscribeMsg (Subscribe (SyncMsg newSub response)) = do |
| 119 | + flushDiffs |
| 120 | + updateIncServerState \s -> s { subscribers = newSub : subscribers s } |
| 121 | + curState <- curIncState <$> getIncServerState |
| 122 | + setPromise response curState |
| 123 | + |
| 124 | +flushDiffs :: IncServer s d m => m () |
| 125 | +flushDiffs = do |
| 126 | + d <- bufferedUpdates <$> getIncServerState |
| 127 | + updateIncServerState \s -> s { bufferedUpdates = mempty } |
| 128 | + subs <- subscribers <$> getIncServerState |
| 129 | + -- TODO: consider testing for emptiness here |
| 130 | + forM_ subs \sub -> send sub d |
| 131 | + |
| 132 | +type StateServer s d = Mailbox (SubscribeMsg s d) |
| 133 | + |
| 134 | +subscribe :: Actor msg m => (d -> msg) -> StateServer s d -> m s |
| 135 | +subscribe inject server = do |
| 136 | + updateChannel <- selfMailbox inject |
| 137 | + sendSync (sliceMailbox Subscribe server) updateChannel |
| 138 | + |
| 139 | +subscribeIO :: StateServer s d -> IO (s, Chan d) |
| 140 | +subscribeIO server = do |
| 141 | + chan <- newChan |
| 142 | + let mailbox = Mailbox (writeChan chan) |
| 143 | + s <- sendSync (sliceMailbox Subscribe server) mailbox |
| 144 | + return (s, chan) |
| 145 | + |
| 146 | +newtype IncServerT s d m a = IncServerT { runIncServerT' :: ReaderT (Ref (IncServerState s d)) m a } |
| 147 | + deriving (Functor, Applicative, Monad, MonadIO, Actor msg, FreshNames name, MonadTrans) |
| 148 | + |
| 149 | +instance (MonadIO m, IncState s d) => IncServer s d (IncServerT s d m) where |
| 150 | + getIncServerStateRef = IncServerT ask |
| 151 | + |
| 152 | +instance (MonadIO m, IncState s d) => DefuncState d (IncServerT s d m) where |
| 153 | + update d = updateIncServerState \s -> s |
| 154 | + { bufferedUpdates = bufferedUpdates s <> d |
| 155 | + , curIncState = curIncState s `applyDiff` d} |
| 156 | + |
| 157 | +instance (MonadIO m, IncState s d) => LabelReader (SingletonLabel s) (IncServerT s d m) where |
| 158 | + getl It = curIncState <$> getIncServerState |
| 159 | + |
| 160 | +runIncServerT :: (MonadIO m, IncState s d) => s -> IncServerT s d m a -> m a |
| 161 | +runIncServerT s cont = do |
| 162 | + ref <- newRef $ IncServerState [] mempty s |
| 163 | + runReaderT (runIncServerT' cont) ref |
| 164 | + |
| 165 | +-- === Refs === |
| 166 | +-- Just a wrapper around IORef lifted to `MonadIO` |
| 167 | + |
| 168 | +type Ref = IORef |
| 169 | + |
| 170 | +newRef :: MonadIO m => a -> m (Ref a) |
| 171 | +newRef = liftIO . newIORef |
| 172 | + |
| 173 | +readRef :: MonadIO m => Ref a -> m a |
| 174 | +readRef = liftIO . readIORef |
| 175 | + |
| 176 | +writeRef :: MonadIO m => Ref a -> a -> m () |
| 177 | +writeRef ref val = liftIO $ writeIORef ref val |
| 178 | + |
| 179 | +-- === Clock === |
| 180 | + |
| 181 | +-- Provides a periodic clock signal. The time interval is in microseconds. |
| 182 | +launchClock :: MonadIO m => Int -> Mailbox () -> m () |
| 183 | +launchClock intervalMicroseconds mailbox = |
| 184 | + liftIO $ void $ forkIO $ forever do |
| 185 | + threadDelay intervalMicroseconds |
| 186 | + send mailbox () |
| 187 | + |
| 188 | +-- === File watcher === |
| 189 | + |
| 190 | +type SourceFileContents = Text |
| 191 | +type FileWatcher = StateServer SourceFileContents (Overwrite SourceFileContents) |
| 192 | + |
| 193 | +readFileContents :: MonadIO m => FilePath -> m Text |
| 194 | +readFileContents path = liftIO $ T.decodeUtf8 <$> BS.readFile path |
| 195 | + |
| 196 | +data FileWatcherMsg = |
| 197 | + ClockSignal_FW () |
| 198 | + | Subscribe_FW (SubscribeMsg Text (Overwrite Text)) |
| 199 | + deriving (Show) |
| 200 | + |
| 201 | +launchFileWatcher :: MonadIO m => FilePath -> m FileWatcher |
| 202 | +launchFileWatcher path = sliceMailbox Subscribe_FW <$> launchActor (fileWatcherImpl path) |
| 203 | + |
| 204 | +fileWatcherImpl :: FilePath -> ActorM FileWatcherMsg () |
| 205 | +fileWatcherImpl path = do |
| 206 | + initContents <- readFileContents path |
| 207 | + t0 <- liftIO $ getModificationTime path |
| 208 | + launchClock 100000 =<< selfMailbox ClockSignal_FW |
| 209 | + modTimeRef <- newRef t0 |
| 210 | + runIncServerT initContents $ messageLoop \case |
| 211 | + Subscribe_FW msg -> handleSubscribeMsg msg |
| 212 | + ClockSignal_FW () -> do |
| 213 | + tOld <- readRef modTimeRef |
| 214 | + tNew <- liftIO $ getModificationTime path |
| 215 | + when (tNew /= tOld) do |
| 216 | + newContents <- readFileContents path |
| 217 | + update $ OverwriteWith newContents |
| 218 | + flushDiffs |
| 219 | + writeRef modTimeRef tNew |
| 220 | + |
| 221 | +-- === instances === |
| 222 | + |
| 223 | +instance Show msg => Show (SyncMsg msg response) where |
| 224 | + show (SyncMsg msg _) = show msg |
| 225 | + |
| 226 | +instance Show (Mailbox a) where |
| 227 | + show _ = "mailbox" |
70 | 228 |
|
| 229 | +deriving instance Actor msg m => Actor msg (FreshNameT m) |
0 commit comments