Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: one delivery worker per session (WIP) #972

Draft
wants to merge 1 commit into
base: ep/rfc-delivery-workers
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ library
Simplex.Messaging.Agent.Protocol
Simplex.Messaging.Agent.QueryString
Simplex.Messaging.Agent.RetryInterval
Simplex.Messaging.Agent.RetryInterval.Delivery
Simplex.Messaging.Agent.Server
Simplex.Messaging.Agent.Store
Simplex.Messaging.Agent.Store.SQLite
Expand Down Expand Up @@ -101,6 +102,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240118_snd_queue_delivery
Simplex.Messaging.Agent.TAsyncs
Simplex.Messaging.Agent.TRcvQueues
Simplex.Messaging.Client
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/FileTransfer/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 11 in src/Simplex/FileTransfer/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

module Simplex.FileTransfer.Agent
( startXFTPWorkers,
Expand Down Expand Up @@ -316,7 +316,7 @@
forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients'
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading
where
AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients, messageRetryInterval = ri} = cfg
AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients, xftpRetryInterval = ri} = cfg
encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [(XFTPChunkSpec, FileDigest)])
encryptFileForUpload SndFile {key, nonce, srcFile} fsEncPath = do
let CryptoFile {filePath} = srcFile
Expand Down Expand Up @@ -345,7 +345,7 @@
where
tryCreate = do
usedSrvs <- newTVarIO ([] :: [XFTPServer])
withRetryInterval (riFast ri) $ \_ loop ->
withRetryInterval ri $ \_ loop ->
createWithNextSrv usedSrvs
`catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e
where
Expand Down
170 changes: 93 additions & 77 deletions src/Simplex/Messaging/Agent.hs

Large diffs are not rendered by default.

20 changes: 9 additions & 11 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
AgentState (..),
AgentLocks (..),
AgentStatsKey (..),
mkSMPTransportSession,
getAgentWorker,
getAgentWorker',
cancelWorker,
Expand Down Expand Up @@ -155,7 +156,6 @@
import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Word (Word16)
-- import GHC.Conc (unsafeIOToSTM)
import Network.Socket (HostName)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
import qualified Simplex.FileTransfer.Client as X
Expand Down Expand Up @@ -228,16 +228,14 @@
sessionVarId :: Int
}

type ClientVar msg = SessionVar (Either AgentErrorType (Client msg))
type ClientVar msg = SessionVar (Either AgentErrorType (Client msg))

type SMPClientVar = ClientVar SMP.BrokerMsg

type NtfClientVar = ClientVar NtfResponse

type XFTPClientVar = ClientVar FileResponse

type SMPTransportSession = TransportSession SMP.BrokerMsg

type NtfTransportSession = TransportSession NtfResponse

type XFTPTransportSession = TransportSession FileResponse
Expand All @@ -259,7 +257,7 @@
pendingSubs :: TRcvQueues,
removedSubs :: TMap (UserId, SMPServer, SMP.RecipientId) SMPClientError,
workerSeq :: TVar Int,
smpDeliveryWorkers :: TMap SndQAddr (Worker, TMVar ()),
smpDeliveryWorkers :: TMap SMPTransportSession Worker,
asyncCmdWorkers :: TMap (Maybe SMPServer) Worker,
connCmdsQueued :: TMap ConnId Bool,
ntfNetworkOp :: TVar AgentOpState,
Expand Down Expand Up @@ -712,7 +710,7 @@
closeProtocolServerClients c xftpClients
atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect
cancelActions . actions $ asyncClients c
clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst)
clearWorkers smpDeliveryWorkers >>= mapM_ cancelWorker
clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker
clear connCmdsQueued
atomically . RQ.clear $ activeSubs c
Expand All @@ -724,7 +722,7 @@
clearWorkers workers = atomically $ swapTVar (workers c) mempty
clear :: Monoid m => (AgentClient -> TVar m) -> IO ()
clear sel = atomically $ writeTVar (sel c) mempty
cancelReconnect :: SessionVar (Async ()) -> IO ()
cancelReconnect :: SessionVar (Async ()) -> IO ()
cancelReconnect v = void . forkIO $ atomically (readTMVar $ sessionVar v) >>= uninterruptibleCancel

cancelWorker :: Worker -> IO ()
Expand All @@ -739,9 +737,9 @@
throwWhenInactive c = unlessM (readTVar $ active c) $ throwSTM ThreadKilled

-- this function is used to remove workers once delivery is complete, not when it is removed from the map
throwWhenNoDelivery :: AgentClient -> SndQueue -> STM ()
throwWhenNoDelivery c sq =
unlessM (TM.member (qAddress sq) $ smpDeliveryWorkers c) $
throwWhenNoDelivery :: AgentClient -> SMPTransportSession -> STM ()
throwWhenNoDelivery c tSess =
unlessM (TM.member tSess $ smpDeliveryWorkers c) $
throwSTM ThreadKilled

closeProtocolServerClients :: ProtocolServerClient err msg => AgentClient -> (AgentClient -> TMap (TransportSession msg) (ClientVar msg)) -> IO ()
Expand Down Expand Up @@ -1493,7 +1491,7 @@
where
statsKey = AgentStatsKey {userId, host = strEncode $ clientTransportHost pc, clientTs = strEncode $ clientSessionTs pc, cmd, res}

userServers :: forall p. (ProtocolTypeI p, UserProtocol p) => AgentClient -> TMap UserId (NonEmpty (ProtoServerWithAuth p))

Check warning on line 1494 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: UserProtocol p

Check warning on line 1494 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: UserProtocol p
userServers c = case protocolTypeI @p of
SPSMP -> smpServers c
SPXFTP -> xftpServers c
Expand Down Expand Up @@ -1573,7 +1571,7 @@
smpClients_ <- textKeys <$> readTVarIO smpClients
ntfClients_ <- textKeys <$> readTVarIO ntfClients
xftpClients_ <- textKeys <$> readTVarIO xftpClients
smpDeliveryWorkers_ <- workerStats . fmap fst =<< readTVarIO smpDeliveryWorkers
smpDeliveryWorkers_ <- workerStats =<< readTVarIO smpDeliveryWorkers
asyncCmdWorkers_ <- workerStats =<< readTVarIO asyncCmdWorkers
smpSubWorkers_ <- textKeys <$> readTVarIO smpSubWorkers
asyncCients_ <- M.keys <$> readTVarIO actions
Expand Down
35 changes: 11 additions & 24 deletions src/Simplex/Messaging/Agent/Env/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import Numeric.Natural
import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConfig)
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.RetryInterval.Delivery
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client
Expand Down Expand Up @@ -89,9 +90,8 @@ data AgentConfig = AgentConfig
ntfCfg :: ProtocolClientConfig,
xftpCfg :: XFTPClientConfig,
reconnectInterval :: RetryInterval,
messageRetryInterval :: RetryInterval2,
messageTimeout :: NominalDiffTime,
helloTimeout :: NominalDiffTime,
messageDeliveryCfg :: MsgDeliveryConfig,
xftpRetryInterval :: RetryInterval,
initialCleanupDelay :: Int64,
cleanupInterval :: Int64,
cleanupStepInterval :: Int,
Expand Down Expand Up @@ -125,24 +125,12 @@ defaultReconnectInterval =
maxInterval = 180_000000
}

defaultMessageRetryInterval :: RetryInterval2
defaultMessageRetryInterval =
RetryInterval2
{ riFast =
RetryInterval
{ initialInterval = 1_000000,
increaseAfter = 10_000000,
maxInterval = 60_000000
},
riSlow =
-- TODO: these timeouts can be increased in v5.0 once most clients are updated
-- to resume sending on QCONT messages.
-- After that local message expiration period should be also increased.
RetryInterval
{ initialInterval = 60_000000,
increaseAfter = 60_000000,
maxInterval = 3600_000000 -- 1 hour
}
defaultXFTPRetryInterval :: RetryInterval
defaultXFTPRetryInterval =
RetryInterval
{ initialInterval = 1_000000,
increaseAfter = 10_000000,
maxInterval = 60_000000
}

defaultAgentConfig :: AgentConfig
Expand All @@ -156,9 +144,8 @@ defaultAgentConfig =
ntfCfg = defaultClientConfig {defaultTransport = ("443", transport @TLS)},
xftpCfg = defaultXFTPClientConfig,
reconnectInterval = defaultReconnectInterval,
messageRetryInterval = defaultMessageRetryInterval,
messageTimeout = 2 * nominalDay,
helloTimeout = 2 * nominalDay,
messageDeliveryCfg = defaultMsgDeliveryConfig,
xftpRetryInterval = defaultXFTPRetryInterval,
initialCleanupDelay = 30 * 1000000, -- 30 seconds
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
cleanupStepInterval = 200000, -- 200ms
Expand Down
25 changes: 21 additions & 4 deletions src/Simplex/Messaging/Agent/RetryInterval.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

module Simplex.Messaging.Agent.RetryInterval
( RetryInterval (..),
RetryInterval2 (..),
RetryIntervalMode (..),
-- RetryInterval2 (..),
-- RetryIntervalMode (..),
RIState (..),
RI2State (..),
withRetryInterval,
withRetryIntervalCount,
withRetryLock2,
updateRetryInterval2,
-- withRetryLock2,
-- updateRetryInterval2,
updateRetryInterval,
nextDelay,
)
where

Expand All @@ -27,9 +30,15 @@
maxInterval :: Int64
}

data RIState = RIState
{ retryDelay :: Int64,
retryElapsed :: Int64
}
deriving (Eq, Show)

data RetryInterval2 = RetryInterval2

Check warning on line 39 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

Defined but not used: data constructor ‘RetryInterval2’

Check warning on line 39 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Defined but not used: data constructor ‘RetryInterval2’

Check warning on line 39 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Defined but not used: data constructor ‘RetryInterval2’
{ riSlow :: RetryInterval,

Check warning on line 40 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

Defined but not used: ‘riSlow’

Check warning on line 40 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Defined but not used: ‘riSlow’

Check warning on line 40 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Defined but not used: ‘riSlow’
riFast :: RetryInterval

Check warning on line 41 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Defined but not used: ‘riFast’

Check warning on line 41 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Defined but not used: ‘riFast’
}

data RI2State = RI2State
Expand All @@ -38,8 +47,16 @@
}
deriving (Show)

updateRetryInterval :: RIState -> RetryInterval -> RetryInterval
updateRetryInterval RIState {retryDelay, retryElapsed} RetryInterval {initialInterval, increaseAfter, maxInterval} =

Check warning on line 51 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Defined but not used: ‘initialInterval’

Check warning on line 51 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Defined but not used: ‘initialInterval’
RetryInterval
{ initialInterval = retryDelay,
increaseAfter = max 0 (increaseAfter - retryElapsed),
maxInterval
}

updateRetryInterval2 :: RI2State -> RetryInterval2 -> RetryInterval2
updateRetryInterval2 RI2State {slowInterval, fastInterval} RetryInterval2 {riSlow, riFast} =

Check warning on line 59 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Defined but not used: ‘updateRetryInterval2’

Check warning on line 59 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Defined but not used: ‘updateRetryInterval2’
RetryInterval2
{ riSlow = riSlow {initialInterval = slowInterval, increaseAfter = 0},
riFast = riFast {initialInterval = fastInterval, increaseAfter = 0}
Expand All @@ -64,7 +81,7 @@

-- This function allows action to toggle between slow and fast retry intervals.
withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> (RI2State -> (RetryIntervalMode -> m ()) -> m ()) -> m ()
withRetryLock2 RetryInterval2 {riSlow, riFast} lock action =

Check warning on line 84 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Defined but not used: ‘withRetryLock2’

Check warning on line 84 in src/Simplex/Messaging/Agent/RetryInterval.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Defined but not used: ‘withRetryLock2’
callAction (0, initialInterval riSlow) (0, initialInterval riFast)
where
callAction :: (Int64, Int64) -> (Int64, Int64) -> m ()
Expand Down
43 changes: 43 additions & 0 deletions src/Simplex/Messaging/Agent/RetryInterval/Delivery.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{-# LANGUAGE NumericUnderscores #-}

module Simplex.Messaging.Agent.RetryInterval.Delivery where

import Data.Time.Clock (NominalDiffTime, nominalDay)
import Simplex.Messaging.Agent.RetryInterval

data MsgDeliveryConfig = MsgDeliveryConfig
{ messageRetryInterval :: RetryInterval,
messageTimeout :: NominalDiffTime,
messageConsecutiveRetries :: Int,
quotaExceededRetryInterval :: RetryInterval,
quotaExceededTimeout :: NominalDiffTime
}

defaultMsgDeliveryConfig :: MsgDeliveryConfig
defaultMsgDeliveryConfig =
MsgDeliveryConfig
{ messageRetryInterval =
RetryInterval
{ initialInterval = 1_000000,
increaseAfter = 10_000000,
maxInterval = 60_000000
},
messageTimeout = 2 * nominalDay,
messageConsecutiveRetries = 3,
quotaExceededRetryInterval =
RetryInterval
{ initialInterval = 180_000000, -- 3 minutes
increaseAfter = 0,
maxInterval = 3 * 3600_000000 -- 3 hours
},
quotaExceededTimeout = 7 * nominalDay
}

-- if
-- | quota exceeded ->
-- | message expired -> send error, stop retries, check and fail other expired messages
-- | otherwise -> stop retries, update delay, store deliver_after
-- | timeout ->
-- | n < messageConsecutiveRetries -> loop
-- | message expired -> send error, stop retries, check and fail other expired messages
-- | otherwise -> stop retries, update delay, store deliver_after
4 changes: 3 additions & 1 deletion src/Simplex/Messaging/Agent/Store.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import Data.Maybe (isJust)
import Data.Time (UTCTime)
import Data.Type.Equality
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval (RI2State)
import Simplex.Messaging.Agent.RetryInterval (RI2State, RIState)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (RatchetX448)
import Simplex.Messaging.Encoding.String
Expand Down Expand Up @@ -149,6 +149,8 @@ data StoredSndQueue (q :: QueueStored) = SndQueue
e2eDhSecret :: C.DhSecretX25519,
-- | queue status
status :: QueueStatus,
quotaExceeded :: Bool,
retryState :: Maybe RIState,
-- | database queue ID (within connection)
dbQueueId :: DBQueueId q,
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
Expand Down
35 changes: 30 additions & 5 deletions src/Simplex/Messaging/Agent/Store/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 22 in src/Simplex/Messaging/Agent/Store/SQLite.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields
{-# OPTIONS_GHC -fno-warn-orphans #-}

module Simplex.Messaging.Agent.Store.SQLite
Expand Down Expand Up @@ -107,9 +107,15 @@
createSndMsgDelivery,
getSndMsgViaRcpt,
updateSndMsgRcpt,
getPendingQueueMsg,
updatePendingMsgRIState,
getPendingSessionMsg,
-- getPendingQueueMsg,
-- updatePendingMsgRIState,
updateSndQueueDelivery,
deletePendingMsgs,
getExpiredSndMessages,
deleteExpiredSndMessages,
setQuotaAvailable,
updateDeliveryDelay,
setMsgUserAck,
getRcvMsg,
getLastMsg,
Expand Down Expand Up @@ -263,6 +269,7 @@
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import Simplex.Messaging.Agent.Store.SQLite.Migrations (DownMigration (..), MTRError, Migration (..), MigrationsToRun (..), mtrErrorDescription)
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client (SMPTransportSession)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..))
import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys)
Expand Down Expand Up @@ -978,8 +985,11 @@
"UPDATE snd_messages SET rcpt_internal_id = ?, rcpt_status = ? WHERE conn_id = ? AND internal_snd_id = ?"
(agentMsgId, msgRcptStatus, connId, sndMsgId)

getPendingSessionMsg :: DB.Connection -> SMPTransportSession -> IO (Either StoreError (Maybe (ConnData, SndQueue, Maybe RcvQueue, PendingMsgData)))
getPendingSessionMsg _db _tSess = undefined

getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Either StoreError (Maybe (Maybe RcvQueue, PendingMsgData)))
getPendingQueueMsg db connId SndQueue {dbQueueId} =

Check warning on line 992 in src/Simplex/Messaging/Agent/Store/SQLite.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Defined but not used: ‘getPendingQueueMsg’

Check warning on line 992 in src/Simplex/Messaging/Agent/Store/SQLite.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Defined but not used: ‘getPendingQueueMsg’
getWorkItem "message" getMsgId getMsgData markMsgFailed
where
getMsgId :: IO (Maybe InternalId)
Expand Down Expand Up @@ -1033,14 +1043,29 @@
mkError :: E.SomeException -> StoreError
mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e

updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO ()
updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} =
DB.execute db "UPDATE snd_messages SET retry_int_slow = ?, retry_int_fast = ? WHERE conn_id = ? AND internal_id = ?" (slowInterval, fastInterval, connId, msgId)
-- updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO ()
-- updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} =
-- DB.execute db "UPDATE snd_messages SET retry_int_slow = ?, retry_int_fast = ? WHERE conn_id = ? AND internal_id = ?" (slowInterval, fastInterval, connId, msgId)

updateSndQueueDelivery :: DB.Connection -> ConnId -> SndQueue -> Bool -> Int64 -> IO ()
updateSndQueueDelivery _db _connId _sq _quotaExceeded _delay = undefined

deletePendingMsgs :: DB.Connection -> ConnId -> SndQueue -> IO ()
deletePendingMsgs db connId SndQueue {dbQueueId} =
DB.execute db "DELETE FROM snd_message_deliveries WHERE conn_id = ? AND snd_queue_id = ?" (connId, dbQueueId)

getExpiredSndMessages :: DB.Connection -> ConnId -> SndQueue -> IO [InternalId]
getExpiredSndMessages = undefined

deleteExpiredSndMessages :: DB.Connection -> ConnId -> SndQueue -> [InternalId] -> IO ()
deleteExpiredSndMessages = undefined

setQuotaAvailable :: DB.Connection -> ConnId -> SndQueue -> IO ()
setQuotaAvailable = undefined

updateDeliveryDelay :: DB.Connection -> ConnId -> SndQueue -> IO ()
updateDeliveryDelay = undefined

setMsgUserAck :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (RcvQueue, SMP.MsgId))
setMsgUserAck db connId agentMsgId = runExceptT $ do
(dbRcvId, srvMsgId) <-
Expand Down Expand Up @@ -2006,7 +2031,7 @@
:. (dbQueueId, primary, dbReplaceQueueId, sndSwchStatus, smpClientVersion)
) =
let server = SMPServer host port keyHash
in SndQueue {userId, connId, server, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, dbQueueId, primary, dbReplaceQueueId, sndSwchStatus, smpClientVersion}

Check warning on line 2034 in src/Simplex/Messaging/Agent/Store/SQLite.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

• Fields of ‘SndQueue’ not initialised:

Check warning on line 2034 in src/Simplex/Messaging/Agent/Store/SQLite.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

• Fields of ‘SndQueue’ not initialised:

getSndQueueById :: DB.Connection -> ConnId -> Int64 -> IO (Either StoreError SndQueue)
getSndQueueById db connId dbSndId =
Expand Down
4 changes: 3 additions & 1 deletion src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230814_indexes
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240118_snd_queue_delivery
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON)
import Simplex.Messaging.Transport.Client (TransportHost)
Expand Down Expand Up @@ -102,7 +103,8 @@ schemaMigrations =
("m20230814_indexes", m20230814_indexes, Just down_m20230814_indexes),
("m20230829_crypto_files", m20230829_crypto_files, Just down_m20230829_crypto_files),
("m20231222_command_created_at", m20231222_command_created_at, Just down_m20231222_command_created_at),
("m20231225_failed_work_items", m20231225_failed_work_items, Just down_m20231225_failed_work_items)
("m20231225_failed_work_items", m20231225_failed_work_items, Just down_m20231225_failed_work_items),
("m20240118_snd_queue_delivery", m20240118_snd_queue_delivery, Just down_m20240118_snd_queue_delivery)
]

-- | The list of migrations in ascending order by date
Expand Down
Loading
Loading