-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathHarlson.hs
347 lines (301 loc) · 12.6 KB
/
Harlson.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
module Harlson where
import Network.Socket
import Control.Applicative
import Control.Concurrent
import Control.DeepSeq
import Control.Exception
import Control.Monad
import Data.Time.Clock
import Data.Char
import Data.Maybe
import qualified Data.List as L
import qualified Data.Map.Strict as Map
import qualified Data.ByteString.Lazy as B
import qualified Data.ByteString.Lazy.Char8 as B8
import System.IO
import Text.PrettyPrint
import Text.Regex
import Options
import Mavg
import Server
import Query
import TelnetHandler
-- | Raw hit counter kept for a metric between two 'mavgUpdater' passes.
data MData = MData
  { mdCounter :: !Int
    -- ^ Count accumulated by 'updateMetric'; 'mavgUpdater' swaps in a
    -- zeroed value on every pass.
  } deriving Show
-- | Entry recorded by 'mavgUpdater' for a metric whose rate exceeds its limit.
data OverLimit = OverLimit
  { oValue :: !Int
    -- ^ Rate average that was found to be above the limit.
  , oThrottle :: !Int
    -- ^ @throttlePrecision * limit `div` rate@, as computed by 'mavgUpdater'.
  } deriving Show
-- | Mutable state tracked per 'Key'.
data Metric = Metric
  { mMavg :: !(MVar Mavg)
    -- ^ Moving average of the metric; rewritten by 'mavgUpdater'.
  , mData :: !(MVar MData)
    -- ^ Counter accumulated since the last 'mavgUpdater' pass.
  }
-- | Identifies a metric: a client key paired with an endpoint.
data Key = Key
  { kKey :: !B.ByteString
    -- ^ Client key; also the lookup key into 'LevelsMap'.
  , kEndpoint :: !B.ByteString
    -- ^ Endpoint the metric is counted for.
  } deriving (Show, Eq, Ord)
-- | Identifies a configured limit: a level paired with an endpoint.
data LKey = LKey
  { lkLevel :: !B.ByteString
    -- ^ Level name, as stored in 'LevelsMap' (or 'defaultLevel').
  , lkEndpoint :: !B.ByteString
    -- ^ Endpoint the limit applies to.
  } deriving (Show, Eq, Ord)
-- | Live metrics, one 'Metric' per key/endpoint pair.
type MetricsMap = Map.Map Key Metric

-- | Metrics currently above their limit.
type OverLimitsMap = Map.Map Key OverLimit

-- | Configured limit for each level/endpoint pair.
type LimitsMap = Map.Map LKey Int

-- | Level assigned to each client key.
type LevelsMap = Map.Map B.ByteString B.ByteString
-- | A pending counter together with the moving averages it feeds.
data MavgAcc = MavgAcc
  { maAcc :: !Int
    -- ^ Count added by 'updateStats' since the last 'bumpStats', which
    -- resets it to 0.
  , maMavgs :: ![Mavg]
    -- ^ One moving average per entry of 'statsWindows' (see 'initialStats').
  } deriving Show
-- | Self-monitoring counters of the service.
data Stats = Stats
  { statQueries :: !MavgAcc
    -- ^ Processed queries (first counter argument of 'updateStats').
  , statMetrics :: !MavgAcc
    -- ^ Received metric updates (second counter argument of 'updateStats').
  } deriving Show
-- | Shared mutable state handed to every worker thread.
data YState = YState
  { sMetrics :: !(MVar MetricsMap)
    -- ^ All known metrics.
  , sOverLimits :: !(MVar OverLimitsMap)
    -- ^ Metrics above their limit; the only modifier is mavgUpdater.
  , sLimits :: !(MVar LimitsMap)
    -- ^ Configured limits, replaced wholesale by an @UpdateLimits@ query.
  , sLevels :: !(MVar LevelsMap)
    -- ^ Client key to level assignments.
  , sStats :: !(MVar Stats)
    -- ^ Query/metric throughput statistics.
  , sExit :: !(MVar Int)
    -- ^ Empty while running; filling it makes 'runHarlson' return.
  }
-- | Entry point of the service.
--
-- Creates the shared state, forks the moving-average updater, the query
-- server and the telnet server, and then blocks until something fills the
-- exit 'MVar'. In 'ErlangPortMode' an extra thread watches stdin via
-- 'waitPortClose'.
runHarlson :: Options -> IO ()
runHarlson opts = do
  metricsVar <- newMVar Map.empty
  overLimitsVar <- newMVar Map.empty
  limitsVar <- newMVar Map.empty
  levelsVar <- newMVar Map.empty
  exitVar <- newEmptyMVar
  statsVar <- initialStats
  let ystate = YState
        { sMetrics = metricsVar
        , sOverLimits = overLimitsVar
        , sLimits = limitsVar
        , sLevels = levelsVar
        , sStats = statsVar
        , sExit = exitVar
        }
  void . forkIO $ mavgUpdater ystate
  void . forkIO $ serveTCP (optPort opts) (handler (processQuery opts ystate))
  void . forkIO $ serveTCP (optTelnetPort opts) (handleTelnet (runTelnetCmd ystate))
  case optMode opts of
    ErlangPortMode -> void . forkIO $ waitPortClose exitVar
    StandaloneMode -> return ()
  -- Park the main thread until an exit is requested.
  void $ takeMVar exitVar
-- | Block on stdin and request shutdown as soon as the read returns.
--
-- The outcome of 'getChar' is ignored: a character and an 'IOError' (such as
-- end of file) both lead to the exit 'MVar' being filled with 0.
waitPortClose :: MVar Int -> IO ()
waitPortClose mvExit =
  hSetBuffering stdin NoBuffering
    >> (try getChar :: IO (Either IOError Char))
    >> putMVar mvExit 0
-- | Evaluate a value by fully forcing its 'show' rendering, then return it.
--
-- Only 'Show' is required of the type, so no 'NFData' instance is needed.
force' :: (Show a) => a -> IO a
force' x = evaluate (show x `deepseq` x)
-- | Level assumed for a client key that has no entry in the levels map.
defaultLevel :: B.ByteString
defaultLevel =
  B8.pack "undefined"
-- | Limit assumed for a level/endpoint pair that has no configured limit
-- (two billion).
defaultLimit :: Int
defaultLimit = 2 * 10 ^ (9 :: Int)
-- | Background loop that runs one pass after each one-second delay.
--
-- Each pass:
--
-- * bumps the service statistics and forces them;
-- * for every known metric, swaps its counter for zero, feeds the count into
--   its moving average, and compares the resulting rate with the limit
--   configured for the metric's level and endpoint;
-- * inserts the metric into, or deletes it from, the over-limits map.
--
-- This function never returns.
mavgUpdater :: YState -> IO ()
mavgUpdater ystate = do
  threadDelay 1000000
  -- Snapshots: updates arriving during the pass are seen by the next one.
  metrics <- readMVar (sMetrics ystate)
  limits <- readMVar (sLimits ystate)
  levels <- readMVar (sLevels ystate)
  now <- getCurrentTime
  let mvStats = sStats ystate
  bumpStats now mvStats
  -- 'bumpStats' stores lazily mapped lists; evaluate them now so that
  -- unevaluated work does not pile up inside the MVar.
  modifyMVar_ mvStats force'
  forM_ (Map.toList metrics) $ \(k@(Key appkey ep), m) -> do
    mavg <- readMVar (mMavg m)
    MData c <- swapMVar (mData m) (MData 0)
    let mavg' = bumpRate mavg now c
        ra = rateAverage mavg'
        lvl = Map.findWithDefault defaultLevel appkey levels
        limit = Map.findWithDefault defaultLimit (LKey lvl ep) limits
    modifyMVar_ (sOverLimits ystate) $ \overLimits ->
      return $!
        if ra > limit
          then Map.insert k (OverLimit ra (throttleFor limit ra)) overLimits
          else Map.delete k overLimits
    void (swapMVar (mMavg m) $! mavg')
  mavgUpdater ystate
  where
    -- Throttle value: throttlePrecision * limit / rate, computed in Integer
    -- so the multiplication cannot overflow an Int.
    throttleFor :: Int -> Int -> Int
    throttleFor limit ra
      -- A rate of 0 can only be "over" a negative limit. Dividing by it
      -- would raise DivideByZero and kill this thread for good, so report a
      -- throttle of 0 instead.
      -- NOTE(review): 0 is assumed to be the most restrictive throttle value;
      -- confirm against the consumer of OverLimit.
      | ra == 0 = 0
      | otherwise =
          fromIntegral (throttlePrecision * toInteger limit `div` toInteger ra)
-- | Serve one client connection: read queries from the handle and pass each
-- one to the query processor until the peer reaches end of file.
--
-- The handle is closed when the loop ends, including when reading or
-- processing a query throws, so a failing connection does not leave it open.
-- The exception itself still propagates to the caller. The peer address is
-- not used.
handler :: (Handle -> Query -> IO ()) -> SockAddr -> Handle -> IO ()
handler qp _sa h = loop `finally` hClose h
  where
    loop = do
      isEof <- hIsEOF h
      unless isEof $ readQuery h >>= qp h >> loop
-- | Apply one client 'Query' to the shared state, replying on the handle for
-- the queries that produce a reply.
processQuery :: Options -> YState -> Handle -> Query -> IO ()
-- Combined update: split it into a levels update and a metrics update and run
-- both. Each delegated call bumps the query counter, so this query is counted
-- twice in the statistics.
processQuery opts ystate h (UpdateMetricLevels qms) = do
  let qlevels = map (\(QMetricLevel key _ level _) -> QLevel key level) qms
      qmetrics = map (\(QMetricLevel key endp _ count) -> QMetric key endp count) qms
  processQuery opts ystate h (UpdateLevels qlevels)
  processQuery opts ystate h (UpdateMetrics qmetrics)
-- Merge the given key -> level assignments into the levels map.
processQuery _opts ystate _h (UpdateLevels newlevels) = do
  updateStats (sStats ystate) 1 0
  let toInsert = [(key, level) | QLevel key level <- newlevels]
  modifyMVar_ (sLevels ystate) $ \old ->
    return $! L.foldl' (flip $ uncurry Map.insert) old toInsert
-- Add the reported counts to the per-metric counters.
processQuery opts ystate _h (UpdateMetrics qms) = do
  updateStats (sStats ystate) 1 (length qms)
  let smooth = optSmoothing opts
      metrics = sMetrics ystate
  forM_ qms $ updateMetric smooth metrics
-- Reply with every metric currently over its limit.
processQuery _opts ystate h GetOverLimit = do
  updateStats (sStats ystate) 1 0
  os <- Map.toList <$> readMVar (sOverLimits ystate)
  writeReply h $ ReplyOverLimit
    [ROverLimit k e (OverLimitAdded val thr) | (Key k e, OverLimit val thr) <- os]
-- Replace the whole limits map with the one given in the query.
processQuery _opts ystate _h (UpdateLimits qls) = do
  updateStats (sStats ystate) 1 0
  modifyMVar_ (sLimits ystate) $ \_oldLimits ->
    -- Build the map here rather than storing a thunk: a bad value in the
    -- query then fails this request instead of a later reader of the limits
    -- (such as the updater thread), and the MVar does not keep the raw query
    -- list alive.
    return $! Map.fromList [(LKey lvl ep, lim) | QLimit lvl ep lim <- qls]
-- Reply with the formatted statistics text.
processQuery _opts ystate h GetStats = do
  formattedStats <- prepareStats ystate
  writeReply h $ ReplyText formattedStats
-- Request shutdown by filling the exit MVar.
processQuery _opts ystate _h Stop = putMVar (sExit ystate) 0
-- Anything else is reported back to the client.
processQuery _opts _ystate h query =
  writeReply h $ ReplyText $ "Unknown query: " ++ show query
-- | Add the count carried by a 'QMetric' to the counter of its metric,
-- creating the metric (with the given smoothing window) on first sight.
updateMetric :: Double -> MVar MetricsMap -> QMetric -> IO ()
updateMetric smoothingWindow mvMetrics (QMetric key ep cnt) = do
  metric <- modifyMVar mvMetrics findOrCreate
  modifyMVar_ (mData metric) (\(MData c) -> return $! MData (c + cnt))
  where
    k = Key key ep

    -- Return the existing metric, or register a fresh one under @k@.
    findOrCreate metrics =
      maybe (create metrics) (\m -> return (metrics, m)) (Map.lookup k metrics)

    create metrics = do
      m <- newMetric smoothingWindow
      return (Map.insert k m metrics, m)
-- | Allocate a metric with a fresh moving average for the given smoothing
-- window and a zeroed counter.
newMetric :: Double -> IO Metric
newMetric smoothingWindow =
  Metric
    <$> (mavgNewIO smoothingWindow >>= newMVar)
    <*> newMVar (MData 0)
-- | Averaging windows used for the service statistics: the window value
-- passed to 'mavgNewIO' paired with the label used when printing it.
statsWindows :: [(Double, String)]
statsWindows =
  zip
    [60.0, 300.0, 3600.0, 86400.0]
    ["min", "5min", "hour", "day"]
-- | Create the statistics holder: two accumulators, each starting at zero
-- with one fresh moving average per entry of 'statsWindows'.
initialStats :: IO (MVar Stats)
initialStats = do
  connects <- freshAcc
  metrics <- freshAcc
  newMVar (Stats connects metrics)
  where
    freshAcc = fmap (MavgAcc 0) (mapM (mavgNewIO . fst) statsWindows)
-- | Add @c@ to the pending query count and @m@ to the pending metric count.
updateStats :: MVar Stats -> Int -> Int -> IO ()
updateStats mvStats c m =
  modifyMVar_ mvStats $ \(Stats conns mtrs) ->
    return $ Stats (addTo c conns) (addTo m mtrs)
  where
    addTo n acc = acc { maAcc = maAcc acc + n }
-- | Feed the pending counts into every moving average, stamped with the
-- given time, and reset both pending counts to zero.
bumpStats :: UTCTime -> MVar Stats -> IO ()
bumpStats now mvStats =
  modifyMVar_ mvStats $ \(Stats queries metrics) ->
    return $ Stats (flush queries) (flush metrics)
  where
    flush (MavgAcc pending mavgs) =
      MavgAcc 0 [bumpRate mavg now pending | mavg <- mavgs]
-- Telnet commands

-- | Execute one telnet command line and return the text to show the user.
-- See 'listTelnetCmds' for the supported commands.
runTelnetCmd :: YState -> String -> IO String
-- Quick stats: query/metric rates per window plus the sizes of the maps.
runTelnetCmd ystate "s" = do
  Stats (MavgAcc _ qs) (MavgAcc _ ms) <- readMVar (sStats ystate)
  mesize <- Map.size <$> readMVar (sMetrics ystate)
  lisize <- Map.size <$> readMVar (sLimits ystate)
  olsize <- Map.size <$> readMVar (sOverLimits ystate)
  let rateLine mavg window = int (rateAverage mavg) <+> text "per" <+> text window
      section nm mavgs =
        text nm <> colon <+> nest 4 (vcat (zipWith rateLine mavgs (map snd statsWindows)))
      statsDoc = vcat [section "Queries" qs, section "Metrics" ms]
      lenDoc = vcat
        [ text "Metrics:" <+> int mesize
        , text "Limits:" <+> int lisize
        , text "Over:" <+> int olsize
        ]
  return $ render (statsDoc $$ lenDoc)
-- Configured limits, one "level/endpoint: value" line each.
runTelnetCmd ystate "l" = do
  lims <- Map.toAscList <$> readMVar (sLimits ystate)
  let bs = text . B8.unpack
      limitLine (LKey lev ep, v) = bs lev <> text "/" <> bs ep <> colon <+> int v
  return . render . vcat $ map limitLine lims
-- Key to level assignments, one "key -> level" line each.
runTelnetCmd ystate "k" = do
  levels <- Map.toAscList <$> readMVar (sLevels ystate)
  let bs = text . B8.unpack
      levelLine (key, level) = bs key <+> text "->" <+> bs level
  return . render . vcat $ map levelLine levels
-- Every metric.
runTelnetCmd ystate "showallmetrics" =
  readMVar (sMetrics ystate) >>= renderMetrics ystate . Map.toAscList
-- Metrics whose client key matches the regular expression after "g ".
runTelnetCmd ystate ('g':' ':grep) = do
  metrics <- Map.toAscList <$> readMVar (sMetrics ystate)
  let regex = mkRegexWithOpts grep False False
      matches (key, _) = isJust (matchRegex regex (B8.unpack (kKey key)))
  renderMetrics ystate (filter matches metrics)
runTelnetCmd _ystate "help" = return listTelnetCmds
runTelnetCmd _ystate "h" = return listTelnetCmds
runTelnetCmd _ystate _cmd =
  return "Unrecognized command"
-- | Help text for the telnet interface: one "command -- description" line
-- per supported command.
listTelnetCmds :: String
listTelnetCmds = render . vcat $ map describe cmds
  where
    describe (cmd, desc) = text cmd <> text " -- " <> text desc
    cmds =
      [ ("s ", "Show quick stats")
      , ("l ", "Show limits")
      , ("k ", "Show key -> levels")
      , ("showallmetrics", "List of all metrics")
      , ("g <pattern> ", "List of grepped metrics")
      , ("help (or h) ", "Display help")
      ]
-- | Render the given metrics as text, one "key/endpoint/level: rate" line
-- per metric. The level comes from the levels map, falling back to
-- 'defaultLevel'.
renderMetrics :: YState -> [(Key, Metric)] -> IO String
renderMetrics ystate lims' = do
  levels <- readMVar (sLevels ystate)
  rows <- forM lims' $ \(Key key ep, Metric mvMavg _) -> do
    mavg <- readMVar mvMavg
    return (row key ep (Map.findWithDefault defaultLevel key levels) mavg)
  return (render (vcat rows))
  where
    bs = text . B8.unpack
    row key ep lev mavg =
      bs key <> text "/" <> bs ep <> text "/" <> bs lev <> colon
        <+> int (rateAverage mavg)
-- Preparing stats for Folsom/Riemann/Graphite

-- | Full statistics report: the per-metric section stacked on top of the
-- service section, rendered to a string.
prepareStats :: YState -> IO String
prepareStats ystate =
  render <$> (($$) <$> prepareMetricStats ystate <*> prepareHarlsonStats ystate)
-- | Two report lines per metric: its current rate ("metrics") and the limit
-- that applies to it ("limits", 'defaultLimit' when none is configured).
prepareMetricStats :: YState -> IO Doc
prepareMetricStats ystate = do
  metrics <- Map.toList <$> readMVar (sMetrics ystate)
  levels <- readMVar (sLevels ystate)
  limits <- readMVar (sLimits ystate)
  fmap vcat . forM metrics $ \(Key k ep, Metric mvMavg _) -> do
    rate <- rateAverage <$> readMVar mvMavg
    let lvl = Map.findWithDefault defaultLevel k levels
        lim = Map.findWithDefault defaultLimit (LKey lvl ep) limits
    return $ vcat
      [ constructMetricDoc k ep "metrics" rate
      , constructMetricDoc k ep "limits" lim
      ]
-- | Service section of the report: query ("connects") and metric rates for
-- every entry of 'statsWindows', followed by the current sizes of the
-- metrics, limits and over-limits maps.
prepareHarlsonStats :: YState -> IO Doc
prepareHarlsonStats ystate = do
  stats <- readMVar (sStats ystate)
  let windows = map snd statsWindows
      lineDoc name mavg window =
        text "rls.stats." <> text name <> text "." <> text window <> semi
          <> int (rateAverage mavg)
      sectionDoc name acc = vcat (zipWith (lineDoc name) (maMavgs acc) windows)
      rates = vcat
        [ sectionDoc "connects" (statQueries stats)
        , sectionDoc "metrics" (statMetrics stats)
        ]
      current name size = text "rls.stats.current." <> text name <> semi <> int size
  nMetrics <- Map.size <$> readMVar (sMetrics ystate)
  nLimits <- Map.size <$> readMVar (sLimits ystate)
  nOver <- Map.size <$> readMVar (sOverLimits ystate)
  return $ rates $$ vcat
    [ current "metrics" nMetrics
    , current "limits" nLimits
    , current "overlimits" nOver
    ]
-- | Build one report line of the form @rls.\<name\>.\<key\>.\<endpoint\>;\<value\>@.
--
-- Key and endpoint are sanitised: every character that is not an ASCII
-- letter or digit becomes @_@. The ASCII check matters because 'B8.unpack'
-- yields one 'Char' per byte and 'isAlphaNum' alone also accepts Latin-1
-- letters, which would let raw high bytes (for example, parts of a UTF-8
-- sequence) through into the metric name.
constructMetricDoc :: B8.ByteString -> B8.ByteString -> String -> Int -> Doc
constructMetricDoc k ep name n =
  t "rls." <> t name <> t "." <> u k <> t "." <> u ep <> semi <> int n
  where
    t = text
    u = text . safeString . B8.unpack
    safeString = map r
    r c = if isAscii c && isAlphaNum c then c else '_'