Skip to content

Commit

Permalink
parameterize journal store for queue storage
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed Dec 12, 2024
1 parent 79e9447 commit 1bdf8bc
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 175 deletions.
33 changes: 13 additions & 20 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.QueueInfo
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
Expand Down Expand Up @@ -423,9 +422,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv}
<- asks serverStats
AMS _ st <- asks msgStore
let queues = activeMsgQueues st
notifiers = notifiers' st
interval = 1000000 * logInterval
QueueCounts {queueCount, notifierCount} <- liftIO $ queueCounts st
let interval = 1000000 * logInterval
forever $ do
withFile statsFilePath AppendMode $ \h -> liftIO $ do
hSetBuffering h LineBuffering
Expand Down Expand Up @@ -478,8 +476,6 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
pMsgFwdsOwn' <- getResetProxyStatsData pMsgFwdsOwn
pMsgFwdsRecv' <- atomicSwapIORef pMsgFwdsRecv 0
qCount' <- readIORef qCount
qCount'' <- M.size <$> readTVarIO queues
notifierCount' <- M.size <$> readTVarIO notifiers
msgCount' <- readIORef msgCount
ntfCount' <- readIORef ntfCount
hPutStrLn h $
Expand Down Expand Up @@ -532,13 +528,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
"0", -- dayCount psSub; psSub is removed to reduce memory usage
"0", -- weekCount psSub
"0", -- monthCount psSub
show qCount'',
show queueCount,
show ntfCreated',
show ntfDeleted',
show ntfSub',
show ntfSubAuth',
show ntfSubDuplicate',
show notifierCount',
show notifierCount,
show qDeletedAllB',
show qSubAllB',
show qSubEnd',
Expand Down Expand Up @@ -625,9 +621,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
CPStats -> withUserRole $ do
ss <- unliftIO u $ asks serverStats
AMS _ st <- unliftIO u $ asks msgStore
let queues = activeMsgQueues st
notifiers = notifiers' st
getStat :: (ServerStats -> IORef a) -> IO a
QueueCounts {queueCount, notifierCount} <- queueCounts st
let getStat :: (ServerStats -> IORef a) -> IO a
getStat var = readIORef (var ss)
putStat :: Show a => String -> (ServerStats -> IORef a) -> IO ()
putStat label var = getStat var >>= \v -> hPutStrLn h $ label <> ": " <> show v
Expand Down Expand Up @@ -664,9 +659,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
putStat "msgNtfsB" msgNtfsB
putStat "msgNtfExpired" msgNtfExpired
putStat "qCount" qCount
qCount2 <- M.size <$> readTVarIO queues
hPutStrLn h $ "qCount 2: " <> show qCount2
notifierCount <- M.size <$> readTVarIO notifiers
hPutStrLn h $ "qCount 2: " <> show queueCount
hPutStrLn h $ "notifiers: " <> show notifierCount
putStat "msgCount" msgCount
putStat "ntfCount" ntfCount
Expand Down Expand Up @@ -841,7 +834,7 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio
c <- liftIO $ newClient msType clientId q thVersion sessionId ts
runClientThreads msType ms active c clientId `finally` clientDisconnected c
where
runClientThreads :: STMQueueStore (MsgStore s) => SMSType s -> MsgStore s -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore s) -> IS.Key -> M ()
runClientThreads :: MsgStoreClass (MsgStore s) => SMSType s -> MsgStore s -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore s) -> IS.Key -> M ()
runClientThreads msType ms active c clientId = do
atomically $ modifyTVar' active $ IM.insert clientId $ Just (AClient msType c)
s <- asks server
Expand Down Expand Up @@ -897,7 +890,7 @@ cancelSub s = case subThread s of
_ -> pure ()
ProhibitSub -> pure ()

receive :: forall c s. (Transport c, STMQueueStore s) => THandleSMP c 'TServer -> s -> Client s -> M ()
receive :: forall c s. (Transport c, MsgStoreClass s) => THandleSMP c 'TServer -> s -> Client s -> M ()
receive h@THandle {params = THandleParams {thAuth}} ms Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive"
sa <- asks serverActive
Expand Down Expand Up @@ -997,7 +990,7 @@ data VerificationResult s = VRVerified (Maybe (StoreQueue s, QueueRec)) | VRFail
-- - the queue or party key do not exist.
-- In all cases, the time of the verification should depend only on the provided authorization type,
-- a dummy key is used to run verification in the last two cases, and failure is returned irrespective of the result.
verifyTransmission :: forall s. STMQueueStore s => s -> Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M (VerificationResult s)
verifyTransmission :: forall s. MsgStoreClass s => s -> Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M (VerificationResult s)
verifyTransmission ms auth_ tAuth authorized queueId cmd =
case cmd of
Cmd SRecipient (NEW k _ _ _ _) -> pure $ Nothing `verifiedWith` k
Expand Down Expand Up @@ -1074,7 +1067,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do
action `finally` atomically (modifyTVar' endThreads $ IM.delete tId)
mkWeakThreadId t >>= atomically . modifyTVar' endThreads . IM.insert tId

client :: forall s. STMQueueStore s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M ()
client :: forall s. MsgStoreClass s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M ()
client
thParams'
Server {subscribedQ, ntfSubscribedQ, subscribers}
Expand Down Expand Up @@ -1768,7 +1761,7 @@ processServerMessages = do
stored'' <- getQueueSize ms rId q
liftIO $ closeMsgQueue q
pure (stored'', expired'')
processValidateQueue :: RecipientId -> JournalQueue -> IO MessageStats
processValidateQueue :: RecipientId -> JournalQueue 'MSMemory -> IO MessageStats
processValidateQueue rId q =
runExceptT (getQueueSize ms rId q) >>= \case
Right storedMsgsCount -> pure newMessageStats {storedMsgsCount, storedQueues = 1}
Expand All @@ -1777,7 +1770,7 @@ processServerMessages = do
exitFailure

-- TODO this function should be called after importing queues from store log
importMessages :: forall s. STMQueueStore s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats
importMessages :: forall s. MsgStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats
importMessages tty ms f old_ = do
logInfo $ "restoring messages from file " <> T.pack f
LB.readFile f >>= runExceptT . foldM restoreMsg (0, Nothing, (0, 0, M.empty)) . LB.lines >>= \case
Expand Down
8 changes: 4 additions & 4 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ data Env = Env

type family MsgStore s where
MsgStore 'MSMemory = STMMsgStore
MsgStore 'MSJournal = JournalMsgStore
MsgStore 'MSJournal = JournalMsgStore 'MSMemory

data AMsgStore = forall s. (STMQueueStore (MsgStore s), MsgStoreClass (MsgStore s)) => AMS (SMSType s) (MsgStore s)
data AMsgStore = forall s. MsgStoreClass (MsgStore s) => AMS (SMSType s) (MsgStore s)

data AStoreQueue = forall s. MsgStoreClass (MsgStore s) => ASQ (SMSType s) (StoreQueue (MsgStore s))

Expand Down Expand Up @@ -295,7 +295,7 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
AMSType SMSJournal -> case storeMsgsFile of
Just storePath ->
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval}
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, queueStoreType = SMSMemory, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval}
in AMS SMSJournal <$> newMsgStore cfg
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
ntfStore <- NtfStore <$> TM.emptyIO
Expand Down Expand Up @@ -359,5 +359,5 @@ newSMPProxyAgent smpAgentCfg random = do
smpAgent <- newSMPClientAgent smpAgentCfg random
pure ProxyAgent {smpAgent}

readWriteQueueStore :: STMQueueStore s => FilePath -> s -> IO (StoreLog 'WriteMode)
readWriteQueueStore :: MsgStoreClass s => FilePath -> s -> IO (StoreLog 'WriteMode)
readWriteQueueStore = readWriteStoreLog readQueueStore writeQueueStore
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
doesFileExist iniFile >>= \case
True -> readIniFile iniFile >>= either exitError a
_ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`."
newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration}
newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, queueStoreType = SMSMemory, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration}
iniFile = combine cfgPath "smp-server.ini"
serverVersion = "SMP server v" <> simplexMQVersion
defaultServerPorts = "5223,443"
Expand Down
Loading

0 comments on commit 1bdf8bc

Please sign in to comment.