Skip to content

Commit

Permalink
use threads instead of async
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed Aug 18, 2024
1 parent 1d7c825 commit 7ea55ac
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
9 changes: 5 additions & 4 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ module Simplex.Messaging.Agent.Client
where

import Control.Applicative ((<|>))
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent (ThreadId, killThread)
import Control.Concurrent.Async (Async, uninterruptibleCancel)
import Control.Concurrent.STM (retry)
import Control.Exception (AsyncException (..), BlockedIndefinitelyOnSTM (..))
Expand Down Expand Up @@ -266,10 +266,11 @@ import Simplex.Messaging.Transport (SMPVersion, SessionId, THandleParams (sessio
import Simplex.Messaging.Transport.Client (TransportHost (..))
import Simplex.Messaging.Util
import Simplex.Messaging.Version
import System.Mem.Weak (Weak)
import System.Mem.Weak (Weak, deRefWeak)
import System.Random (randomR)
import UnliftIO (mapConcurrently, timeout)
import UnliftIO.Async (async)
import UnliftIO.Concurrent (forkIO, mkWeakThreadId)
import UnliftIO.Directory (doesFileExist, getTemporaryDirectory, removeFile)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
Expand Down Expand Up @@ -410,7 +411,7 @@ runWorkerAsync Worker {action} work =
(atomically . tryPutTMVar action) -- if it was running (or if start crashes), put it back and unlock (don't lock if it was just started)
(\a -> when (isNothing a) start) -- start worker if it's not running
where
start = atomically . putTMVar action . Just =<< async work
start = atomically . putTMVar action . Just =<< mkWeakThreadId =<< forkIO work

data AgentOperation = AONtfNetwork | AORcvNetwork | AOMsgDelivery | AOSndNetwork | AODatabase
deriving (Eq, Show)
Expand Down Expand Up @@ -905,7 +906,7 @@ closeAgentClient c = do
cancelWorker :: Worker -> IO ()
cancelWorker Worker {doWork, action} = do
noWorkToDo doWork
atomically (tryTakeTMVar action) >>= mapM_ (mapM_ uninterruptibleCancel)
atomically (tryTakeTMVar action) >>= mapM_ (mapM_ $ deRefWeak >=> mapM_ killThread)

waitUntilActive :: AgentClient -> IO ()
waitUntilActive AgentClient {active} = unlessM (readTVarIO active) $ atomically $ unlessM (readTVar active) retry
Expand Down
4 changes: 3 additions & 1 deletion src/Simplex/Messaging/Agent/Env/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ module Simplex.Messaging.Agent.Env.SQLite
)
where

import Control.Concurrent (ThreadId)
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
Expand Down Expand Up @@ -76,6 +77,7 @@ import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPVersion, TLS, Transport (..))
import Simplex.Messaging.Transport.Client (defaultSMPPort)
import Simplex.Messaging.Util (allFinally, catchAllErrors, catchAllErrors', tryAllErrors, tryAllErrors')
import System.Mem.Weak (Weak)
import System.Random (StdGen, newStdGen)
import UnliftIO (Async, SomeException)

Check warning on line 82 in src/Simplex/Messaging/Agent/Env/SQLite.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

The import of ‘Async’ from module ‘UnliftIO’ is redundant

Check warning on line 82 in src/Simplex/Messaging/Agent/Env/SQLite.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

The import of ‘Async’ from module ‘UnliftIO’ is redundant
import UnliftIO.STM
Expand Down Expand Up @@ -312,7 +314,7 @@ mkInternal = INTERNAL . show
data Worker = Worker
{ workerId :: Int,
doWork :: TMVar (),
action :: TMVar (Maybe (Async ())),
action :: TMVar (Maybe (Weak ThreadId)),
restarts :: TVar RestartCount
}

Expand Down

0 comments on commit 7ea55ac

Please sign in to comment.