Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

smp server: parameterize journal store for queue storage #1420

Open
wants to merge 19 commits into
base: queue-journals
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rfcs/2024-11-25-queue-blobs-2.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ Additional suggestion to reduce probability of queue_state.log and queue_rec.log
- check the last byte of the file and log EOL if it is not EOL. Probably cleanest approach, but with a small performance cost.

If queue folder is a reference to the queue, it may have one of these files:
- notifier.id
- sender.id
- link.id
- notifier.ref
- sender.ref
- link.ref

These files would contain a one line with the recipient ID of the queue. These files would never change, they can only be deleted when queue is deleted or when notifier/link is deleted.

Expand Down
4 changes: 3 additions & 1 deletion simplexmq.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 1.12

name: simplexmq
version: 6.2.0.7
version: 6.2.1.0
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
Expand Down Expand Up @@ -204,6 +204,7 @@ library
Simplex.Messaging.Server.MsgStore.STM
Simplex.Messaging.Server.MsgStore.Types
Simplex.Messaging.Server.NtfStore
Simplex.Messaging.Server.Prometheus
Simplex.Messaging.Server.QueueStore
Simplex.Messaging.Server.QueueStore.STM
Simplex.Messaging.Server.Stats
Expand Down Expand Up @@ -279,6 +280,7 @@ library
build-depends:
case-insensitive ==1.2.*
, hashable ==1.4.*
, unix ==2.8.*
, websockets ==0.12.*
if impl(ghc >= 9.6.2)
build-depends:
Expand Down
203 changes: 125 additions & 78 deletions src/Simplex/Messaging/Server.hs

Large diffs are not rendered by default.

45 changes: 30 additions & 15 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import Control.Monad
import qualified Crypto.PubKey.RSA as RSA
import Crypto.Random
import Data.ByteString.Char8 (ByteString)
import Data.Functor (($>))
import Data.Int (Int64)
import Data.IntMap.Strict (IntMap)
import qualified Data.IntMap.Strict as IM
Expand Down Expand Up @@ -96,6 +97,9 @@ data ServerConfig = ServerConfig
serverStatsLogFile :: FilePath,
-- | file to save and restore stats
serverStatsBackupFile :: Maybe FilePath,
-- | interval and file to save prometheus metrics
prometheusInterval :: Maybe Int,
prometheusMetricsFile :: FilePath,
-- | notification delivery interval
ntfDeliveryInterval :: Int,
-- | interval between sending pending END events to unsubscribed clients, seconds
Expand Down Expand Up @@ -184,17 +188,18 @@ data Env = Env

type family MsgStore s where
MsgStore 'MSMemory = STMMsgStore
MsgStore 'MSJournal = JournalMsgStore
MsgStore 'MSHybrid = JournalMsgStore 'MSHybrid
MsgStore 'MSJournal = JournalMsgStore 'MSJournal

data AMsgStore = forall s. (STMQueueStore (MsgStore s), MsgStoreClass (MsgStore s)) => AMS (SMSType s) (MsgStore s)
data AMsgStore = forall s. MsgStoreClass (MsgStore s) => AMS (SMSType s) (MsgStore s)

data AStoreQueue = forall s. MsgStoreClass (MsgStore s) => ASQ (SMSType s) (StoreQueue (MsgStore s))

data AMsgStoreCfg = forall s. MsgStoreClass (MsgStore s) => AMSC (SMSType s) (MsgStoreConfig (MsgStore s))

msgPersistence :: AMsgStoreCfg -> Bool
msgPersistence (AMSC SMSMemory (STMStoreConfig {storePath})) = isJust storePath
msgPersistence (AMSC SMSJournal _) = True
msgPersistence _ = True

type Subscribed = Bool

Expand Down Expand Up @@ -291,19 +296,9 @@ newEnv :: ServerConfig -> IO Env
newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgStoreType, storeMsgsFile, smpAgentCfg, information, messageExpiration, idleQueueInterval, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do
serverActive <- newTVarIO True
server <- newServer
msgStore@(AMS _ store) <- case msgStoreType of
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
AMSType SMSJournal -> case storeMsgsFile of
Just storePath ->
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval}
in AMS SMSJournal <$> newMsgStore cfg
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
msgStore <- createMsgStore
ntfStore <- NtfStore <$> TM.emptyIO
random <- C.newRandom
forM_ storeLogFile $ \f -> do
logInfo $ "restoring queues from file " <> T.pack f
sl <- readWriteQueueStore f store
setStoreLog store sl
tlsServerCreds <- getCredentials "SMP" smpCredentials
httpServerCreds <- mapM (getCredentials "HTTPS") httpCredentials
mapM_ checkHTTPSCredentials httpServerCreds
Expand All @@ -316,6 +311,26 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
proxyAgent <- newSMPProxyAgent smpAgentCfg random
pure Env {serverActive, config, serverInfo, server, serverIdentity, msgStore, ntfStore, random, tlsServerCreds, httpServerCreds, serverStats, sockets, clientSeq, clients, proxyAgent}
where
createMsgStore :: IO AMsgStore
createMsgStore = case (msgStoreType, storeMsgsFile) of
(AMSType SMSMemory, _) -> do
st <- newMsgStore STMStoreConfig {storePath = storeLogFile, quota = msgQueueQuota}
loadStoreLog st $> AMS SMSMemory st
(AMSType SMSHybrid, Just storePath) -> do
st <- newMsgStore $ storeCfg SMSHybrid storePath
loadStoreLog st $> AMS SMSHybrid st
(AMSType SMSJournal, Just storePath) ->
AMS SMSJournal <$> newMsgStore (storeCfg SMSJournal storePath)
(_, Nothing) -> putStrLn "Error: journal msg store requires that restore_messages is enabled in [STORE_LOG]" >> exitFailure
where
storeCfg :: SMSType s -> FilePath -> JournalStoreConfig s
storeCfg queueStoreType storePath =
JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, queueStoreType, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval}
loadStoreLog :: STMStoreClass s => s -> IO ()
loadStoreLog st = forM_ storeLogFile $ \f -> do
logInfo $ "restoring queues from file " <> T.pack f
sl <- readWriteQueueStore f st
setStoreLog (stmQueueStore st) sl
getCredentials protocol creds = do
files <- missingCreds
unless (null files) $ do
Expand Down Expand Up @@ -359,5 +374,5 @@ newSMPProxyAgent smpAgentCfg random = do
smpAgent <- newSMPClientAgent smpAgentCfg random
pure ProxyAgent {smpAgent}

readWriteQueueStore :: STMQueueStore s => FilePath -> s -> IO (StoreLog 'WriteMode)
readWriteQueueStore :: STMStoreClass s => FilePath -> s -> IO (StoreLog 'WriteMode)
readWriteQueueStore = readWriteStoreLog readQueueStore writeQueueStore
Loading
Loading