Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

servers: blocking records for content moderation #1430

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 35 additions & 16 deletions src/Simplex/FileTransfer/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ import qualified Simplex.Messaging.Crypto as C
import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (CorrId (..), EntityId (..), RcvPublicAuthKey, RcvPublicDhKey, RecipientId, TransmissionAuth, pattern NoEntity)
import Simplex.Messaging.Protocol (CorrId (..), BlockingInfo, EntityId (..), RcvPublicAuthKey, RcvPublicDhKey, RecipientId, TransmissionAuth, pattern NoEntity)
import Simplex.Messaging.Server (dummyVerifyCmd, verifyCmdAuthorization)
import Simplex.Messaging.Server.Control (CPClientRole (..))
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, getRoundedSystemTime)
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, ServerEntityStatus (..), getRoundedSystemTime)
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
Expand Down Expand Up @@ -287,11 +287,15 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
CPDelete fileId -> withUserRole $ unliftIO u $ do
fs <- asks store
r <- runExceptT $ do
let asSender = ExceptT . atomically $ getFile fs SFSender fileId
let asRecipient = ExceptT . atomically $ getFile fs SFRecipient fileId
(fr, _) <- asSender `catchError` const asRecipient
(fr, _) <- ExceptT $ atomically $ getFile fs SFSender fileId
ExceptT $ deleteServerFile_ fr
liftIO . hPutStrLn h $ either (\e -> "error: " <> show e) (\() -> "ok") r
CPBlock fileId info -> withUserRole $ unliftIO u $ do
fs <- asks store
r <- runExceptT $ do
(fr, _) <- ExceptT $ atomically $ getFile fs SFSender fileId
ExceptT $ blockServerFile fr info
liftIO . hPutStrLn h $ either (\e -> "error: " <> show e) (\() -> "ok") r
CPHelp -> hPutStrLn h "commands: stats-rts, delete, help, quit"
CPQuit -> pure ()
CPSkip -> pure ()
Expand Down Expand Up @@ -321,7 +325,7 @@ processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHea
let THandleParams {thAuth} = thParams
verifyXFTPTransmission ((,C.cbNonce (bs corrId)) <$> thAuth) sig_ signed fId cmd >>= \case
VRVerified req -> uncurry send =<< processXFTPRequest body req
VRFailed -> send (FRErr AUTH) Nothing
VRFailed e -> send (FRErr e) Nothing
Left e -> send (FRErr e) Nothing
where
send resp = sendXFTPResponse (corrId, fId, resp)
Expand Down Expand Up @@ -355,7 +359,7 @@ randomDelay = do
threadDelay $ (d * (1000 + pc)) `div` 1000
#endif

data VerificationResult = VRVerified XFTPRequest | VRFailed
data VerificationResult = VRVerified XFTPRequest | VRFailed XFTPErrorType

verifyXFTPTransmission :: Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> XFTPFileId -> FileCmd -> M VerificationResult
verifyXFTPTransmission auth_ tAuth authorized fId cmd =
Expand All @@ -367,13 +371,19 @@ verifyXFTPTransmission auth_ tAuth authorized fId cmd =
verifyCmd :: SFileParty p -> M VerificationResult
verifyCmd party = do
st <- asks store
atomically $ verify <$> getFile st party fId
atomically $ verify =<< getFile st party fId
where
verify = \case
Right (fr, k) -> XFTPReqCmd fId fr cmd `verifyWith` k
_ -> maybe False (dummyVerifyCmd Nothing authorized) tAuth `seq` VRFailed
Right (fr, k) -> result <$> readTVar (fileStatus fr)
where
result = \case
EntityActive -> XFTPReqCmd fId fr cmd `verifyWith` k
EntityBlocked info -> VRFailed $ BLOCKED info
EntityOff -> noFileAuth
Left _ -> pure noFileAuth
noFileAuth = maybe False (dummyVerifyCmd Nothing authorized) tAuth `seq` VRFailed AUTH
-- TODO verify with DH authorization
req `verifyWith` k = if verifyCmdAuthorization auth_ tAuth authorized k then VRVerified req else VRFailed
req `verifyWith` k = if verifyCmdAuthorization auth_ tAuth authorized k then VRVerified req else VRFailed AUTH

processXFTPRequest :: HTTP2Body -> XFTPRequest -> M (FileResponse, Maybe ServerFile)
processXFTPRequest HTTP2Body {bodyPart} = \case
Expand All @@ -390,7 +400,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
FACK -> noFile =<< ackFileReception fId fr
-- it should never get to the commands below, they are passed in other constructors of XFTPRequest
FNEW {} -> noFile $ FRErr INTERNAL
PING -> noFile FRPong
PING -> noFile $ FRErr INTERNAL
XFTPReqPing -> noFile FRPong
where
noFile resp = pure (resp, Nothing)
Expand Down Expand Up @@ -518,15 +528,24 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
pure FROk

deleteServerFile_ :: FileRec -> M (Either XFTPErrorType ())
deleteServerFile_ FileRec {senderId, fileInfo, filePath} = do
deleteServerFile_ fr@FileRec {senderId} = do
withFileLog (`logDeleteFile` senderId)
runExceptT $ do
deleteOrBlockServerFile_ fr filesDeleted (`deleteFile` senderId)

-- this also deletes the file from storage, but doesn't include it in delete statistics
blockServerFile :: FileRec -> BlockingInfo -> M (Either XFTPErrorType ())
blockServerFile fr@FileRec {senderId} info = do
withFileLog $ \sl -> logBlockFile sl senderId info
deleteOrBlockServerFile_ fr filesBlocked $ \st -> blockFile st senderId info True

deleteOrBlockServerFile_ :: FileRec -> (FileServerStats -> IORef Int) -> (FileStore -> STM (Either XFTPErrorType ())) -> M (Either XFTPErrorType ())
deleteOrBlockServerFile_ FileRec {filePath, fileInfo} stat storeAction = runExceptT $ do
path <- readTVarIO filePath
stats <- asks serverStats
ExceptT $ first (\(_ :: SomeException) -> FILE_IO) <$> try (forM_ path $ \p -> whenM (doesFileExist p) (removeFile p >> deletedStats stats))
st <- asks store
void $ atomically $ deleteFile st senderId
lift $ incFileStat filesDeleted
void $ atomically $ storeAction st
lift $ incFileStat stat
where
deletedStats stats = do
liftIO $ atomicModifyIORef'_ (filesCount stats) (subtract 1)
Expand Down
5 changes: 4 additions & 1 deletion src/Simplex/FileTransfer/Server/Control.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ module Simplex.FileTransfer.Server.Control where
import qualified Data.Attoparsec.ByteString.Char8 as A
import Simplex.FileTransfer.Protocol (XFTPFileId)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (BasicAuth)
import Simplex.Messaging.Protocol (BasicAuth, BlockingInfo)

data ControlProtocol
= CPAuth BasicAuth
| CPStatsRTS
| CPDelete XFTPFileId
| CPBlock XFTPFileId BlockingInfo
| CPHelp
| CPQuit
| CPSkip
Expand All @@ -21,6 +22,7 @@ instance StrEncoding ControlProtocol where
CPAuth tok -> "auth " <> strEncode tok
CPStatsRTS -> "stats-rts"
CPDelete fId -> strEncode (Str "delete", fId)
CPBlock fId info -> strEncode (Str "block", fId, info)
CPHelp -> "help"
CPQuit -> "quit"
CPSkip -> ""
Expand All @@ -29,6 +31,7 @@ instance StrEncoding ControlProtocol where
"auth" -> CPAuth <$> _strP
"stats-rts" -> pure CPStatsRTS
"delete" -> CPDelete <$> _strP
"block" -> CPBlock <$> _strP <*> _strP
"help" -> pure CPHelp
"quit" -> pure CPQuit
"" -> pure CPSkip
Expand Down
15 changes: 11 additions & 4 deletions src/Simplex/FileTransfer/Server/Stats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ data FileServerStats = FileServerStats
filesUploaded :: IORef Int,
filesExpired :: IORef Int,
filesDeleted :: IORef Int,
filesBlocked :: IORef Int,
filesDownloaded :: PeriodStats,
fileDownloads :: IORef Int,
fileDownloadAcks :: IORef Int,
Expand All @@ -34,6 +35,7 @@ data FileServerStatsData = FileServerStatsData
_filesUploaded :: Int,
_filesExpired :: Int,
_filesDeleted :: Int,
_filesBlocked :: Int,
_filesDownloaded :: PeriodStatsData,
_fileDownloads :: Int,
_fileDownloadAcks :: Int,
Expand All @@ -50,12 +52,13 @@ newFileServerStats ts = do
filesUploaded <- newIORef 0
filesExpired <- newIORef 0
filesDeleted <- newIORef 0
filesBlocked <- newIORef 0
filesDownloaded <- newPeriodStats
fileDownloads <- newIORef 0
fileDownloadAcks <- newIORef 0
filesCount <- newIORef 0
filesSize <- newIORef 0
pure FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesExpired, filesDeleted, filesDownloaded, fileDownloads, fileDownloadAcks, filesCount, filesSize}
pure FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesExpired, filesDeleted, filesBlocked, filesDownloaded, fileDownloads, fileDownloadAcks, filesCount, filesSize}

getFileServerStatsData :: FileServerStats -> IO FileServerStatsData
getFileServerStatsData s = do
Expand All @@ -65,12 +68,13 @@ getFileServerStatsData s = do
_filesUploaded <- readIORef $ filesUploaded s
_filesExpired <- readIORef $ filesExpired s
_filesDeleted <- readIORef $ filesDeleted s
_filesBlocked <- readIORef $ filesBlocked s
_filesDownloaded <- getPeriodStatsData $ filesDownloaded s
_fileDownloads <- readIORef $ fileDownloads s
_fileDownloadAcks <- readIORef $ fileDownloadAcks s
_filesCount <- readIORef $ filesCount s
_filesSize <- readIORef $ filesSize s
pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize}
pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesBlocked, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize}

-- this function is not thread safe, it is used on server start only
setFileServerStats :: FileServerStats -> FileServerStatsData -> IO ()
Expand All @@ -81,21 +85,23 @@ setFileServerStats s d = do
writeIORef (filesUploaded s) $! _filesUploaded d
writeIORef (filesExpired s) $! _filesExpired d
writeIORef (filesDeleted s) $! _filesDeleted d
writeIORef (filesBlocked s) $! _filesBlocked d
setPeriodStats (filesDownloaded s) $! _filesDownloaded d
writeIORef (fileDownloads s) $! _fileDownloads d
writeIORef (fileDownloadAcks s) $! _fileDownloadAcks d
writeIORef (filesCount s) $! _filesCount d
writeIORef (filesSize s) $! _filesSize d

instance StrEncoding FileServerStatsData where
strEncode FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize} =
strEncode FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesBlocked, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize} =
B.unlines
[ "fromTime=" <> strEncode _fromTime,
"filesCreated=" <> strEncode _filesCreated,
"fileRecipients=" <> strEncode _fileRecipients,
"filesUploaded=" <> strEncode _filesUploaded,
"filesExpired=" <> strEncode _filesExpired,
"filesDeleted=" <> strEncode _filesDeleted,
"filesBlocked=" <> strEncode _filesBlocked,
"filesCount=" <> strEncode _filesCount,
"filesSize=" <> strEncode _filesSize,
"filesDownloaded:",
Expand All @@ -110,9 +116,10 @@ instance StrEncoding FileServerStatsData where
_filesUploaded <- "filesUploaded=" *> strP <* A.endOfLine
_filesExpired <- "filesExpired=" *> strP <* A.endOfLine <|> pure 0
_filesDeleted <- "filesDeleted=" *> strP <* A.endOfLine
_filesBlocked <- "filesBlocked=" *> strP <* A.endOfLine
_filesCount <- "filesCount=" *> strP <* A.endOfLine <|> pure 0
_filesSize <- "filesSize=" *> strP <* A.endOfLine <|> pure 0
_filesDownloaded <- "filesDownloaded:" *> A.endOfLine *> strP <* A.endOfLine
_fileDownloads <- "fileDownloads=" *> strP <* A.endOfLine
_fileDownloadAcks <- "fileDownloadAcks=" *> strP <* A.endOfLine
pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize}
pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesBlocked, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize}
20 changes: 16 additions & 4 deletions src/Simplex/FileTransfer/Server/Store.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module Simplex.FileTransfer.Server.Store
setFilePath,
addRecipient,
deleteFile,
blockFile,
deleteRecipient,
expiredFilePath,
getFile,
Expand All @@ -22,6 +23,7 @@ module Simplex.FileTransfer.Server.Store
where

import Control.Concurrent.STM
import Control.Monad
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Int (Int64)
import Data.Set (Set)
Expand All @@ -30,8 +32,8 @@ import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..), XFTPFileId
import Simplex.FileTransfer.Transport (XFTPErrorType (..))
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (RcvPublicAuthKey, RecipientId, SenderId)
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime (..))
import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId)
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime (..), ServerEntityStatus (..))
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (ifM, ($>>=))
Expand All @@ -47,7 +49,8 @@ data FileRec = FileRec
fileInfo :: FileInfo,
filePath :: TVar (Maybe FilePath),
recipientIds :: TVar (Set RecipientId),
createdAt :: RoundedSystemTime
createdAt :: RoundedSystemTime,
fileStatus :: TVar ServerEntityStatus
}

fileTimePrecision :: Int64
Expand Down Expand Up @@ -78,7 +81,8 @@ newFileRec :: SenderId -> FileInfo -> RoundedSystemTime -> STM FileRec
newFileRec senderId fileInfo createdAt = do
recipientIds <- newTVar S.empty
filePath <- newTVar Nothing
pure FileRec {senderId, fileInfo, filePath, recipientIds, createdAt}
fileStatus <- newTVar EntityActive
pure FileRec {senderId, fileInfo, filePath, recipientIds, createdAt, fileStatus}

setFilePath :: FileStore -> SenderId -> FilePath -> STM (Either XFTPErrorType ())
setFilePath st sId fPath =
Expand Down Expand Up @@ -109,6 +113,14 @@ deleteFile FileStore {files, recipients, usedStorage} senderId = do
pure $ Right ()
_ -> pure $ Left AUTH

-- this function must be called after the file is deleted from the file system
blockFile :: FileStore -> SenderId -> BlockingInfo -> Bool -> STM (Either XFTPErrorType ())
blockFile st@FileStore {usedStorage} senderId info deleted =
withFile st senderId $ \FileRec {fileInfo, fileStatus} -> do
when deleted $ modifyTVar' usedStorage $ subtract (fromIntegral $ size fileInfo)
writeTVar fileStatus $! EntityBlocked info
pure $ Right ()

deleteRecipient :: FileStore -> RecipientId -> FileRec -> STM ()
deleteRecipient FileStore {recipients} rId FileRec {recipientIds} = do
TM.delete rId recipients
Expand Down
12 changes: 10 additions & 2 deletions src/Simplex/FileTransfer/Server/StoreLog.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Simplex.FileTransfer.Server.StoreLog
logPutFile,
logAddRecipients,
logDeleteFile,
logBlockFile,
logAckFile,
)
where
Expand All @@ -31,7 +32,7 @@ import qualified Data.Map.Strict as M
import Simplex.FileTransfer.Protocol (FileInfo (..))
import Simplex.FileTransfer.Server.Store
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (RcvPublicAuthKey, RecipientId, SenderId)
import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId)
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime)
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Util (bshow)
Expand All @@ -42,7 +43,8 @@ data FileStoreLogRecord
| PutFile SenderId FilePath
| AddRecipients SenderId (NonEmpty FileRecipient)
| DeleteFile SenderId
| AckFile RecipientId
| BlockFile SenderId BlockingInfo
| AckFile RecipientId -- TODO add senderId as well?
deriving (Show)

instance StrEncoding FileStoreLogRecord where
Expand All @@ -51,13 +53,15 @@ instance StrEncoding FileStoreLogRecord where
PutFile sId path -> strEncode (Str "FPUT", sId, path)
AddRecipients sId rcps -> strEncode (Str "FADD", sId, rcps)
DeleteFile sId -> strEncode (Str "FDEL", sId)
BlockFile sId info -> strEncode (Str "FBLK", sId, info)
AckFile rId -> strEncode (Str "FACK", rId)
strP =
A.choice
[ "FNEW " *> (AddFile <$> strP_ <*> strP_ <*> strP),
"FPUT " *> (PutFile <$> strP_ <*> strP),
"FADD " *> (AddRecipients <$> strP_ <*> strP),
"FDEL " *> (DeleteFile <$> strP),
"FBLK " *> (BlockFile <$> strP_ <*> strP),
"FACK " *> (AckFile <$> strP)
]

Expand All @@ -76,6 +80,9 @@ logAddRecipients s = logFileStoreRecord s .: AddRecipients
logDeleteFile :: StoreLog 'WriteMode -> SenderId -> IO ()
logDeleteFile s = logFileStoreRecord s . DeleteFile

logBlockFile :: StoreLog 'WriteMode -> SenderId -> BlockingInfo -> IO ()
logBlockFile s fId = logFileStoreRecord s . BlockFile fId

logAckFile :: StoreLog 'WriteMode -> RecipientId -> IO ()
logAckFile s = logFileStoreRecord s . AckFile

Expand All @@ -96,6 +103,7 @@ readFileStore f st = mapM_ (addFileLogRecord . LB.toStrict) . LB.lines =<< LB.re
PutFile qId path -> setFilePath st qId path
AddRecipients sId rcps -> runExceptT $ addRecipients sId rcps
DeleteFile sId -> deleteFile st sId
BlockFile sId info -> blockFile st sId info True
AckFile rId -> ackFile st rId
addRecipients sId rcps = mapM_ (ExceptT . addRecipient st sId) rcps

Expand Down
Loading
Loading