Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the memory size of falcon-agent and the number of metric pushed #912

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
agent内存控制和metric推送数量限制
  • Loading branch information
Baiyecai committed Dec 3, 2020
commit 434c71864880bb31e94cbcf1efbb594675b39805
3 changes: 3 additions & 0 deletions modules/agent/cfg.example.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
{
"max_memory": 200,
"memoryctrl": false,
"debug": true,
"hostname": "",
"ip": "",
"batch": 2000,
"plugin": {
"enabled": false,
"dir": "./plugin",
155 changes: 155 additions & 0 deletions modules/agent/funcs/agentmeminfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package funcs

import (
"bufio"
"bytes"
"errors"
"fmt"
"github.com/toolkits/file"
"io"
"io/ioutil"
"log"
"os"
"regexp"
"strconv"
"strings"
)

type Mem struct {
Buffers uint64
Cached uint64
MemTotal uint64
MemFree uint64
SwapTotal uint64
SwapUsed uint64
SwapFree uint64
VmRSS uint64
}

var Multi uint64 = 1024

var WANT = map[string]struct{}{
"Buffers": struct{}{},
"Cached": struct{}{},
"MemTotal": struct{}{},
"MemFree": struct{}{},
"SwapTotal": struct{}{},
"SwapUsed": struct{}{},
"SwapFree": struct{}{},
"VmRSS": struct{}{},
}

// ProcessInfo 定义进程信息
type ProcessInfo struct {
name string
pid int
ppid int
state string
}

// ProcessManager
type ProcessManager struct {
handler *os.File
}

func AgentMemInfo() (*Mem, error) {
proc := ProcessManager{}
pid, err := proc.CheckProc()
if err != nil {
return nil, err
}
var strs = strconv.Itoa(pid)
contents, err := ioutil.ReadFile("/proc" + strs + "status")
if err != nil {
log.Printf("error: %v", err)
return nil, err
}
memInfo := &Mem{}
reader := bufio.NewReader(bytes.NewBuffer(contents))

for {
line, err := file.ReadLine(reader)
if err == io.EOF {
err = nil
break
} else if err != nil {
return nil, err
}
fields := strings.Fields(string(line))
fieldName := fields[0]
_, ok := WANT[fieldName]
if ok && len(fields) == 3 {
val, numerr := strconv.ParseUint(fields[1], 10, 64)
if numerr != nil {
continue
}
switch fieldName {
case "VmRSS":
memInfo.VmRSS = val / Multi
}
}
}
if err != nil {
return nil, err
}
return memInfo, nil
}

func (proc *ProcessManager) CheckProc() (int, error) {
d, err := os.Open("/proc")
if err != nil {
return 0, err
}
defer func() {
d.Close()
}()
proc.handler = d
for {
fileList, err := proc.handler.Readdir(10)
if err != nil && err != io.EOF {
return 0, err
}
for _, fi := range fileList {
if !fi.IsDir() {
continue
}
name := fi.Name()
if ok, err := regexp.MatchString(`^[0-9]+$`, name); err != nil || !ok {
continue
}
pid, err := strconv.ParseInt(name, 10, 0)
if err != nil {
continue
}
p := ProcessInfo{pid: int(pid)}
if err := p.Load(); err == nil {
return p.pid, nil
} else {
continue
}
}
}
}

// 确认下
func (p *ProcessInfo) Load() error {
dataBytes, err := ioutil.ReadFile(fmt.Sprintf("/proc/%d/stat", p.pid))
if err != nil {
return err
}
data := string(dataBytes)
start := strings.IndexRune(data, '(') + 1
end := strings.IndexRune(data[start:], ')')
p.name = data[start : start+end]
if p.name == "falcon-agent" {
result := strings.Split(string(data[start+end+2:]), " ")
if len(result) < 2 {
return errors.New("length not right")
}
p.state = result[0]
if ppid, err := strconv.Atoi(result[2]); err == nil {
p.ppid = ppid
}
}
return errors.New("the proc not exist")
}
38 changes: 38 additions & 0 deletions modules/agent/g/cfg.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@ import (
"encoding/json"
"log"
"os"
"strconv"
"strings"
"sync"

"github.com/toolkits/file"
@@ -56,9 +58,12 @@ type CollectorConfig struct {
}

type GlobalConfig struct {
MaxMemory uint64 `json:"max_memory"`
MemoryCtrl bool `json:"memoryctrl"`
Debug bool `json:"debug"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
Batch int `json:"batch,omitempty"`
Plugin *PluginConfig `json:"plugin"`
Heartbeat *HeartbeatConfig `json:"heartbeat"`
Transfer *TransferConfig `json:"transfer"`
@@ -134,6 +139,39 @@ func ParseConfig(cfg string) {
log.Fatalln("parse config file:", cfg, "fail:", err)
}

memCtrl :=os.Getenv("MEMORY_CTRL")
if len(memCtrl)!=0{
if strings.ToLower(memCtrl)=="true"{
c.MemoryCtrl=true
}else{
c.MemoryCtrl=false
}
}

transferAddr :=os.Getenv("TRANSFER_URL")
if len(transferAddr)!=0{
c.Transfer.Addrs=strings.Split(transferAddr,",")
log.Println("transfer url: ",transferAddr)
}

heartbeatURL :=os.Getenv("HEARTBEAT_URL")
if len(heartbeatURL)!=0{
c.Heartbeat.Addr=heartbeatURL
log.Println("heartbeat URL: ",transferAddr)
}
limitBatch :=os.Getenv("LIMIT_BATCH")
if len(limitBatch)!=0{
c.Batch,err=strconv.Atoi(limitBatch)
if err!=nil{
log.Println("invalid limit Batch: ",limitBatch)
}
log.Println("from env set Batch size: ", c.Batch)
}
if c.Batch<=0{
c.Batch=2000
log.Println("set batch default size: ",c.Batch)
}

lock.Lock()
defer lock.Unlock()

71 changes: 67 additions & 4 deletions modules/agent/http/push.go
Original file line number Diff line number Diff line change
@@ -16,27 +16,90 @@ package http

import (
"encoding/json"
"errors"
"github.com/open-falcon/falcon-plus/common/model"
"github.com/open-falcon/falcon-plus/modules/agent/funcs"
"github.com/open-falcon/falcon-plus/modules/agent/g"
"log"
"net/http"
"strconv"
)

func configPushRoutes() {
http.HandleFunc("/v1/push", func(w http.ResponseWriter, req *http.Request) {
http.HandleFunc("/v1/push", cors(func(w http.ResponseWriter, req *http.Request) {
if req.ContentLength == 0 {
http.Error(w, "body is blank", http.StatusBadRequest)
return
}

if g.Config().MemoryCtrl == true {
errMem := MemCtrl(w)
if errMem != nil {
return
}
}
decoder := json.NewDecoder(req.Body)
var metrics []*model.MetricValue
err := decoder.Decode(&metrics)
if err != nil {
http.Error(w, "connot decode body", http.StatusBadRequest)
return
}

if len(metrics) >= g.Config().Batch {
http.Error(w, "cannot post Metric too Big !!! curr count: "+strconv.Itoa(len(metrics)), http.StatusBadRequest)
}
g.SendToTransfer(metrics)
w.Write([]byte("success"))
})
}))
}

func MemCtrl(w http.ResponseWriter) error {
log.Printf("memory control start")
mem, err := funcs.AgentMemInfo()
if err != nil {
RenderMsgJson(w, err.Error())
return err
}
memUsed := mem.VmRSS
if memUsed > g.Config().MaxMemory {
log.Printf("memory consumption has exceeded the threshold")
http.Error(w, "memory consumption has exceeded the threshold", http.StatusBadRequest)
return errors.New("memory consumption has exceeded the threshold")
}
return err
}

// 确认下相关请求是否有关联
func cors(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
origin := r.Header.Get("Origin")
switch r.Method {
case "OPTIONS":
method := r.Header.Get("Access-Control-Request-Method")
log.Printf("preflight request method: %v", method)
if method == "POST" {
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Headers", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST,OPTIONS")
w.WriteHeader(http.StatusOK)
return
} else {
w.WriteHeader(http.StatusBadRequest)
return
}
case "POST":
if origin == r.Header.Get("Origin") {
w.Header().Set("Access-Control-Allow-Origin", origin)
} else if origin == "" {
f(w, r)
return
} else {
w.WriteHeader(http.StatusBadRequest)
return
}
default:
w.WriteHeader(http.StatusBadRequest)
return
}
f(w, r)
}
}