diff --git a/rfcs/2023-09-28-journal-mode-wal.md b/rfcs/2023-09-28-journal-mode-wal.md new file mode 100644 index 000000000..58c435d90 --- /dev/null +++ b/rfcs/2023-09-28-journal-mode-wal.md @@ -0,0 +1,30 @@ +# Switching database to WAL mode + +## Problem + +1. Slow writes when sending messages to large groups. + +A possible solution is batching multiple writes into a single transaction, which is attempted for sending messages in #847 / #3067. The problem with that approach is that it substantially complicates the code and has to be done for other scenarios separately (e.g., broadcasting profile updates, which is even more complex as different messages have to be sent to different contacts, to account for preference overrides). + +2. Conflicts for the database access from multiple processes (iOS app and NSE). + +A possible solution is better coordination of access than currently implemented, but it is substantially more complex, particularly if additional extensions are added. + +## Solution + +A proposed solution is to increase page_size to 16kb (from 4kb) and switch to WAL mode. This should improve write performance and reduce conflicts. + +Problems with this soltion: +- old versions of the app won't be taking into account WAL file when exporting. Possible solutions are: + - make it non-reversible change (that is, without down migration). + - checkpoint and switch database to DELETE mode when exporting. +- windows closes the database connection when the app is stopped, so we can no longer do any operations prior to exporting without providing database key. Possible solutions are: + - always checkpoint and move to DELETE mode when stopping and move back to WAL mode when starting. + - what else? + +Switching to 16kb block also requires a process: +- set it first +- run VACUUM (this will change block size) +- only then the database can be switched to WAL mode + +If the database is already in WAL mode it needs to be switched to DELETE mode before block size change will happen on VACUUM. diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index ae4819762..ce05a408b 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -40,6 +40,7 @@ module Simplex.Messaging.Agent SubscriptionsInfo (..), getSMPAgentClient, disconnectAgentClient, + disposeAgentClient, resumeAgentClient, withConnLock, withInvLock, @@ -188,6 +189,11 @@ disconnectAgentClient c@AgentClient {agentEnv = Env {ntfSupervisor = ns, xftpAge closeXFTPAgent xa logConnection c False +disposeAgentClient :: MonadUnliftIO m => AgentClient -> m () +disposeAgentClient c@AgentClient {agentEnv = Env {store}} = do + disconnectAgentClient c + liftIO $ closeSQLiteStore store + resumeAgentClient :: MonadIO m => AgentClient -> m () resumeAgentClient c = atomically $ writeTVar (active c) True diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 72d4c261a..cb7f88561 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -28,10 +28,17 @@ module Simplex.Messaging.Agent.Store.SQLite MigrationConfirmation (..), MigrationError (..), UpMigration (..), + SQLiteJournalMode (..), createSQLiteStore, connectSQLiteStore, closeSQLiteStore, openSQLiteStore, + checkpointSQLiteStore, + backupSQLiteStore, + restoreSQLiteStore, + removeSQLiteStore, + setSQLiteJournalMode, + getSQLiteJournalMode, sqlString, execSQL, upMigration, -- used in tests @@ -219,12 +226,14 @@ import Control.Monad import Control.Monad.Except import Control.Monad.IO.Class import Crypto.Random (ChaChaDRG) +import Data.Aeson (FromJSON (..), ToJSON (..)) import qualified Data.Aeson.TH as J import qualified Data.Attoparsec.ByteString.Char8 as A import Data.Bifunctor (second) import Data.ByteString (ByteString) import qualified Data.ByteString.Base64.URL as U import Data.Char (toLower) +import Data.Either (fromRight) import Data.Functor (($>)) import Data.IORef import Data.Int (Int64) @@ -268,9 +277,9 @@ import Simplex.Messaging.Parsers (blobFieldParser, defaultJSON, dropPrefix, from import Simplex.Messaging.Protocol import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Transport.Client (TransportHost) -import Simplex.Messaging.Util (bshow, eitherToMaybe, groupOn, ifM, ($>>=), (<$$>)) +import Simplex.Messaging.Util (bshow, eitherToMaybe, groupOn, ifM, unlessM, whenM, ($>>=), (<$$>)) import Simplex.Messaging.Version -import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist) +import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist, removeFile) import System.Exit (exitFailure) import System.FilePath (takeDirectory) import System.IO (hFlush, stdout) @@ -317,6 +326,33 @@ instance StrEncoding MigrationConfirmation where "error" -> pure MCError _ -> fail "invalid MigrationConfirmation" +data SQLiteJournalMode = SQLModeWAL | SQLModeDelete | SQLMode Text + deriving (Show) + +instance StrEncoding SQLiteJournalMode where + strEncode = \case + SQLModeWAL -> "wal" + SQLModeDelete -> "delete" + SQLMode s -> encodeUtf8 s + strP = do + s <- A.takeTill (== ' ') + pure $ case s of + "wal" -> SQLModeWAL + "WAL" -> SQLModeWAL + "delete" -> SQLModeDelete + "DELETE" -> SQLModeDelete + _ -> SQLMode $ decodeLatin1 s + +decodeJournalMode :: Text -> SQLiteJournalMode +decodeJournalMode s = fromRight (SQLMode s) $ strDecode $ encodeUtf8 s + +instance ToJSON SQLiteJournalMode where + toJSON = strToJSON + toEncoding = strToJEncoding + +instance FromJSON SQLiteJournalMode where + parseJSON = strParseJSON "SQLiteJournalMode" + createSQLiteStore :: FilePath -> String -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError SQLiteStore) createSQLiteStore dbFilePath dbKey migrations confirmMigrations = do let dbDir = takeDirectory dbFilePath @@ -354,11 +390,44 @@ migrateSchema st migrations confirmMigrations = do where confirm err = confirmOrExit $ migrationErrorDescription err run ms = do - let f = dbFilePath st - copyFile f (f <> ".bak") + withConnection st $ \db -> do + execSQL_ db "PRAGMA wal_checkpoint(TRUNCATE);" + backupSQLiteStore st Migrations.run st ms pure $ Right () +-- names are chosen to make .bak file a valid database with WAL files +backupSQLiteStore :: SQLiteStore -> IO () +backupSQLiteStore st = do + let f = dbFilePath st + fBak = f <> ".bak" + copyWhenExists f fBak + copyWhenExists (f <> "-wal") (fBak <> "-wal") + copyWhenExists (f <> "-shm") (fBak <> "-shm") + +restoreSQLiteStore :: SQLiteStore -> IO () +restoreSQLiteStore st = do + let f = dbFilePath st + fBak = f <> ".bak" + copyWhenExists fBak f + copyWhenExists (fBak <> "-wal") (f <> "-wal") + copyWhenExists (fBak <> "-shm") (f <> "-shm") + +copyWhenExists :: FilePath -> FilePath -> IO () +copyWhenExists f f' = whenM (doesFileExist f) $ copyFile f f' + +removeSQLiteStore :: SQLiteStore -> IO () +removeSQLiteStore st = do + let f = dbFilePath st + removeDB f + removeDB (f <> ".bak") + where + removeDB f = do + remove f + remove (f <> "-wal") + remove (f <> "-shm") + remove f = whenM (doesFileExist f) $ removeFile f + confirmOrExit :: String -> IO () confirmOrExit s = do putStrLn s @@ -380,27 +449,23 @@ connectSQLiteStore dbFilePath dbKey = do connectDB :: FilePath -> String -> IO DB.Connection connectDB path key = do db <- DB.open path - prepare db `onException` DB.close db + execSQL_ db openSQL `onException` DB.close db -- _printPragmas db path pure db where - prepare db = do - let exec = SQLite3.exec $ SQL.connectionHandle $ DB.conn db - unless (null key) . exec $ "PRAGMA key = " <> sqlString key <> ";" - exec . fromQuery $ - [sql| - PRAGMA busy_timeout = 100; - PRAGMA foreign_keys = ON; - -- PRAGMA trusted_schema = OFF; - PRAGMA secure_delete = ON; - PRAGMA auto_vacuum = FULL; - |] + openSQL = + (if null key then "" else "PRAGMA key = " <> sqlString key <> ";\n") + <> "PRAGMA journal_mode = WAL;\n\ + \PRAGMA busy_timeout = 100;\n\ + \PRAGMA foreign_keys = ON;\n\ + \PRAGMA trusted_schema = OFF;\n\ + \PRAGMA secure_delete = ON;\n" closeSQLiteStore :: SQLiteStore -> IO () closeSQLiteStore st@SQLiteStore {dbClosed} = ifM (readTVarIO dbClosed) (putStrLn "closeSQLiteStore: already closed") $ - withConnection st $ \conn -> do - DB.close conn + withConnection st $ \db -> do + DB.close db atomically $ writeTVar dbClosed True openSQLiteStore :: SQLiteStore -> String -> IO () @@ -417,6 +482,26 @@ openSQLiteStore SQLiteStore {dbConnection, dbFilePath, dbClosed} key = putTMVar dbConnection DB.Connection {conn, slow} writeTVar dbClosed False +checkpointSQLiteStore :: SQLiteStore -> IO () +checkpointSQLiteStore st = + unlessM (readTVarIO $ dbClosed st) $ + withConnection st (`execSQL_` "PRAGMA wal_checkpoint(TRUNCATE);") + +setSQLiteJournalMode :: SQLiteStore -> SQLiteJournalMode -> IO () +setSQLiteJournalMode st mode = + withConnection st (`execSQL_` q) + where + q = case mode of + SQLModeWAL -> "PRAGMA journal_mode = WAL;" + SQLModeDelete -> "PRAGMA journal_mode = DELETE;" + SQLMode s -> "PRAGMA journal_mode = " <> s <> ";" + +getSQLiteJournalMode :: SQLiteStore -> IO SQLiteJournalMode +getSQLiteJournalMode st = + withConnection st $ \db -> do + [Only mode] <- DB.query_ db "PRAGMA journal_mode;" :: IO [Only Text] + pure $ decodeJournalMode mode + sqlString :: String -> Text sqlString s = quote <> T.replace quote "''" (T.pack s) <> quote where @@ -434,6 +519,9 @@ sqlString s = quote <> T.replace quote "''" (T.pack s) <> quote -- auto_vacuum <- DB.query_ db "PRAGMA auto_vacuum;" :: IO [[Int]] -- print $ path <> " auto_vacuum: " <> show auto_vacuum +execSQL_ :: DB.Connection -> Text -> IO () +execSQL_ = SQLite3.exec . SQL.connectionHandle . DB.conn + execSQL :: DB.Connection -> Text -> IO [Text] execSQL db query = do rs <- newIORef [] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index 8ce7d6514..0fc35c270 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -118,7 +118,7 @@ getCurrent db = map toMigration <$> DB.query_ db "SELECT name, down FROM migrati run :: SQLiteStore -> MigrationsToRun -> IO () run st = \case MTRUp [] -> pure () - MTRUp ms -> mapM_ runUp ms >> withConnection' st (`execSQL` "VACUUM;") + MTRUp ms -> mapM_ runUp ms >> withConnection' st (`execSQL` "VACUUM; PRAGMA wal_checkpoint(TRUNCATE);") MTRDown ms -> mapM_ runDown $ reverse ms MTRNone -> pure () where diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 8c37579fc..92bc2d5bc 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -49,10 +49,10 @@ import Data.Type.Equality import SMPAgentClient import SMPClient (cfg, testPort, testPort2, testStoreLogFile2, withSmpServer, withSmpServerConfigOn, withSmpServerOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn) import Simplex.Messaging.Agent -import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..)) +import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..), agentClientStore, ) import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore) import Simplex.Messaging.Agent.Protocol as Agent -import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..)) +import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), checkpointSQLiteStore) import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), TransportSessionMode (TSMEntity, TSMUser), defaultClientConfig) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String @@ -358,8 +358,8 @@ withAgentClientsCfg2 aCfg bCfg runTest = do a <- getSMPAgentClient' aCfg initAgentServers testDB b <- getSMPAgentClient' bCfg initAgentServers testDB2 runTest a b - disconnectAgentClient a - disconnectAgentClient b + disposeAgentClient a + disposeAgentClient b withAgentClients2 :: (AgentClient -> AgentClient -> IO ()) -> IO () withAgentClients2 = withAgentClientsCfg2 agentCfg agentCfg @@ -448,7 +448,7 @@ testAsyncInitiatingOffline :: HasCallStack => IO () testAsyncInitiatingOffline = withAgentClients2 $ \alice bob -> runRight_ $ do (bobId, cReq) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe - disconnectAgentClient alice + disposeAgentClient alice aliceId <- joinConnection bob 1 True cReq "bob's connInfo" SMSubscribe alice' <- liftIO $ getSMPAgentClient' agentCfg initAgentServers testDB subscribeConnection alice' bobId @@ -464,7 +464,7 @@ testAsyncJoiningOfflineBeforeActivation = withAgentClients2 $ \alice bob -> runRight_ $ do (bobId, qInfo) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe aliceId <- joinConnection bob 1 True qInfo "bob's connInfo" SMSubscribe - disconnectAgentClient bob + disposeAgentClient bob ("", _, CONF confId _ "bob's connInfo") <- get alice allowConnection alice bobId confId "alice's connInfo" bob' <- liftIO $ getSMPAgentClient' agentCfg initAgentServers testDB2 @@ -478,9 +478,9 @@ testAsyncBothOffline :: HasCallStack => IO () testAsyncBothOffline = withAgentClients2 $ \alice bob -> runRight_ $ do (bobId, cReq) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe - disconnectAgentClient alice + disposeAgentClient alice aliceId <- joinConnection bob 1 True cReq "bob's connInfo" SMSubscribe - disconnectAgentClient bob + disposeAgentClient bob alice' <- liftIO $ getSMPAgentClient' agentCfg initAgentServers testDB subscribeConnection alice' bobId ("", _, CONF confId _ "bob's connInfo") <- get alice' @@ -524,7 +524,7 @@ testAsyncHelloTimeout = do agentCfgV1 = agentCfg {smpAgentVRange = vr11, smpClientVRange = vr11, e2eEncryptVRange = vr11, smpCfg = smpCfgV1} withAgentClientsCfg2 agentCfgV1 agentCfg {helloTimeout = 1} $ \alice bob -> runRight_ $ do (_, cReq) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe - disconnectAgentClient alice + disposeAgentClient alice aliceId <- joinConnection bob 1 True cReq "bob's connInfo" SMSubscribe get bob ##> ("", aliceId, ERR $ CONN NOT_ACCEPTED) @@ -550,7 +550,7 @@ testAllowConnectionClientRestart t = do pure () threadDelay 100000 -- give time to enqueue confirmation (enqueueConfirmation) - disconnectAgentClient alice + disposeAgentClient alice alice2 <- getSMPAgentClient' agentCfg initAgentServers testDB @@ -565,8 +565,8 @@ testAllowConnectionClientRestart t = do get bob ##> ("", aliceId, CON) exchangeGreetingsMsgId 4 alice2 bobId bob aliceId - disconnectAgentClient alice2 - disconnectAgentClient bob + disposeAgentClient alice2 + disposeAgentClient bob testIncreaseConnAgentVersion :: HasCallStack => ATransport -> IO () testIncreaseConnAgentVersion t = do @@ -582,7 +582,7 @@ testIncreaseConnAgentVersion t = do -- version doesn't increase if incompatible - disconnectAgentClient alice + disposeAgentClient alice alice2 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB runRight_ $ do @@ -593,7 +593,7 @@ testIncreaseConnAgentVersion t = do -- version increases if compatible - disconnectAgentClient bob + disposeAgentClient bob bob2 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB2 runRight_ $ do @@ -604,7 +604,7 @@ testIncreaseConnAgentVersion t = do -- version doesn't decrease, even if incompatible - disconnectAgentClient alice2 + disposeAgentClient alice2 alice3 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 2 2} initAgentServers testDB runRight_ $ do @@ -613,7 +613,7 @@ testIncreaseConnAgentVersion t = do checkVersion alice3 bobId 3 checkVersion bob2 aliceId 3 - disconnectAgentClient bob2 + disposeAgentClient bob2 bob3 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 1} initAgentServers testDB2 runRight_ $ do @@ -621,8 +621,8 @@ testIncreaseConnAgentVersion t = do exchangeGreetingsMsgId 12 alice3 bobId bob3 aliceId checkVersion alice3 bobId 3 checkVersion bob3 aliceId 3 - disconnectAgentClient alice3 - disconnectAgentClient bob3 + disposeAgentClient alice3 + disposeAgentClient bob3 checkVersion :: AgentClient -> ConnId -> Version -> ExceptT AgentErrorType IO () checkVersion c connId v = do @@ -643,9 +643,9 @@ testIncreaseConnAgentVersionMaxCompatible t = do -- version increases to max compatible - disconnectAgentClient alice + disposeAgentClient alice alice2 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB - disconnectAgentClient bob + disposeAgentClient bob bob2 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 4} initAgentServers testDB2 runRight_ $ do @@ -654,8 +654,8 @@ testIncreaseConnAgentVersionMaxCompatible t = do exchangeGreetingsMsgId 6 alice2 bobId bob2 aliceId checkVersion alice2 bobId 3 checkVersion bob2 aliceId 3 - disconnectAgentClient alice2 - disconnectAgentClient bob2 + disposeAgentClient alice2 + disposeAgentClient bob2 testIncreaseConnAgentVersionStartDifferentVersion :: HasCallStack => ATransport -> IO () testIncreaseConnAgentVersionStartDifferentVersion t = do @@ -671,7 +671,7 @@ testIncreaseConnAgentVersionStartDifferentVersion t = do -- version increases to max compatible - disconnectAgentClient alice + disposeAgentClient alice alice2 <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB runRight_ $ do @@ -679,8 +679,8 @@ testIncreaseConnAgentVersionStartDifferentVersion t = do exchangeGreetingsMsgId 6 alice2 bobId bob aliceId checkVersion alice2 bobId 3 checkVersion bob aliceId 3 - disconnectAgentClient alice2 - disconnectAgentClient bob + disposeAgentClient alice2 + disposeAgentClient bob testDeliverClientRestart :: HasCallStack => ATransport -> IO () testDeliverClientRestart t = do @@ -698,7 +698,7 @@ testDeliverClientRestart t = do 6 <- runRight $ sendMessage bob aliceId SMP.noMsgFlags "hello" - disconnectAgentClient bob + disposeAgentClient bob bob2 <- getSMPAgentClient' agentCfg initAgentServers testDB2 @@ -710,8 +710,8 @@ testDeliverClientRestart t = do get bob2 ##> ("", aliceId, SENT 6) get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False - disconnectAgentClient alice - disconnectAgentClient bob2 + disposeAgentClient alice + disposeAgentClient bob2 testDuplicateMessage :: HasCallStack => ATransport -> IO () testDuplicateMessage t = do @@ -723,7 +723,7 @@ testDuplicateMessage t = do 4 <- sendMessage alice bobId SMP.noMsgFlags "hello" get alice ##> ("", bobId, SENT 4) get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False - disconnectAgentClient bob + disposeAgentClient bob -- if the agent user did not send ACK, the message will be delivered again bob1 <- getSMPAgentClient' agentCfg initAgentServers testDB2 @@ -744,8 +744,8 @@ testDuplicateMessage t = do threadDelay 200000 Left (BROKER _ TIMEOUT) <- runExceptT $ ackMessage bob1 aliceId 5 Nothing - disconnectAgentClient alice - disconnectAgentClient bob1 + disposeAgentClient alice + disposeAgentClient bob1 alice2 <- getSMPAgentClient' agentCfg initAgentServers testDB bob2 <- getSMPAgentClient' agentCfg initAgentServers testDB2 @@ -760,8 +760,8 @@ testDuplicateMessage t = do 6 <- sendMessage alice2 bobId SMP.noMsgFlags "hello 3" get alice2 ##> ("", bobId, SENT 6) get bob2 =##> \case ("", c, Msg "hello 3") -> c == aliceId; _ -> False - disconnectAgentClient alice2 - disconnectAgentClient bob2 + disposeAgentClient alice2 + disposeAgentClient bob2 testSkippedMessages :: HasCallStack => ATransport -> IO () testSkippedMessages t = do @@ -775,7 +775,7 @@ testSkippedMessages t = do get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False ackMessage bob aliceId 4 Nothing - disconnectAgentClient bob + disposeAgentClient bob runRight_ $ do 5 <- sendMessage alice bobId SMP.noMsgFlags "hello 2" @@ -790,7 +790,7 @@ testSkippedMessages t = do nGet alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False threadDelay 200000 - disconnectAgentClient alice + disposeAgentClient alice alice2 <- getSMPAgentClient' agentCfg initAgentServers testDB bob2 <- getSMPAgentClient' agentCfg initAgentServers testDB2 @@ -809,8 +809,8 @@ testSkippedMessages t = do get alice2 ##> ("", bobId, SENT 9) get bob2 =##> \case ("", c, Msg "hello 6") -> c == aliceId; _ -> False ackMessage bob2 aliceId 6 Nothing - disconnectAgentClient alice2 - disconnectAgentClient bob2 + disposeAgentClient alice2 + disposeAgentClient bob2 testRatchetSync :: HasCallStack => ATransport -> IO () testRatchetSync t = withAgentClients2 $ \alice bob -> @@ -839,7 +839,9 @@ setupDesynchronizedRatchet alice bob = do get alice =##> \case ("", c, Msg "hello 2") -> c == bobId; _ -> False ackMessage alice bobId 5 Nothing - liftIO $ copyFile testDB2 (testDB2 <> ".bak") + liftIO $ do + checkpointSQLiteStore $ agentClientStore bob + copyFile testDB2 (testDB2 <> ".bak") 6 <- sendMessage alice bobId SMP.noMsgFlags "hello 3" get alice ##> ("", bobId, SENT 6) @@ -851,7 +853,7 @@ setupDesynchronizedRatchet alice bob = do get alice =##> \case ("", c, Msg "hello 4") -> c == bobId; _ -> False ackMessage alice bobId 7 Nothing - disconnectAgentClient bob + disposeAgentClient bob -- importing database backup after progressing ratchet de-synchronizes ratchet liftIO $ renameFile (testDB2 <> ".bak") testDB2 @@ -924,7 +926,7 @@ testRatchetSyncClientRestart t = do ("", "", DOWN _ _) <- nGet bob2 ConnectionStats {ratchetSyncState} <- runRight $ synchronizeRatchet bob2 aliceId False liftIO $ ratchetSyncState `shouldBe` RSStarted - disconnectAgentClient bob2 + disposeAgentClient bob2 bob3 <- getSMPAgentClient' agentCfg initAgentServers testDB2 withSmpServerStoreMsgLogOn t testPort $ \_ -> do runRight_ $ do @@ -935,9 +937,9 @@ testRatchetSyncClientRestart t = do get alice =##> ratchetSyncP bobId RSOk get bob3 =##> ratchetSyncP aliceId RSOk exchangeGreetingsMsgIds alice bobId 12 bob3 aliceId 9 - disconnectAgentClient alice - disconnectAgentClient bob - disconnectAgentClient bob3 + disposeAgentClient alice + disposeAgentClient bob + disposeAgentClient bob3 testRatchetSyncSuspendForeground :: HasCallStack => ATransport -> IO () testRatchetSyncSuspendForeground t = do @@ -969,9 +971,9 @@ testRatchetSyncSuspendForeground t = do get alice =##> ratchetSyncP bobId RSOk get bob2 =##> ratchetSyncP aliceId RSOk exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 - disconnectAgentClient alice - disconnectAgentClient bob - disconnectAgentClient bob2 + disposeAgentClient alice + disposeAgentClient bob + disposeAgentClient bob2 testRatchetSyncSimultaneous :: HasCallStack => ATransport -> IO () testRatchetSyncSimultaneous t = do @@ -1002,9 +1004,9 @@ testRatchetSyncSimultaneous t = do get alice =##> ratchetSyncP bobId RSOk get bob2 =##> ratchetSyncP aliceId RSOk exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 - disconnectAgentClient alice - disconnectAgentClient bob - disconnectAgentClient bob2 + disposeAgentClient alice + disposeAgentClient bob + disposeAgentClient bob2 testOnlyCreatePull :: IO () testOnlyCreatePull = withAgentClients2 $ \alice bob -> runRight_ $ do @@ -1061,7 +1063,7 @@ testInactiveClientDisconnected t = do runRight_ $ do (connId, _cReq) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe nGet alice ##> ("", "", DOWN testSMPServer [connId]) - disconnectAgentClient alice + disposeAgentClient alice testActiveClientNotDisconnected :: ATransport -> IO () testActiveClientNotDisconnected t = do @@ -1072,7 +1074,7 @@ testActiveClientNotDisconnected t = do runRight_ $ do (connId, _cReq) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe keepSubscribing alice connId ts - disconnectAgentClient alice + disposeAgentClient alice where keepSubscribing :: AgentClient -> ConnId -> SystemTime -> ExceptT AgentErrorType IO () keepSubscribing alice connId ts = do @@ -1191,8 +1193,8 @@ testBatchedSubscriptions nCreate nDel t = do delete b aIds' deleteFail a bIds' deleteFail b aIds' - disconnectAgentClient a - disconnectAgentClient b + disposeAgentClient a + disposeAgentClient b where subscribe :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO () subscribe c cs = do @@ -1271,14 +1273,14 @@ testAsyncCommandsRestore t = do alice <- getSMPAgentClient' agentCfg initAgentServers testDB bobId <- runRight $ createConnectionAsync alice 1 "1" True SCMInvitation SMSubscribe liftIO $ noMessages alice "alice doesn't receive INV because server is down" - disconnectAgentClient alice + disposeAgentClient alice alice' <- liftIO $ getSMPAgentClient' agentCfg initAgentServers testDB withSmpServerStoreLogOn t testPort $ \_ -> do runRight_ $ do subscribeConnection alice' bobId ("1", _, INV _) <- get alice' pure () - disconnectAgentClient alice' + disposeAgentClient alice' testAcceptContactAsync :: IO () testAcceptContactAsync = @@ -1337,7 +1339,7 @@ testDeleteConnectionAsync t = do get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False liftIO $ noMessages a "nothing else should be delivered to alice" - disconnectAgentClient a + disposeAgentClient a testJoinConnectionAsyncReplyError :: HasCallStack => ATransport -> IO () testJoinConnectionAsyncReplyError t = do @@ -1378,8 +1380,8 @@ testJoinConnectionAsyncReplyError t = do get b ##> ("", aId, INFO "alice's connInfo") get b ##> ("", aId, CON) exchangeGreetings a bId b aId - disconnectAgentClient a - disconnectAgentClient b + disposeAgentClient a + disposeAgentClient b testUsers :: IO () testUsers = @@ -1442,8 +1444,8 @@ testSwitchConnection servers = do exchangeGreetingsMsgId 4 a bId b aId testFullSwitch a bId b aId 10 testFullSwitch a bId b aId 16 - disconnectAgentClient a - disconnectAgentClient b + disposeAgentClient a + disposeAgentClient b testFullSwitch :: AgentClient -> ByteString -> AgentClient -> ByteString -> Int64 -> ExceptT AgentErrorType IO () testFullSwitch a bId b aId msgId = do @@ -1524,7 +1526,7 @@ testSwitchAsync servers = do withB = withAgent agentCfg {initialClientId = 1} servers testDB2 withAgent :: AgentConfig -> InitialAgentServers -> FilePath -> (AgentClient -> IO a) -> IO a -withAgent cfg' servers dbPath = bracket (getSMPAgentClient' cfg' servers dbPath) disconnectAgentClient +withAgent cfg' servers dbPath = bracket (getSMPAgentClient' cfg' servers dbPath) disposeAgentClient sessionSubscribe :: (forall a. (AgentClient -> IO a) -> IO a) -> [ConnId] -> (AgentClient -> ExceptT AgentErrorType IO ()) -> IO () sessionSubscribe withC connIds a = @@ -1542,7 +1544,7 @@ testSwitchDelete servers = do runRight_ $ do (aId, bId) <- makeConnection a b exchangeGreetingsMsgId 4 a bId b aId - disconnectAgentClient b + disposeAgentClient b stats <- switchConnectionAsync a "" bId liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Just RSSwitchStarted] phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing] @@ -1551,8 +1553,8 @@ testSwitchDelete servers = do get a =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bId; _ -> False get a =##> \case ("", c, DEL_CONN) -> c == bId; _ -> False liftIO $ noMessages a "nothing else should be delivered to alice" - disconnectAgentClient a - disconnectAgentClient b + disposeAgentClient a + disposeAgentClient b testAbortSwitchStarted :: HasCallStack => InitialAgentServers -> IO () testAbortSwitchStarted servers = do @@ -1840,8 +1842,8 @@ testCreateQueueAuth clnt1 clnt2 = do get b ##> ("", aId, CON) exchangeGreetings a bId b aId pure 2 - disconnectAgentClient a - disconnectAgentClient b + disposeAgentClient a + disposeAgentClient b pure r where getClient (clntAuth, clntVersion) = @@ -1904,8 +1906,8 @@ testDeliveryReceiptsVersion t = do liftIO $ noMessages b "no delivery receipt (unsupported version)" pure (aId, bId) - disconnectAgentClient a - disconnectAgentClient b + disposeAgentClient a + disposeAgentClient b a' <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 4} initAgentServers testDB b' <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 4} initAgentServers testDB2 @@ -1927,8 +1929,8 @@ testDeliveryReceiptsVersion t = do ackMessage a' bId 10 $ Just "" get b' =##> \case ("", c, Rcvd 10) -> c == aId; _ -> False ackMessage b' aId 11 Nothing - disconnectAgentClient a' - disconnectAgentClient b' + disposeAgentClient a' + disposeAgentClient b' testDeliveryReceiptsConcurrent :: HasCallStack => ATransport -> IO () testDeliveryReceiptsConcurrent t = @@ -2065,7 +2067,7 @@ testServerMultipleIdentities = exchangeGreetings alice bobId bob aliceId -- this saves queue with second server identity Left (BROKER _ NETWORK) <- runExceptT $ joinConnection bob 1 True secondIdentityCReq "bob's connInfo" SMSubscribe - disconnectAgentClient bob + disposeAgentClient bob bob' <- liftIO $ getSMPAgentClient' agentCfg initAgentServers testDB2 subscribeConnection bob' aliceId exchangeGreetingsMsgId 6 alice bobId bob' aliceId diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index da9f4c322..834c78594 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -112,7 +112,7 @@ testNotificationToken APNSMockServer {apnsQ} = do deleteNtfToken a tkn -- agent deleted this token Left (CMD PROHIBITED) <- tryE $ checkNtfToken a tkn - disconnectAgentClient a + disposeAgentClient a (.->) :: J.Value -> J.Key -> ExceptT AgentErrorType IO ByteString v .-> key = do @@ -144,7 +144,7 @@ testNtfTokenRepeatRegistration APNSMockServer {apnsQ} = do -- can still use the first verification code, it is the same after decryption verifyNtfToken a tkn nonce verification NTActive <- checkNtfToken a tkn - disconnectAgentClient a + disposeAgentClient a testNtfTokenSecondRegistration :: APNSMockServer -> IO () testNtfTokenSecondRegistration APNSMockServer {apnsQ} = do @@ -180,8 +180,8 @@ testNtfTokenSecondRegistration APNSMockServer {apnsQ} = do Left (NTF AUTH) <- tryE $ checkNtfToken a tkn -- and the second is active NTActive <- checkNtfToken a' tkn - disconnectAgentClient a - disconnectAgentClient a' + disposeAgentClient a + disposeAgentClient a' testNtfTokenServerRestart :: ATransport -> APNSMockServer -> IO () testNtfTokenServerRestart t APNSMockServer {apnsQ} = do @@ -195,7 +195,7 @@ testNtfTokenServerRestart t APNSMockServer {apnsQ} = do pure ntfData -- the new agent is created as otherwise when running the tests in CI the old agent was keeping the connection to the server threadDelay 1000000 - disconnectAgentClient a + disposeAgentClient a a' <- getSMPAgentClient' agentCfg initAgentServers testDB -- server stopped before token is verified, so now the attempt to verify it will return AUTH error but re-register token, -- so that repeat verification happens without restarting the clients, when notification arrives @@ -210,7 +210,7 @@ testNtfTokenServerRestart t APNSMockServer {apnsQ} = do liftIO $ sendApnsResponse' APNSRespOk verifyNtfToken a' tkn nonce' verification' NTActive <- checkNtfToken a' tkn - disconnectAgentClient a' + disposeAgentClient a' testNotificationSubscriptionExistingConnection :: APNSMockServer -> IO () testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do @@ -251,7 +251,7 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do runRight_ $ do (_, [SMPMsgMeta {msgFlags = MsgFlags True}]) <- getNotificationMessage aliceNtf nonce message pure () - disconnectAgentClient aliceNtf + disposeAgentClient aliceNtf runRight_ $ do get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False @@ -264,8 +264,8 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do get bob ##> ("", aliceId, SENT $ baseId + 2) -- no notifications should follow noNotification apnsQ - disconnectAgentClient alice - disconnectAgentClient bob + disposeAgentClient alice + disposeAgentClient bob where baseId = 3 msgId = subtract baseId @@ -309,8 +309,8 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do ackMessage bob aliceId (baseId + 2) Nothing -- no unexpected notifications should follow noNotification apnsQ - disconnectAgentClient alice - disconnectAgentClient bob + disposeAgentClient alice + disposeAgentClient bob where baseId = 3 msgId = subtract baseId @@ -388,8 +388,8 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do ackMessage alice bobId (baseId + 5) Nothing -- no notifications should follow noNotification apnsQ - disconnectAgentClient alice - disconnectAgentClient bob + disposeAgentClient alice + disposeAgentClient bob where baseId = 3 msgId = subtract baseId @@ -417,7 +417,7 @@ testChangeToken APNSMockServer {apnsQ} = do get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False ackMessage alice bobId (baseId + 1) Nothing pure (aliceId, bobId) - disconnectAgentClient alice + disposeAgentClient alice alice1 <- getSMPAgentClient' agentCfg initAgentServers testDB runRight_ $ do @@ -433,8 +433,8 @@ testChangeToken APNSMockServer {apnsQ} = do ackMessage alice1 bobId (baseId + 2) Nothing -- no notifications should follow noNotification apnsQ - disconnectAgentClient alice1 - disconnectAgentClient bob + disposeAgentClient alice1 + disposeAgentClient bob where baseId = 3 msgId = subtract baseId @@ -464,8 +464,8 @@ testNotificationsStoreLog t APNSMockServer {apnsQ} = do void $ messageNotification apnsQ get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False liftIO $ killThread threadId - disconnectAgentClient alice - disconnectAgentClient bob + disposeAgentClient alice + disposeAgentClient bob testNotificationsSMPRestart :: ATransport -> APNSMockServer -> IO () testNotificationsSMPRestart t APNSMockServer {apnsQ} = do @@ -496,8 +496,8 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = do _ <- messageNotificationData alice apnsQ get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False liftIO $ killThread threadId - disconnectAgentClient alice - disconnectAgentClient bob + disposeAgentClient alice + disposeAgentClient bob testNotificationsSMPRestartBatch :: Int -> ATransport -> APNSMockServer -> IO () testNotificationsSMPRestartBatch n t APNSMockServer {apnsQ} = do @@ -536,8 +536,8 @@ testNotificationsSMPRestartBatch n t APNSMockServer {apnsQ} = do get b ##> ("", aliceId, SENT msgId) _ <- messageNotificationData a apnsQ get a =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False - disconnectAgentClient a - disconnectAgentClient b + disposeAgentClient a + disposeAgentClient b where runServers :: ExceptT AgentErrorType IO a -> IO a runServers a = do @@ -567,8 +567,8 @@ testSwitchNotifications servers APNSMockServer {apnsQ} = do switchComplete a bId b aId liftIO $ threadDelay 500000 testMessage "hello again" - disconnectAgentClient a - disconnectAgentClient b + disposeAgentClient a + disposeAgentClient b messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString) messageNotification apnsQ = do diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index cf6e8373b..609838055 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -525,8 +525,8 @@ testCloseReopenStore = do hasMigrations st closeSQLiteStore st errorGettingMigrations st - openSQLiteStore st "" - hasMigrations st + Right st' <- createSQLiteStore (dbFilePath st) "" Migrations.app MCError + hasMigrations st' testCloseReopenEncryptedStore :: IO () testCloseReopenEncryptedStore = do @@ -541,8 +541,8 @@ testCloseReopenEncryptedStore = do hasMigrations st closeSQLiteStore st errorGettingMigrations st - openSQLiteStore st key - hasMigrations st + Right st' <- createSQLiteStore (dbFilePath st) key Migrations.app MCError + hasMigrations st' getMigrations :: SQLiteStore -> IO Bool getMigrations st = not . null <$> withTransaction st (Migrations.getCurrent . DB.conn) diff --git a/tests/XFTPAgent.hs b/tests/XFTPAgent.hs index 465c9c2b6..a1ec3efd8 100644 --- a/tests/XFTPAgent.hs +++ b/tests/XFTPAgent.hs @@ -21,7 +21,7 @@ import SMPAgentClient (agentCfg, initAgentServers, testDB, testDB2, testDB3) import Simplex.FileTransfer.Description import Simplex.FileTransfer.Protocol (FileParty (..), XFTPErrorType (AUTH)) import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..)) -import Simplex.Messaging.Agent (AgentClient, disconnectAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendFile, xftpStartWorkers) +import Simplex.Messaging.Agent (AgentClient, disposeAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendFile, xftpStartWorkers) import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..)) import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), RcvFileId, SndFileId, noAuthSrv) import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs) @@ -103,7 +103,7 @@ testXFTPAgentSendReceive = withXFTPServer $ do runRight_ $ do rfId <- testReceive rcp rfd originalFilePath xftpDeleteRcvFile rcp rfId - disconnectAgentClient rcp + disposeAgentClient rcp testXFTPAgentSendReceiveEncrypted :: HasCallStack => IO () testXFTPAgentSendReceiveEncrypted = withXFTPServer $ do @@ -126,7 +126,7 @@ testXFTPAgentSendReceiveEncrypted = withXFTPServer $ do runRight_ $ do rfId <- testReceiveCF rcp rfd cfArgs originalFilePath xftpDeleteRcvFile rcp rfId - disconnectAgentClient rcp + disposeAgentClient rcp createRandomFile :: HasCallStack => IO FilePath createRandomFile = do @@ -183,7 +183,7 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do rfId <- xftpReceiveFile rcp 1 rfd Nothing liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt pure rfId - disconnectAgentClient rcp + disposeAgentClient rcp [prefixDir] <- listDirectory recipientFiles let tmpPath = recipientFiles prefixDir "xftp.encrypted" @@ -195,7 +195,7 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do runRight_ $ xftpStartWorkers rcp' (Just recipientFiles) ("", rfId', RFPROG _ _) <- rfGet rcp' liftIO $ rfId' `shouldBe` rfId - disconnectAgentClient rcp' + disposeAgentClient rcp' threadDelay 100000 @@ -231,7 +231,7 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do rfId <- xftpReceiveFile rcp 1 rfd Nothing liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt pure rfId - disconnectAgentClient rcp + disposeAgentClient rcp [prefixDir] <- listDirectory recipientFiles let tmpPath = recipientFiles prefixDir "xftp.encrypted" @@ -258,7 +258,7 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do sfId <- xftpSendFile sndr 1 (CF.plain filePath) 2 liftIO $ timeout 1000000 (get sndr) `shouldReturn` Nothing -- wait for worker to encrypt and attempt to create file pure sfId - disconnectAgentClient sndr + disposeAgentClient sndr dirEntries <- listDirectory senderFiles let prefixDir = fromJust $ find (isSuffixOf "_snd.xftp") dirEntries @@ -273,7 +273,7 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do runRight_ $ xftpStartWorkers sndr' (Just senderFiles) ("", sfId', SFPROG _ _) <- sfGet sndr' liftIO $ sfId' `shouldBe` sfId - disconnectAgentClient sndr' + disposeAgentClient sndr' threadDelay 100000 @@ -310,7 +310,7 @@ testXFTPAgentSendCleanup = withGlobalLogging logCfgNoLogs $ do (_, _, SFPROG _ _) <- sfGet sndr pure () pure sfId - disconnectAgentClient sndr + disposeAgentClient sndr pure sfId dirEntries <- listDirectory senderFiles @@ -353,7 +353,7 @@ testXFTPAgentDelete = withGlobalLogging logCfgNoLogs $ xftpDeleteSndFileRemote sndr 1 sfId sndDescr Nothing <- liftIO $ 100000 `timeout` sfGet sndr pure () - disconnectAgentClient rcp1 + disposeAgentClient rcp1 threadDelay 1000000 length <$> listDirectory xftpServerFiles `shouldReturn` 0 @@ -379,8 +379,8 @@ testXFTPAgentDeleteRestore = withGlobalLogging logCfgNoLogs $ do rcp1 <- getSMPAgentClient' agentCfg initAgentServers testDB2 runRight_ . void $ testReceive rcp1 rfd1 filePath - disconnectAgentClient rcp1 - disconnectAgentClient sndr + disposeAgentClient rcp1 + disposeAgentClient sndr pure (sfId, sndDescr, rfd2) -- delete file - should not succeed with server down @@ -389,7 +389,7 @@ testXFTPAgentDeleteRestore = withGlobalLogging logCfgNoLogs $ do xftpStartWorkers sndr (Just senderFiles) xftpDeleteSndFileRemote sndr 1 sfId sndDescr liftIO $ timeout 300000 (get sndr) `shouldReturn` Nothing -- wait for worker attempt - disconnectAgentClient sndr + disposeAgentClient sndr threadDelay 300000 length <$> listDirectory xftpServerFiles `shouldReturn` 6