From 7ea55ac505ad97cd83f17ff473ce0e0ab18e0c2f Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 18 Aug 2024 08:29:30 +0100 Subject: [PATCH] use threads instead of async --- src/Simplex/Messaging/Agent/Client.hs | 9 +++++---- src/Simplex/Messaging/Agent/Env/SQLite.hs | 4 +++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 02b31cb959..fbdb535480 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -163,7 +163,7 @@ module Simplex.Messaging.Agent.Client where import Control.Applicative ((<|>)) -import Control.Concurrent (ThreadId, forkIO) +import Control.Concurrent (ThreadId, killThread) import Control.Concurrent.Async (Async, uninterruptibleCancel) import Control.Concurrent.STM (retry) import Control.Exception (AsyncException (..), BlockedIndefinitelyOnSTM (..)) @@ -266,10 +266,11 @@ import Simplex.Messaging.Transport (SMPVersion, SessionId, THandleParams (sessio import Simplex.Messaging.Transport.Client (TransportHost (..)) import Simplex.Messaging.Util import Simplex.Messaging.Version -import System.Mem.Weak (Weak) +import System.Mem.Weak (Weak, deRefWeak) import System.Random (randomR) import UnliftIO (mapConcurrently, timeout) import UnliftIO.Async (async) +import UnliftIO.Concurrent (forkIO, mkWeakThreadId) import UnliftIO.Directory (doesFileExist, getTemporaryDirectory, removeFile) import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -410,7 +411,7 @@ runWorkerAsync Worker {action} work = (atomically . tryPutTMVar action) -- if it was running (or if start crashes), put it back and unlock (don't lock if it was just started) (\a -> when (isNothing a) start) -- start worker if it's not running where - start = atomically . putTMVar action . Just =<< async work + start = atomically . putTMVar action . Just =<< mkWeakThreadId =<< forkIO work data AgentOperation = AONtfNetwork | AORcvNetwork | AOMsgDelivery | AOSndNetwork | AODatabase deriving (Eq, Show) @@ -905,7 +906,7 @@ closeAgentClient c = do cancelWorker :: Worker -> IO () cancelWorker Worker {doWork, action} = do noWorkToDo doWork - atomically (tryTakeTMVar action) >>= mapM_ (mapM_ uninterruptibleCancel) + atomically (tryTakeTMVar action) >>= mapM_ (mapM_ $ deRefWeak >=> mapM_ killThread) waitUntilActive :: AgentClient -> IO () waitUntilActive AgentClient {active} = unlessM (readTVarIO active) $ atomically $ unlessM (readTVar active) retry diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index f57cf91e93..4c9ff8bf36 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -41,6 +41,7 @@ module Simplex.Messaging.Agent.Env.SQLite ) where +import Control.Concurrent (ThreadId) import Control.Monad.Except import Control.Monad.IO.Unlift import Control.Monad.Reader @@ -76,6 +77,7 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPVersion, TLS, Transport (..)) import Simplex.Messaging.Transport.Client (defaultSMPPort) import Simplex.Messaging.Util (allFinally, catchAllErrors, catchAllErrors', tryAllErrors, tryAllErrors') +import System.Mem.Weak (Weak) import System.Random (StdGen, newStdGen) import UnliftIO (Async, SomeException) import UnliftIO.STM @@ -312,7 +314,7 @@ mkInternal = INTERNAL . show data Worker = Worker { workerId :: Int, doWork :: TMVar (), - action :: TMVar (Maybe (Async ())), + action :: TMVar (Maybe (Weak ThreadId)), restarts :: TVar RestartCount }