Skip to content

Commit

Permalink
agent: do not throw exception when command is created for deleted con…
Browse files Browse the repository at this point in the history
…nection (#1150)

* agent: do not throw exception when command is created for deleted connection

* convert database busy/locked to critical alert
  • Loading branch information
epoberezkin authored May 13, 2024
1 parent 91cc48a commit 4455b8b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1994,7 +1994,7 @@ subscriber :: AgentClient -> AM' ()
subscriber c@AgentClient {subQ, msgQ} = forever $ do
t <- atomically $ readTBQueue msgQ
agentOperationBracket c AORcvNetwork waitUntilActive $
runExceptT (processSMPTransmission c t) >>= \case
tryAgentError' (processSMPTransmission c t) >>= \case
Left e -> do
logError $ tshow e
atomically $ writeTBQueue subQ ("", "", APC SAEConn $ ERR e)
Expand Down
14 changes: 11 additions & 3 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ import Data.Text.Encoding
import Data.Time (UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Word (Word16)
import qualified Database.SQLite.Simple as SQL
import Network.Socket (HostName)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
import qualified Simplex.FileTransfer.Client as X
Expand Down Expand Up @@ -1621,10 +1622,16 @@ withStore :: AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> AM a
withStore c action = do
st <- asks store
withExceptT storeError . ExceptT . liftIO . agentOperationBracket c AODatabase (\_ -> pure ()) $
withTransaction st action `E.catch` handleInternal ""
withTransaction st action `E.catches` handleDBErrors
where
handleInternal :: String -> E.SomeException -> IO (Either StoreError a)
handleInternal ctxStr e = pure . Left . SEInternal . B.pack $ show e <> ctxStr
handleDBErrors :: [E.Handler IO (Either StoreError a)]
handleDBErrors =
[ E.Handler $ \(e :: SQL.SQLError) ->
let se = SQL.sqlError e
busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked
in pure . Left . (if busy then SEDatabaseBusy else SEInternal) $ bshow se,
E.Handler $ \(E.SomeException e) -> pure . Left $ SEInternal $ bshow e
]

withStoreBatch :: Traversable t => AgentClient -> (DB.Connection -> t (IO (Either AgentErrorType a))) -> AM' (t (Either AgentErrorType a))
withStoreBatch c actions = do
Expand Down Expand Up @@ -1652,6 +1659,7 @@ storeError = \case
-- it is used to wrap agent operations when "transaction-like" store access is needed
-- NOTE: network IO should NOT be used inside AgentStoreMonad
SEAgentError e -> e
SEDatabaseBusy e -> CRITICAL True $ B.unpack e
e -> INTERNAL $ show e

incStat :: AgentClient -> Int -> AgentStatsKey -> STM ()
Expand Down
4 changes: 3 additions & 1 deletion src/Simplex/Messaging/Agent/Store.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import Data.Type.Equality
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval (RI2State)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (RatchetX448, PQEncryption, PQSupport)
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport, RatchetX448)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
( MsgBody,
Expand Down Expand Up @@ -593,6 +593,8 @@ type AsyncCmdId = Int64
data StoreError
= -- | IO exceptions in store actions.
SEInternal ByteString
| -- | Database busy
SEDatabaseBusy ByteString
| -- | Failed to generate unique random ID
SEUniqueID
| -- | User ID not found
Expand Down
13 changes: 9 additions & 4 deletions src/Simplex/Messaging/Agent/Store/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ module Simplex.Messaging.Agent.Store.SQLite
)
where

import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Class
Expand Down Expand Up @@ -268,7 +269,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations (DownMigration (..), MTRE
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..))
import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys, PQEncryption (..), PQSupport (..))
import Simplex.Messaging.Crypto.Ratchet (PQEncryption (..), PQSupport (..), RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys)
import qualified Simplex.Messaging.Crypto.Ratchet as CR
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
Expand All @@ -278,7 +279,7 @@ import Simplex.Messaging.Parsers (blobFieldParser, defaultJSON, dropPrefix, from
import Simplex.Messaging.Protocol
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, ifM, safeDecodeUtf8, ($>>=), (<$$>))
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, ifM, safeDecodeUtf8, tshow, ($>>=), (<$$>))
import Simplex.Messaging.Version.Internal
import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist)
import System.Exit (exitFailure)
Expand Down Expand Up @@ -1272,12 +1273,16 @@ createCommand :: DB.Connection -> ACorrId -> ConnId -> Maybe SMPServer -> AgentC
createCommand db corrId connId srv_ cmd = runExceptT $ do
(host_, port_, serverKeyHash_) <- serverFields
createdAt <- liftIO getCurrentTime
liftIO $
liftIO . E.handle handleErr $
DB.execute
db
"INSERT INTO commands (host, port, corr_id, conn_id, command_tag, command, server_key_hash, created_at) VALUES (?,?,?,?,?,?,?,?)"
(host_, port_, corrId, connId, agentCommandTag cmd, cmd, serverKeyHash_, createdAt)
(host_, port_, corrId, connId, cmdTag, cmd, serverKeyHash_, createdAt)
where
cmdTag = agentCommandTag cmd
handleErr e
| SQL.sqlError e == SQL.ErrorConstraint = logError $ "tried to create command " <> tshow cmdTag <> " for deleted connection"
| otherwise = E.throwIO e
serverFields :: ExceptT StoreError IO (Maybe (NonEmpty TransportHost), Maybe ServiceName, Maybe C.KeyHash)
serverFields = case srv_ of
Just srv@(SMPServer host port _) ->
Expand Down

0 comments on commit 4455b8b

Please sign in to comment.