Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the memory size of falcon-agent and the number of metrics pushed #912

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions modules/agent/cfg.example.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
{
"agent_mem_limit": 200,
"agent_mem_ctrl": false,
"debug": true,
"hostname": "",
"ip": "",
"batch": 2000,
"plugin": {
"enabled": false,
"dir": "./plugin",
Expand Down
132 changes: 132 additions & 0 deletions modules/agent/funcs/agentmeminfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package funcs

import (
"bufio"
"bytes"
"errors"
"fmt"
"github.com/toolkits/file"
"io"
"io/ioutil"
"log"
"os"
"regexp"
"strconv"
"strings"
)

// Multi is the divisor AgentMemInfo applies to the VmRSS value it reads
// from the process status file.
const Multi uint64 = 1024

// ProcessInfo holds the information about a single process that this
// package collects from /proc.
type ProcessInfo struct {
// name is the process name that matchProc was asked to look for.
name string
// pid is filled in by getProcInfo from the numeric /proc directory name.
pid int
// ppid is parsed by matchProc from the process stat data.
ppid int
// state is the first field following the process name in the stat data.
state string
}

// agentInfo holds the process information of the falcon-agent process.
// It is set by init and, when still nil, looked up again by AgentMemInfo.
var agentInfo *ProcessInfo

// init performs the first lookup of the falcon-agent process and records
// the result in agentInfo. A failed lookup is only logged; AgentMemInfo
// repeats the lookup later while agentInfo is still nil.
func init() {
	if agentInfo = getProcInfo("falcon-agent"); agentInfo != nil {
		return
	}
	log.Println("not exist falcon-agent proc")
}

// AgentMemInfo returns the resident set size (VmRSS) of the falcon-agent
// process as read from /proc/<pid>/status, divided by Multi. The kernel
// reports VmRSS in kB, so the result is in MiB.
//
// The falcon-agent process is looked up by name when it has not been found
// yet. An error is returned when no such process exists, when the status
// file cannot be read, or when reading its lines fails. When the status
// file contains no parsable VmRSS line the result is 0 with a nil error.
func AgentMemInfo() (uint64, error) {
	if agentInfo == nil {
		agentInfo = getProcInfo("falcon-agent")
		if agentInfo == nil {
			return 0, errors.New("not exist falcon-agent proc")
		}
	}

	// The path needs both separators: /proc/<pid>/status.
	statusPath := "/proc/" + strconv.Itoa(agentInfo.pid) + "/status"
	contents, err := ioutil.ReadFile(statusPath)
	if err != nil {
		log.Printf("error: %v", err)
		return 0, err
	}

	reader := bufio.NewReader(bytes.NewBuffer(contents))
	var (
		agentVmRSS uint64
		line       []byte
	)
	for {
		// Assign to the outer err (no :=) so that the check after the loop
		// sees the error that actually terminated the read.
		line, err = file.ReadLine(reader)
		if err != nil {
			break
		}
		// Status lines look like "VmRSS:\t   12345 kB"; the key keeps its
		// trailing colon after splitting on whitespace.
		fields := strings.Fields(string(line))
		if len(fields) < 2 || fields[0] != "VmRSS:" {
			continue
		}
		val, numErr := strconv.ParseUint(fields[1], 10, 64)
		if numErr != nil {
			continue
		}
		agentVmRSS = val / Multi
	}
	// io.EOF is the normal end of the file; anything else is a real failure.
	if err != io.EOF {
		return 0, err
	}
	return agentVmRSS, nil
}

// pidDirPattern matches /proc entries whose name consists only of digits,
// i.e. the per-process directories. It is compiled once instead of once per
// directory entry.
var pidDirPattern = regexp.MustCompile(`^[0-9]+$`)

// getProcInfo scans /proc for the first process whose stat data matches
// procName (as decided by matchProc) and returns its ProcessInfo with the
// pid filled in. It returns nil when /proc cannot be read or when no
// matching process is found.
func getProcInfo(procName string) *ProcessInfo {
	dirs, err := os.Open("/proc")
	if err != nil {
		log.Println(err)
		return nil
	}
	defer dirs.Close()

	for {
		fileList, err := dirs.Readdir(10)
		if err != nil {
			// io.EOF only signals that the whole directory has been read
			// without finding a match; it is not worth an error log line.
			if err != io.EOF {
				log.Println(err)
			}
			return nil
		}
		for _, fi := range fileList {
			name := fi.Name()
			if !fi.IsDir() || !pidDirPattern.MatchString(name) {
				continue
			}
			pid, err := strconv.Atoi(name)
			if err != nil {
				continue
			}
			// A read failure is skipped rather than reported: the entry is
			// simply not considered a candidate.
			procData, err := ioutil.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
			if err != nil {
				continue
			}
			procInfo := matchProc(procData, procName)
			if procInfo == nil {
				continue
			}
			procInfo.pid = pid
			return procInfo
		}
	}
}

// matchProc parses the contents of a /proc/<pid>/stat file and returns the
// process information when the process name (the text between parentheses)
// equals procName. It returns nil when the name differs or when the data is
// malformed or truncated.
//
// The stat layout is "pid (comm) state ppid pgrp ...": after the closing
// parenthesis the first field is the state and the second is the parent pid.
// The returned ProcessInfo has name, state and ppid set; pid is left for the
// caller to fill in. A ppid that does not parse as an integer is left at 0.
func matchProc(procData []byte, procName string) *ProcessInfo {
	data := string(procData)

	// The comm may itself contain ')', so the name ends at the LAST closing
	// parenthesis. Missing or misordered parentheses mean malformed data.
	lparen := strings.IndexByte(data, '(')
	rparen := strings.LastIndexByte(data, ')')
	if lparen < 0 || rparen < lparen {
		return nil
	}
	if data[lparen+1:rparen] != procName {
		return nil
	}

	// Fields (rather than Split on " ") tolerates the trailing newline and
	// never indexes past the end of short input.
	otherInfo := strings.Fields(data[rparen+1:])
	if len(otherInfo) < 2 {
		return nil
	}

	p := &ProcessInfo{
		name:  procName,
		state: otherInfo[0],
	}
	if ppid, err := strconv.Atoi(otherInfo[1]); err == nil {
		p.ppid = ppid
	}
	return p
}
11 changes: 11 additions & 0 deletions modules/agent/funcs/agentmeminfo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package funcs

import "testing"

// TestMatchProc feeds matchProc a sample /proc/<pid>/stat line and checks
// that a matching process name yields the parsed name and state, and that a
// non-matching name yields nil.
func TestMatchProc(t *testing.T) {
	data := []byte("1082 (bash) S 1081 1082 1082 8912896 -1 0 2704 2704 0 0 15 93 15 93 20 0 0 0 96121849 7487488 2521 345")

	procInfo := matchProc(data, "bash")
	if procInfo == nil {
		// Fatal, not Error: the field checks below would dereference nil.
		t.Fatal("matchProc function incorrect")
	}
	if procInfo.name != "bash" {
		t.Errorf("name = %q, want %q", procInfo.name, "bash")
	}
	if procInfo.state != "S" {
		t.Errorf("state = %q, want %q", procInfo.state, "S")
	}

	if other := matchProc(data, "zsh"); other != nil {
		t.Errorf("matchProc with a non-matching name = %+v, want nil", other)
	}
}
39 changes: 39 additions & 0 deletions modules/agent/g/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"encoding/json"
"log"
"os"
"strconv"
"strings"
"sync"

"github.com/toolkits/file"
Expand Down Expand Up @@ -56,9 +58,12 @@ type CollectorConfig struct {
}

type GlobalConfig struct {
AgentMemLimit uint64 `json:"agent_mem_limit"`
AgentMemCtrl bool `json:"agent_mem_ctrl"`
Debug bool `json:"debug"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
Batch int `json:"batch,omitempty"`
Plugin *PluginConfig `json:"plugin"`
Heartbeat *HeartbeatConfig `json:"heartbeat"`
Transfer *TransferConfig `json:"transfer"`
Expand Down Expand Up @@ -134,6 +139,40 @@ func ParseConfig(cfg string) {
log.Fatalln("parse config file:", cfg, "fail:", err)
}

memCtrl := os.Getenv("AGENT_MEM_CTRL")
if memCtrl != "" {
if strings.ToLower(memCtrl) == "true" {
c.AgentMemCtrl = true
} else {
c.AgentMemCtrl = false
}
log.Println("set AgentMemCtrl:", c.AgentMemCtrl, "from env")
}

transferAddr := os.Getenv("TRANSFER_URL")
if transferAddr != "" {
c.Transfer.Addrs = strings.Split(transferAddr, ",")
log.Println("set transfer url: " + transferAddr + " from env")
}

heartbeatURL := os.Getenv("HEARTBEAT_URL")
if len(heartbeatURL) != 0 {
c.Heartbeat.Addr = heartbeatURL
log.Println("set heartbeat URL: " + heartbeatURL + " from env")
}

limitBatch, err := strconv.Atoi(os.Getenv("LIMIT_BATCH"))
if err != nil {
log.Println("invalid limit Batch: ", limitBatch)
} else {
c.Batch = limitBatch
log.Println("set limit Batch: ", limitBatch, "from env")
}
if c.Batch <= 0 {
c.Batch = 2000
log.Println("set batch default size: ", c.Batch)
}

lock.Lock()
defer lock.Unlock()

Expand Down
32 changes: 29 additions & 3 deletions modules/agent/http/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package http
import (
"encoding/json"
"github.com/open-falcon/falcon-plus/common/model"
"github.com/open-falcon/falcon-plus/modules/agent/funcs"
"github.com/open-falcon/falcon-plus/modules/agent/g"
"log"
"net/http"
"strconv"
)

func configPushRoutes() {
Expand All @@ -27,16 +30,39 @@ func configPushRoutes() {
http.Error(w, "body is blank", http.StatusBadRequest)
return
}

if g.Config().AgentMemCtrl == true {
ok := isHandReq(w)
if !ok {
return
}
}
decoder := json.NewDecoder(req.Body)
var metrics []*model.MetricValue
err := decoder.Decode(&metrics)
if err != nil {
http.Error(w, "connot decode body", http.StatusBadRequest)
http.Error(w, "cannot decode body", http.StatusBadRequest)
return
}

if len(metrics) > g.Config().Batch {
apriltommy0525 marked this conversation as resolved.
Show resolved Hide resolved
http.Error(w, "cannot post Metric too Big !!! curr count: "+strconv.Itoa(len(metrics)), http.StatusBadRequest)
}
g.SendToTransfer(metrics)
w.Write([]byte("success"))
})
}

// isHandReq reports whether a push request may be handled given the agent's
// current memory usage. It returns false after writing a response to w when
// the memory usage cannot be determined (the error text is rendered through
// RenderMsgJson) or when the usage is above the configured AgentMemLimit
// (answered with HTTP 400). Otherwise it returns true and writes nothing.
func isHandReq(w http.ResponseWriter) bool {
	log.Printf("memory control start")

	usage, lookupErr := funcs.AgentMemInfo()
	switch {
	case lookupErr != nil:
		RenderMsgJson(w, lookupErr.Error())
		return false
	case usage > g.Config().AgentMemLimit:
		log.Printf("memory consumption has exceeded the threshold")
		http.Error(w, "memory consumption has exceeded the threshold", http.StatusBadRequest)
		return false
	default:
		return true
	}
}