From dbc6ae2a478441992fb247cb12deceeb601d65ed Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Fri, 10 May 2024 18:09:49 +0300 Subject: [PATCH] WIP: command rate monitoring --- package.yaml | 2 + simplexmq.cabal | 14 ++++ src/Simplex/Messaging/Server.hs | 46 ++++++++++- src/Simplex/Messaging/Server/Env/STM.hs | 23 ++++-- src/Simplex/Messaging/Server/Main.hs | 3 + src/Simplex/Messaging/Server/Stats.hs | 81 ++++++++++++++++++- src/Simplex/Messaging/Transport.hs | 36 +++++++-- src/Simplex/Messaging/Transport/Server.hs | 5 +- src/Simplex/Messaging/Transport/WebSockets.hs | 15 ++-- tests/SMPClient.hs | 3 + 10 files changed, 207 insertions(+), 21 deletions(-) diff --git a/package.yaml b/package.yaml index 02a088e23..2caefad0a 100644 --- a/package.yaml +++ b/package.yaml @@ -45,6 +45,7 @@ dependencies: - direct-sqlcipher == 2.3.* - directory == 1.3.* - filepath == 1.4.* + - hashable == 1.4.* - hourglass == 0.2.* - http-types == 0.12.* - http2 >= 4.2.2 && < 4.3 @@ -59,6 +60,7 @@ dependencies: - network-udp >= 0.0 && < 0.1 - optparse-applicative >= 0.15 && < 0.17 - process == 1.6.* + - psqueues == 0.2.8.* - random >= 1.1 && < 1.3 - simple-logger == 0.1.* - socks == 0.6.* diff --git a/simplexmq.cabal b/simplexmq.cabal index 3366cb0b8..f8dd2d8c6 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -204,6 +204,7 @@ library , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -218,6 +219,7 @@ library , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , socks ==0.6.* @@ -278,6 +280,7 @@ executable ntf-server , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -292,6 +295,7 @@ executable ntf-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -353,6 +357,7 @@ executable smp-agent , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -367,6 +372,7 @@ executable smp-agent , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -428,6 +434,7 @@ executable smp-server , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -442,6 +449,7 @@ executable smp-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -503,6 +511,7 @@ executable xftp , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -517,6 +526,7 @@ executable xftp , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -578,6 +588,7 @@ executable xftp-server , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -592,6 +603,7 @@ executable xftp-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -689,6 +701,7 @@ test-suite simplexmq-test , directory ==1.3.* , filepath ==1.4.* , generic-random ==1.5.* + , hashable ==1.4.* , hourglass ==0.2.* , hspec ==2.11.* , hspec-core ==2.11.* @@ -706,6 +719,7 @@ test-suite simplexmq-test , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , silently ==1.2.* , simple-logger ==0.1.* diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index a09759814..d62e79987 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -128,7 +128,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do raceAny_ ( serverThread s "server subscribedQ" subscribedQ subscribers subscriptions cancelSub : serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ()) - : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg + : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> rateStatsThread_ cfg <> controlPortThread_ cfg ) `finally` withLock' (savingLock s) "final" (saveServer False) where @@ -205,6 +205,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do [logServerStats logStatsStartTime interval serverStatsLogFile] serverStatsThread_ _ = [] + rateStatsThread_ :: ServerConfig -> [M ()] + rateStatsThread_ ServerConfig {rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} = + [ monitorServerRates bucketWidth, -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection + logServerRates logStatsStartTime logInterval rateStatsLogFile -- log distributions once in a while + ] + rateStatsThread_ _ = [] + logServerStats :: Int64 -> Int64 -> FilePath -> M () logServerStats startAt logInterval statsFilePath = do labelMyThread "logServerStats" @@ -257,6 +264,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do ] liftIO $ threadDelay' interval + monitorServerRates :: Int64 -> M () + monitorServerRates bucketWidth = do + labelMyThread "monitorServerRates" + forever $ do + -- TODO: calculate delay for the next bucket closing time + liftIO $ threadDelay' bucketWidth + -- TODO: collect and reset buckets + + logServerRates :: Int64 -> Int64 -> FilePath -> M () + logServerRates startAt logInterval statsFilePath = do + labelMyThread "logServerStats" + initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime + liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath + liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) + let interval = 1000000 * logInterval + forever $ do + -- write the thing + liftIO $ threadDelay' interval + runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M () runClient signKey tp h = do kh <- asks serverIdentity @@ -411,13 +437,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do hPutStrLn h "AUTH" runClientTransport :: Transport c => THandleSMP c 'TServer -> M () -runClientTransport h@THandle {params = THandleParams {thVersion, sessionId}} = do +runClientTransport h@THandle {connection, params = THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime active <- asks clients nextClientId <- asks clientSeq c <- atomically $ do - new@Client {clientId} <- newClient nextClientId q thVersion sessionId ts + new@Client {clientId} <- newClient (getPeerId connection) nextClientId q thVersion sessionId ts modifyTVar' active $ IM.insert clientId new pure new s <- asks server @@ -643,6 +669,10 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv where createQueue :: QueueStore -> RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> M (Transmission BrokerMsg) createQueue st recipientKey dhKey subMode = time "NEW" $ do + -- TODO: read client Q rate + -- TODO: read server Q rate for peerId + -- TODO: read global server Q rate + -- TODO: add throttling delay/blackhole request if needed (rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random let rcvDhSecret = C.dh' dhKey privDhKey qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey} @@ -673,6 +703,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv stats <- asks serverStats atomically $ modifyTVar' (qCreated stats) (+ 1) atomically $ modifyTVar' (qCount stats) (+ 1) + -- TODO: increment client Q counter + -- TODO: increment current Q counter in IP timeline + -- TODO: increment current Q counter in server timeline case subMode of SMOnlyCreate -> pure () SMSubscribe -> void $ subscribeQueue qr rId @@ -835,6 +868,10 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv case C.maxLenBS msgBody of Left _ -> pure $ err LARGE_MSG Right body -> do + -- TODO: read client S rate + -- TODO: read server S rate for peerId + -- TODO: read global server S rate + -- TODO: add throttling delay/blackhole request if needed msg_ <- time "SEND" $ do q <- getStoreMsgQueue "SEND" $ recipientId qr expireMessages q @@ -850,6 +887,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv atomically $ modifyTVar' (msgSent stats) (+ 1) atomically $ modifyTVar' (msgCount stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) + -- TODO: increment client S counter + -- TODO: increment current S counter in IP timeline + -- TODO: increment current S counter in server timeline pure ok where mkMessage :: C.MaxLenBS MaxMessageLen -> M Message diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index bd8262f07..74017346c 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -16,6 +16,7 @@ import Data.List.NonEmpty (NonEmpty) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock.POSIX (getPOSIXTime) import Data.Time.Clock.System (SystemTime) import Data.X509.Validation (Fingerprint (..)) import Network.Socket (ServiceName) @@ -33,7 +34,7 @@ import Simplex.Messaging.Server.Stats import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Transport (ATransport, VersionRangeSMP, VersionSMP) +import Simplex.Messaging.Transport (ATransport, PeerId, VersionRangeSMP, VersionSMP) import Simplex.Messaging.Transport.Server (SocketState, TransportServerConfig, alpn, loadFingerprint, loadTLSServerParams, newSocketState) import System.IO (IOMode (..)) import System.Mem.Weak (Weak) @@ -70,6 +71,10 @@ data ServerConfig = ServerConfig serverStatsLogFile :: FilePath, -- | file to save and restore stats serverStatsBackupFile :: Maybe FilePath, + -- | rate limit monitoring interval / bucket width, seconds + rateStatsInterval :: Maybe Int64, + rateStatsLogFile :: FilePath, + rateStatsBackupFile :: Maybe FilePath, -- | CA certificate private key is not needed for initialization caCertificateFile :: FilePath, privateKeyFile :: FilePath, @@ -109,6 +114,8 @@ data Env = Env storeLog :: Maybe (StoreLog 'WriteMode), tlsServerParams :: T.ServerParams, serverStats :: ServerStats, + qCreatedByIp :: Timeline, + msgSentByIp :: Timeline, sockets :: SocketState, clientSeq :: TVar Int, clients :: TVar (IntMap Client) @@ -124,6 +131,8 @@ data Server = Server data Client = Client { clientId :: Int, + peerId :: PeerId, -- send updates for this Id to time series + clientStats :: ClientStats, -- capture final values on disconnect subscriptions :: TMap RecipientId (TVar Sub), ntfSubscriptions :: TMap NotifierId (), rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)), @@ -155,8 +164,8 @@ newServer = do savingLock <- createLock return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, savingLock} -newClient :: TVar Int -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client -newClient nextClientId qSize thVersion sessionId createdAt = do +newClient :: PeerId -> TVar Int -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client +newClient peerId nextClientId qSize thVersion sessionId createdAt = do clientId <- stateTVar nextClientId $ \next -> (next, next + 1) subscriptions <- TM.empty ntfSubscriptions <- TM.empty @@ -168,7 +177,8 @@ newClient nextClientId qSize thVersion sessionId createdAt = do connected <- newTVar True rcvActiveAt <- newTVar createdAt sndActiveAt <- newTVar createdAt - return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt} + clientStats <- ClientStats <$> newTVar 0 <*> newTVar 0 + return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, peerId, clientStats} newSubscription :: SubscriptionThread -> STM Sub newSubscription subThread = do @@ -189,7 +199,10 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, sockets <- atomically newSocketState clientSeq <- newTVarIO 0 clients <- newTVarIO mempty - return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients} + now <- getPOSIXTime + qCreatedByIp <- atomically $ newTimeline perMinute now + msgSentByIp <- atomically $ newTimeline perMinute now + return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, qCreatedByIp, msgSentByIp, sockets, clientSeq, clients} where restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode) restoreQueues QueueStore {queues, senders, notifiers} f = do diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index d75d02812..6b6d555f3 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -208,6 +208,9 @@ smpServerCLI cfgPath logPath = logStatsStartTime = 0, -- seconds from 00:00 UTC serverStatsLogFile = combine logPath "smp-server-stats.daily.log", serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log", + rateStatsInterval = Just 60, -- TODO: add to options + rateStatsLogFile = combine logPath "smp-server-rates.daily.log", + rateStatsBackupFile = Just $ combine logPath "smp-server-rates.log", smpServerVRange = supportedServerSMPRelayVRange, transportConfig = defaultTransportServerConfig diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 0b4c677c2..7a80715b3 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -10,11 +10,18 @@ module Simplex.Messaging.Server.Stats where import Control.Applicative (optional, (<|>)) import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B +import Data.IntMap (IntMap) +import qualified Data.IntMap.Strict as IM +import Data.IntPSQ (IntPSQ) +import qualified Data.IntPSQ as IP +import Data.Monoid (getSum) import Data.Set (Set) import qualified Data.Set as S import Data.Time.Calendar.Month (pattern MonthDay) import Data.Time.Calendar.OrdinalDate (mondayStartWeek) -import Data.Time.Clock (UTCTime (..)) +import Data.Time.Clock (NominalDiffTime, UTCTime (..)) +import Data.Time.Clock.POSIX (POSIXTime) +import Data.Word (Word32) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (RecipientId) import UnliftIO.STM @@ -231,3 +238,75 @@ updatePeriodStats stats pId = do updatePeriod month where updatePeriod pSel = modifyTVar' (pSel stats) (S.insert pId) + +data ClientStats = ClientStats + { qCreatedClient :: TVar Int, + msgSentClient :: TVar Int + } + +-- may be combined with session duration to produce average rates (q/s, msg/s) +data ClientStatsData = ClientStatsData + { _qCreatedClient :: Int, + _msgSentClient :: Int + } + +type Timeline = (TVar SparseSeries, Current) + +newTimeline :: QuantFun -> POSIXTime -> STM Timeline +newTimeline quantF now = (,current) <$> newTVar IP.empty + where + current :: Current + current = (quantF, quantF now, mempty) + +-- Sparse timeseries with 1 second resolution (or more coarse): +-- priotity - time/bucket +-- key -- PeerId +-- value -- final counter value of the bucket that was current +-- May be combined with bucket width to produce rolling rates. +type SparseSeries = IntPSQ BucketId Int + +-- POSIXTime, or quantized +type BucketId = Word32 + +type QuantFun = POSIXTime -> BucketId + +-- Current bucket that gets filled +type Current = (QuantFun, BucketId, IntMap (TVar Int)) + +perSecond :: POSIXTime -> BucketId +perSecond = truncate + +perMinute :: POSIXTime -> BucketId +perMinute = (60 `secondsWidth`) + +secondsWidth :: NominalDiffTime -> POSIXTime -> BucketId +secondsWidth w t = truncate $ t / w + +finishCurrent :: POSIXTime -> Timeline -> STM Timeline +finishCurrent now (series, current) = error "TODO: read/reset current, push into series, evict minimal when it falls out of scope" + +type WindowData = IntMap Int -- PeerId -> counter + +window :: BucketId -> BucketId -> SparseSeries -> WindowData +window = error "TODO: pick elements inside the range and drop bucket ids" + +-- counter -> occurences +type Histogram = IntMap Int + +histogram :: WindowData -> Histogram +histogram = fmap getSum . IM.fromListWith (<>) . map (,1) . IM.elems + +distribution :: Histogram -> Distribution Int +distribution = error "TODO: unroll histogram, sample elements at percentiles" + +data Distribution a = Distribution + { minimal :: a, + bottom50p :: a, + top50p :: a, + top20p :: a, + top10p :: a, + top5p :: a, + top1p :: a, + maximal :: a + } + deriving (Show) diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index e1d383b5a..90c19ab17 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -54,6 +54,9 @@ module Simplex.Messaging.Transport ATransport (..), TransportPeer (..), getServerVerifyKey, + PeerId, + clientPeerId, + addrPeerId, -- * TLS Transport TLS (..), @@ -95,12 +98,14 @@ import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Default (def) import Data.Functor (($>)) +import Data.Hashable (hash) import Data.Version (showVersion) import Data.Word (Word16) import qualified Data.X509 as X import qualified Data.X509.Validation as XV import GHC.IO.Handle.Internals (ioe_EOF) import Network.Socket +import qualified Network.Socket.Address as SA import qualified Network.TLS as T import qualified Network.TLS.Extra as TE import qualified Paths_simplexmq as SMQ @@ -196,13 +201,15 @@ class Transport c where transportConfig :: c -> TransportConfig -- | Upgrade server TLS context to connection (used in the server) - getServerConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO c + getServerConnection :: PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO c -- | Upgrade client TLS context to connection (used in the client) getClientConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO c getServerCerts :: c -> X.CertificateChain + getPeerId :: c -> PeerId + -- | tls-unique channel binding per RFC5929 tlsUnique :: c -> SessionId @@ -243,6 +250,7 @@ getServerVerifyKey c = data TLS = TLS { tlsContext :: T.Context, tlsPeer :: TransportPeer, + tlsPeerId :: PeerId, tlsUniq :: ByteString, tlsBuffer :: TBuffer, tlsALPN :: Maybe ALPN, @@ -261,13 +269,13 @@ connectTLS host_ TransportConfig {logTLSErrors} params sock = logThrow e = putStrLn ("TLS error" <> host <> ": " <> show e) >> E.throwIO e host = maybe "" (\h -> " (" <> h <> ")") host_ -getTLS :: TransportPeer -> TransportConfig -> X.CertificateChain -> T.Context -> IO TLS -getTLS tlsPeer cfg tlsServerCerts cxt = withTlsUnique tlsPeer cxt newTLS +getTLS :: TransportPeer -> PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO TLS +getTLS tlsPeer tlsPeerId cfg tlsServerCerts cxt = withTlsUnique tlsPeer cxt newTLS where newTLS tlsUniq = do tlsBuffer <- atomically newTBuffer tlsALPN <- T.getNegotiatedProtocol cxt - pure TLS {tlsContext = cxt, tlsALPN, tlsTransportConfig = cfg, tlsServerCerts, tlsPeer, tlsUniq, tlsBuffer} + pure TLS {tlsContext = cxt, tlsPeerId, tlsALPN, tlsTransportConfig = cfg, tlsServerCerts, tlsPeer, tlsUniq, tlsBuffer} withTlsUnique :: TransportPeer -> T.Context -> (ByteString -> IO c) -> IO c withTlsUnique peer cxt f = @@ -301,7 +309,8 @@ instance Transport TLS where transportPeer = tlsPeer transportConfig = tlsTransportConfig getServerConnection = getTLS TServer - getClientConnection = getTLS TClient + getClientConnection = getTLS TClient 0 + getPeerId = tlsPeerId getServerCerts = tlsServerCerts getSessionALPN = tlsALPN tlsUnique = tlsUniq @@ -545,6 +554,23 @@ smpTHandle c = THandle {connection = c, params} where params = THandleParams {sessionId = tlsUnique c, blockSize = smpBlockSize, thVersion = VersionSMP 0, thAuth = Nothing, implySessId = False, batch = True} +-- | Stats key, hashed from IPs, circuits etc. We don't want to keep actual identities, just attach counters to them. +type PeerId = Int -- XXX: perhaps more fields needed if we want subnet escalation + +clientPeerId :: Socket -> IO PeerId +clientPeerId = fmap addrPeerId . SA.getPeerName + +addrPeerId :: SockAddr -> PeerId +addrPeerId peer = hash peer6 -- XXX: for extra paranoia can be salted with a seed randomized on server start + where + -- ingore ports and fluff, normalize to ipv6 address space + -- most of the ipv6 is unused as clients get /64 subnets for a few IPs, so 128bit to 64bit hashing is ok for using as intmap keys + peer6 = case peer of + SockAddrInet _port a -> embed4in6 a + SockAddrInet6 _port _flow a _scope -> a + SockAddrUnix _path -> error "use for TOR circuits?" + embed4in6 v4 = (0, 0, 0xFFFF, v4) -- RFC4038: the IPv6 address ::FFFF:x.y.z.w represents the IPv4 address x.y.z.w. + $(J.deriveJSON (sumTypeJSON id) ''HandshakeError) $(J.deriveJSON (sumTypeJSON $ dropPrefix "TE") ''TransportError) diff --git a/src/Simplex/Messaging/Transport/Server.hs b/src/Simplex/Messaging/Transport/Server.hs index 145b438e0..744664c80 100644 --- a/src/Simplex/Messaging/Transport/Server.hs +++ b/src/Simplex/Messaging/Transport/Server.hs @@ -19,7 +19,7 @@ module Simplex.Messaging.Transport.Server loadTLSServerParams, loadFingerprint, smpServerHandshake, - tlsServerCredentials + tlsServerCredentials, ) where @@ -95,8 +95,9 @@ runTransportServerSocketState ss started getSocket threadLabel serverParams cfg tCfg = serverTransportConfig cfg setup conn = timeout (tlsSetupTimeout cfg) $ do labelMyThread $ threadLabel <> "/setup" + peerId <- clientPeerId conn tls <- connectTLS Nothing tCfg serverParams conn - getServerConnection tCfg (fst $ tlsServerCredentials serverParams) tls + getServerConnection peerId tCfg (fst $ tlsServerCredentials serverParams) tls tlsServerCredentials :: T.ServerParams -> (X.CertificateChain, X.PrivKey) tlsServerCredentials serverParams = case T.sharedCredentials $ T.serverShared serverParams of diff --git a/src/Simplex/Messaging/Transport/WebSockets.hs b/src/Simplex/Messaging/Transport/WebSockets.hs index 0883fcc28..817cc3796 100644 --- a/src/Simplex/Messaging/Transport/WebSockets.hs +++ b/src/Simplex/Messaging/Transport/WebSockets.hs @@ -20,6 +20,7 @@ import Simplex.Messaging.Transport TransportConfig (..), TransportError (..), TransportPeer (..), + PeerId, closeTLS, smpBlockSize, withTlsUnique, @@ -28,6 +29,7 @@ import Simplex.Messaging.Transport.Buffer (trimCR) data WS = WS { wsPeer :: TransportPeer, + wsPeerId :: PeerId, tlsUniq :: ByteString, wsALPN :: Maybe ALPN, wsStream :: Stream, @@ -54,11 +56,14 @@ instance Transport WS where transportConfig :: WS -> TransportConfig transportConfig = wsTransportConfig - getServerConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO WS + getServerConnection :: PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO WS getServerConnection = getWS TServer getClientConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO WS - getClientConnection = getWS TClient + getClientConnection = getWS TClient 0 + + getPeerId :: WS -> PeerId + getPeerId = wsPeerId getServerCerts :: WS -> X.CertificateChain getServerCerts = wsServerCerts @@ -89,14 +94,14 @@ instance Transport WS where then E.throwIO TEBadBlock else pure $ B.init s -getWS :: TransportPeer -> TransportConfig -> X.CertificateChain -> T.Context -> IO WS -getWS wsPeer cfg wsServerCerts cxt = withTlsUnique wsPeer cxt connectWS +getWS :: TransportPeer -> PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO WS +getWS wsPeer wsPeerId cfg wsServerCerts cxt = withTlsUnique wsPeer cxt connectWS where connectWS tlsUniq = do s <- makeTLSContextStream cxt wsConnection <- connectPeer wsPeer s wsALPN <- T.getNegotiatedProtocol cxt - pure $ WS {wsPeer, tlsUniq, wsALPN, wsStream = s, wsConnection, wsTransportConfig = cfg, wsServerCerts} + pure $ WS {wsPeer, wsPeerId, tlsUniq, wsALPN, wsStream = s, wsConnection, wsTransportConfig = cfg, wsServerCerts} connectPeer :: TransportPeer -> Stream -> IO Connection connectPeer TServer = acceptClientRequest connectPeer TClient = sendClientRequest diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index ae9baeb3c..7cb00de83 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -107,6 +107,9 @@ cfg = logStatsStartTime = 0, serverStatsLogFile = "tests/smp-server-stats.daily.log", serverStatsBackupFile = Nothing, + rateStatsInterval = Nothing, + rateStatsLogFile = "", + rateStatsBackupFile = Nothing, caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", certificateFile = "tests/fixtures/server.crt",