diff --git a/modules/agent/cfg.example.json b/modules/agent/cfg.example.json
index 215591940..812b9b346 100644
--- a/modules/agent/cfg.example.json
+++ b/modules/agent/cfg.example.json
@@ -1,7 +1,10 @@
 {
+    "agent_mem_limit": 200,
+    "agent_mem_ctrl": false,
     "debug": true,
     "hostname": "",
     "ip": "",
+    "batch": 2000,
     "plugin": {
         "enabled": false,
         "dir": "./plugin",
diff --git a/modules/agent/funcs/agentmonitor.go b/modules/agent/funcs/agentmonitor.go
new file mode 100644
index 000000000..914b7d504
--- /dev/null
+++ b/modules/agent/funcs/agentmonitor.go
@@ -0,0 +1,77 @@
+package funcs
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+
+	"github.com/open-falcon/falcon-plus/g"
+	"github.com/toolkits/file"
+)
+
+var (
+	memLimit   int    = 200 // maximum memory the agent may use, in MB
+	cgroupRoot string = "/sys/fs/cgroup/memory/falcon-agent"
+)
+
+const (
+	procsFile = "cgroup.procs"
+	memStat   = "memory.stat"
+	mb        = 1024 * 1024
+)
+
+// InitCgroup recreates the falcon-agent memory cgroup directory and writes
+// the agent pid into its cgroup.procs file. Failures are printed to stdout
+// and the function returns without retrying.
+func InitCgroup() {
+	pid := g.Pid("agent")
+	_ = os.RemoveAll(cgroupRoot)
+	// create falcon-agent cgroup dir (permission bits must be octal)
+	err := os.Mkdir(cgroupRoot, 0751)
+	if err != nil {
+		fmt.Println("falcon-agent cgroup init failed", err)
+		return
+	}
+	// join the cgroup by writing the agent pid to cgroup.procs
+	pPath := filepath.Join(cgroupRoot, procsFile)
+	err = ioutil.WriteFile(pPath, []byte(fmt.Sprintf("%s", pid)), 0644)
+	if err != nil {
+		fmt.Println("falcon-agent cgroup write cgroup.procs failed", err)
+		return
+	}
+}
+
+// GetAgentMem reads memory.stat under cgroupRoot and returns its "rss" value
+// divided by mb (1024*1024). It returns an error when the file cannot be
+// read or no parsable "rss" line is found.
+func GetAgentMem() (int64, error) {
+	filePath := filepath.Join(cgroupRoot, memStat)
+	contents, err := ioutil.ReadFile(filePath)
+	if err != nil {
+		fmt.Printf("error: %v\n", err)
+		return 0, err
+	}
+
+	reader := bufio.NewReader(bytes.NewBuffer(contents))
+	for {
+		info, err := file.ReadLine(reader)
+		if err != nil {
+			// Input ran out before a usable "rss" line was seen.
+			return 0, fmt.Errorf("rss entry not found in %s: %v", filePath, err)
+		}
+		fields := strings.Fields(string(info))
+		if len(fields) < 2 || fields[0] != "rss" {
+			continue
+		}
+		val, numErr := strconv.ParseInt(fields[1], 10, 64)
+		if numErr != nil {
+			continue
+		}
+		return val / mb, nil
+	}
+}
diff --git a/modules/agent/funcs/agentmonitor_test.go b/modules/agent/funcs/agentmonitor_test.go
new file mode 100644
index 000000000..479eed68b
--- /dev/null
+++ b/modules/agent/funcs/agentmonitor_test.go
@@ -0,0 +1,25 @@
+package funcs
+
+import (
+	"os"
+	"testing"
+)
+
+func TestGetAgentMem(t *testing.T) {
+	_ = os.RemoveAll(cgroupRoot)
+	// create falcon-agent cgroup dir
+	err := os.Mkdir(cgroupRoot, 0751)
+	if err != nil {
+		t.Error(err)
+		_ = os.RemoveAll(cgroupRoot)
+		return
+	}
+	_, err = GetAgentMem()
+	if err != nil {
+		t.Error(err)
+		_ = os.RemoveAll(cgroupRoot)
+		return
+	}
+	_ = os.RemoveAll(cgroupRoot)
+	return
+}
diff --git a/modules/agent/g/cfg.go b/modules/agent/g/cfg.go
index 1b0823ddb..9ef4921a4 100644
--- a/modules/agent/g/cfg.go
+++ b/modules/agent/g/cfg.go
@@ -18,6 +18,8 @@ import (
 	"encoding/json"
 	"log"
 	"os"
+	"strconv"
+	"strings"
 	"sync"
 
 	"github.com/toolkits/file"
@@ -56,9 +58,12 @@ type CollectorConfig struct {
 }
 
 type GlobalConfig struct {
+	AgentMemLimit uint64            `json:"agent_mem_limit"`
+	AgentMemCtrl  bool              `json:"agent_mem_ctrl"`
 	Debug         bool              `json:"debug"`
 	Hostname      string            `json:"hostname"`
 	IP            string            `json:"ip"`
+	Batch         int               `json:"batch,omitempty"`
 	Plugin        *PluginConfig     `json:"plugin"`
 	Heartbeat     *HeartbeatConfig  `json:"heartbeat"`
 	Transfer      *TransferConfig   `json:"transfer"`
@@ -134,6 +139,45 @@ func ParseConfig(cfg string) {
 		log.Fatalln("parse config file:", cfg, "fail:", err)
 	}
 
+	// Environment variables, when set, override the values from the config file.
+	if memCtrl := os.Getenv("AGENT_MEM_CTRL"); memCtrl != "" {
+		c.AgentMemCtrl = strings.EqualFold(memCtrl, "true")
+		log.Println("set AgentMemCtrl:", c.AgentMemCtrl, "from env")
+	}
+
+	if transferAddr := os.Getenv("TRANSFER_URL"); transferAddr != "" {
+		// Transfer is a pointer field; allocate it if the config left it nil.
+		if c.Transfer == nil {
+			c.Transfer = &TransferConfig{}
+		}
+		c.Transfer.Addrs = strings.Split(transferAddr, ",")
+		log.Println("set transfer url: " + transferAddr + " from env")
+	}
+
+	if heartbeatURL := os.Getenv("HEARTBEAT_URL"); heartbeatURL != "" {
+		// Heartbeat is a pointer field; allocate it if the config left it nil.
+		if c.Heartbeat == nil {
+			c.Heartbeat = &HeartbeatConfig{}
+		}
+		c.Heartbeat.Addr = heartbeatURL
+		log.Println("set heartbeat URL: " + heartbeatURL + " from env")
+	}
+
+	// Only parse LIMIT_BATCH when it is set; an unset variable is not an error.
+	if rawBatch := os.Getenv("LIMIT_BATCH"); rawBatch != "" {
+		limitBatch, convErr := strconv.Atoi(rawBatch)
+		if convErr != nil {
+			log.Println("invalid limit Batch: ", rawBatch)
+		} else {
+			c.Batch = limitBatch
+			log.Println("set limit Batch: ", limitBatch, "from env")
+		}
+	}
+	if c.Batch <= 0 {
+		c.Batch = 2000
+		log.Println("set batch default size: ", c.Batch)
+	}
+
 	lock.Lock()
 	defer lock.Unlock()
 
diff --git a/modules/agent/http/push.go b/modules/agent/http/push.go
index 3af9a13b3..8b89d3c7c 100644
--- a/modules/agent/http/push.go
+++ b/modules/agent/http/push.go
@@ -16,27 +16,63 @@ package http
 
 import (
 	"encoding/json"
+	"log"
+	"net/http"
+	"strconv"
+	"sync"
+
 	"github.com/open-falcon/falcon-plus/common/model"
+	"github.com/open-falcon/falcon-plus/modules/agent/funcs"
 	"github.com/open-falcon/falcon-plus/modules/agent/g"
-	"net/http"
 )
 
+// once guards the one-time cgroup setup performed by isHandReq.
+var once sync.Once
+
 func configPushRoutes() {
 	http.HandleFunc("/v1/push", func(w http.ResponseWriter, req *http.Request) {
 		if req.ContentLength == 0 {
 			http.Error(w, "body is blank", http.StatusBadRequest)
 			return
 		}
-
+		// isHandReq writes the error response itself when it rejects a request.
+		if g.Config().AgentMemCtrl && !isHandReq(w) {
+			return
+		}
 		decoder := json.NewDecoder(req.Body)
 		var metrics []*model.MetricValue
 		err := decoder.Decode(&metrics)
 		if err != nil {
-			http.Error(w, "connot decode body", http.StatusBadRequest)
+			http.Error(w, "cannot decode body", http.StatusBadRequest)
+			return
+		}
+		// Read the limit once so the comparison, the slice bound and the
+		// message all use the same value.
+		if batch := g.Config().Batch; len(metrics) > batch {
+			g.SendToTransfer(metrics[:batch])
+			http.Error(w, "post Metric too Big !!! have sent max Batch: "+strconv.Itoa(batch), http.StatusBadRequest)
 			return
 		}
-
 		g.SendToTransfer(metrics)
 		w.Write([]byte("success"))
 	})
 }
+
+// isHandReq reports whether the push request should be handled. It runs
+// funcs.InitCgroup on first use, then rejects the request (writing the
+// response itself) when the memory usage cannot be read or is above
+// AgentMemLimit.
+func isHandReq(w http.ResponseWriter) bool {
+	once.Do(funcs.InitCgroup)
+	memUsed, err := funcs.GetAgentMem()
+	if err != nil {
+		RenderMsgJson(w, err.Error())
+		return false
+	}
+	if uint64(memUsed) > g.Config().AgentMemLimit {
+		log.Printf("memory consumption has exceeded the threshold")
+		http.Error(w, "memory consumption has exceeded the threshold", http.StatusBadRequest)
+		return false
+	}
+	return true
+}