From aea7527905d3c0e5a64d5073782605b750984d42 Mon Sep 17 00:00:00 2001 From: Christian Kniep Date: Thu, 8 Dec 2016 14:11:27 +0100 Subject: [PATCH] created docker_containers collector based on docker/docker-py --- collectors/0/docker.py | 209 ------------------ collectors/0/docker_containers.py | 133 +++++++++++ ...cker_conf.py => docker_containers_conf.py} | 21 +- collectors/test/test_docker_container.py | 91 ++++++++ 4 files changed, 228 insertions(+), 226 deletions(-) delete mode 100755 collectors/0/docker.py create mode 100755 collectors/0/docker_containers.py rename collectors/etc/{docker_conf.py => docker_containers_conf.py} (60%) create mode 100644 collectors/test/test_docker_container.py diff --git a/collectors/0/docker.py b/collectors/0/docker.py deleted file mode 100755 index 8a744c44..00000000 --- a/collectors/0/docker.py +++ /dev/null @@ -1,209 +0,0 @@ -#!/usr/bin/env python -# More informations on https://docs.docker.com/articles/runmetrics/ -"""Imports Docker stats from /sys/fs/cgroup.""" - -import os -import re -import socket -import sys -import time -import json - -from collectors.etc import docker_conf -from collectors.lib import utils - -CONFIG = docker_conf.get_config() - -COLLECTION_INTERVAL = CONFIG['interval'] -CGROUP_PATH =CONFIG['cgroup_path'] -ENABLED = docker_conf.enabled() -DOCKER_SOCK = CONFIG['socket_path'] - -if not ENABLED: - sys.stderr.write("Docker collector is not enabled") - sys.exit(13) - -# proc_names example: -# $ cat cpuacct.stat -# user 58 -# system 72 -proc_names = { - "cpuacct.stat": ( - "user", "system", - ), - "memory.stat": ( - "cache", "rss", "mapped_file", "pgfault", "pgmajfault", "swap", "active_anon", - "inactive_anon", "active_file", "inactive_file", "unevictable", - "hierarchical_memory_limit", "hierarchical_memsw_limit", - ), -} - -# proc_names_to_agg example: -# $ cat blkio.io_service_bytes -# 8:0 Read 8523776 -# 8:0 Write 1048576 -# ... -# 8:1 Read 4223776 -# 8:1 Write 1042576 -# ... -proc_names_to_agg = { - "blkio.io_service_bytes": ( - "Read", "Write", - ), -} - -def getnameandimage(containerid): - - # Retrieve container json configuration file - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.settimeout(5) - try: - r = sock.connect_ex(DOCKER_SOCK) - if (r != 0): - print >>sys.stderr, "Can not connect to %s" % (DOCKER_SOCK) - else: - message = 'GET /containers/' + containerid + '/json HTTP/1.1\n\n' - sock.sendall(message) - json_data = "" - # "\r\n0\r\n" is raised on last chunk. See RFC 7230. - while (re.search("\r\n0\r\n", json_data) == None): - json_data += sock.recv(4096) - sock.close() - - # Retrieve container name and image - m = re.search("{(.+)}", json_data) - if m: - json_data = "{"+m.group(1)+"}" - try: - data = json.loads(json_data) - try: - containernames[containerid] = data["Name"].lstrip('/') - except: - print >>sys.stderr, containerid+" has no Name field" - try: - containerimages[containerid] = data["Config"]["Image"].replace(':', '_') - except: - print >>sys.stderr, containerid+" has no Image field" - except: - print >>sys.stderr, "Can not load json" - - except socket.timeout, e: - print >>sys.stderr, "Socket: %s" % (e,) - -def senddata(datatosend, containerid): - if datatosend: - datatosend += " containerid="+containerid - if (containerid in containernames): - datatosend += " containername="+containernames[containerid] - if (containerid in containerimages): - datatosend += " containerimage="+containerimages[containerid] - print "docker.%s" % datatosend - sys.stdout.flush() - -def readdockerstats(path, containerid): - - # update containername and containerimage if needed - if ((containerid not in containernames) or (containerid not in containerimages)): - getnameandimage(containerid) - - # Retrieve and push stats - for file_stat in os.listdir(path): - if (os.path.isfile(path+"/"+file_stat)\ - and ((file_stat in proc_names.keys()) or (file_stat in proc_names_to_agg.keys()))): - try: - f_stat = open(path+"/"+file_stat) - except IOError, e: - print >>sys.stderr, "Failed to open input file: %s" % (e,) - return 1 - ts = int(time.time()) - - # proc_name - if (file_stat in proc_names.keys()): - datatosend = None - f_stat.seek(0) - for line in f_stat: - tags = None - subcattype = None - fields = line.split() - category = file_stat.split('.')[0] - subcategory = fields[0] - value = fields[1] - if subcategory in proc_names[file_stat]: - if category == 'memory': - if subcategory in ['active_anon', 'inactive_anon']: - subcattype = subcategory.split('_')[0] - subcategory = 'anon' - if subcategory in ['active_file', 'inactive_file']: - subcattype = subcategory.split('_')[0] - subcategory = 'file' - tags = "type=%s" % subcategory - if subcattype != None: - tags += " subtype=%s" % subcattype - datatosend = "%s %d %s %s" % (category, ts, value, tags) - else: - datatosend = "%s.%s %d %s" % (category, subcategory, ts, value) - senddata(datatosend, containerid) - # proc_names_to_agg - else: - if (file_stat in proc_names_to_agg.keys()): - for field_to_match in proc_names_to_agg[file_stat]: - datatosend = None - f_stat.seek(0) - count = 0 - for line in f_stat: - fields = line.split() - if fields[1] == field_to_match: - datatosend = "%s.%s" % (file_stat, fields[1].lower()) - try: - count += int(fields[2]) - except: - pass - if datatosend: - senddata("%s %d %s" % (datatosend, ts, count), containerid) - f_stat.close() - -def main(): - """docker_cpu main loop""" - global containernames - global containerimages - utils.drop_privileges() - cache=0 - while True: - - # Connect to Docker socket to get informations about containers every 4 times - if (cache == 0): - containernames={} - containerimages={} - cache += 1 - if (cache == 4): - cache = 0 - - if os.path.isdir(CGROUP_PATH): - for level1 in os.listdir(CGROUP_PATH): - if (os.path.isdir(CGROUP_PATH + "/"+level1+"/docker")\ - # /cgroup/cpu and /cgroup/cpuacct are often links to /cgroup/cpu,cpuacct - and not (((level1 == "cpu,cpuacct") or (level1 == "cpuacct")) and (os.path.isdir(CGROUP_PATH + "/cpu/docker")))): - for level2 in os.listdir(CGROUP_PATH + "/"+level1+"/docker"): - if os.path.isdir(CGROUP_PATH + "/"+level1+"/docker/"+level2): - readdockerstats(CGROUP_PATH + "/"+level1+"/docker/"+level2, level2) - else: - # If Docker cgroup is handled by slice - # http://www.freedesktop.org/software/systemd/man/systemd.slice.html - for slicename in ("system.slice", "machine.slice", "user.slice"): - if (os.path.isdir(CGROUP_PATH + "/"+level1+"/"+slicename)\ - # /cgroup/cpu and /cgroup/cpuacct are often links to /cgroup/cpu,cpuacct - and not (((level1 == "cpu,cpuacct") or (level1 == "cpuacct")) and (os.path.isdir(CGROUP_PATH + "/cpu/"+slicename)))): - for level2 in os.listdir(CGROUP_PATH + "/"+level1+"/"+slicename): - if os.path.isdir(CGROUP_PATH + "/"+level1+"/"+slicename+"/"+level2): - m = re.search("^docker-(\w+)\.scope$", level2) - if m: - readdockerstats(CGROUP_PATH + "/"+level1+"/"+slicename+"/"+level2, m.group(1)) - break - if os.path.isdir(CGROUP_PATH + "/lxc"): - for level1 in os.listdir(CGROUP_PATH + "/lxc"): - if os.path.isdir(CGROUP_PATH + "/lxc/"+level1): - readdockerstats(CGROUP_PATH + "/lxc/"+level1, level1) - time.sleep(COLLECTION_INTERVAL) - -if __name__ == "__main__": - sys.exit(main()) diff --git a/collectors/0/docker_containers.py b/collectors/0/docker_containers.py new file mode 100755 index 00000000..7d2b9b32 --- /dev/null +++ b/collectors/0/docker_containers.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python +"""Imports Docker stats from the docker-api""" + +import json +import sys +import time + +from feedparser import _parse_date as parse_date +from docker import Client + +from collectors.etc import docker_containers_conf +from collectors.lib import utils + +CONFIG = docker_containers_conf.get_config() +print CONFIG +COLLECTION_INTERVAL = CONFIG['interval'] +DEFAULT_DIMS = CONFIG['default_dims'] +ENABLED = docker_containers_conf.enabled() +DOCKER_SOCK = CONFIG['socket_path'] + +if not ENABLED: + sys.stderr.write("Docker collector is not enabled") + sys.exit(13) + +class Metric(object): + def __init__(self, name, etime, value, dims=None): + self.name = name + self.value = value + self.event_time = etime + if dims is None: + self.dims = set([]) + else: + self.dims = set(dims) + self.dims.update(set(DEFAULT_DIMS)) + + def add_dims(self, dims): + self.dims.update(set(dims)) + + def getMetricLines(self): + """ return in OpenTSDB format + [key=val] [key1=val1]... + """ + m = "%s %s %s" % (self.name, int(time.mktime(self.event_time)), self.value) + return "%s %s" % (m, " ".join(sorted(list(self.dims)))) + +class Stats(object): + def __init__(self, container, mtime): + self.dims = [ + "container_name=%s" % trimContainerName(container), + "container_id=%s" % container['Id'], + "image_name=%s" % container['Image'], + "image_id=%s" % container['ImageID'] + ] + self.event_time = mtime + + def evalCpu(self, pre, cur): + ret = [] + system_usage_ms = float(cur["system_cpu_usage"]-pre["system_cpu_usage"]) + cnt_user_ms = cur["cpu_usage"]["usage_in_usermode"]-pre["cpu_usage"]["usage_in_usermode"] + cnt_user_percent = cnt_user_ms*100/system_usage_ms + cnt_kernel_ms = cur["cpu_usage"]["usage_in_kernelmode"]-pre["cpu_usage"]["usage_in_kernelmode"] + cnt_kernel_percent = cnt_kernel_ms*100/system_usage_ms + m = Metric("cpu.total.kernel", self.event_time, cnt_kernel_percent, self.dims) + ret.append(m) + m = Metric("cpu.total.user", self.event_time, cnt_user_percent, self.dims) + ret.append(m) + m = Metric("cpu.total.all", self.event_time, cnt_kernel_percent+cnt_user_percent, self.dims) + ret.append(m) + for i, cur_percpu in list(enumerate(cur["cpu_usage"]["percpu_usage"])): + dims = [item for item in self.dims] + dims.append("cpu=%d" % i) + cnt_user_ms = cur_percpu - pre["cpu_usage"]["percpu_usage"][i] + cnt_kernel_ms = cur_percpu - pre["cpu_usage"]["percpu_usage"][i] + percpu_user_percent = cnt_user_ms*100/system_usage_ms + m = Metric("cpu.cpu%d.user" % i, self.event_time, percpu_user_percent, dims) + ret.append(m) + percpu_kernel_percent = cnt_kernel_ms*100/system_usage_ms + m = Metric("cpu.cpu%d.kernel" % i, self.event_time, percpu_kernel_percent, dims) + ret.append(m) + m = Metric("cpu.cpu%d.all" % i, self.event_time, percpu_kernel_percent+percpu_user_percent, dims) + ret.append(m) + return ret + + def evalMem(self, stats): + ret = [] + for k,v in stats["memory_stats"].items(): + if isinstance(v, dict): + for sk, sv in v.items(): + m = Metric("memory.%s.%s" % (k,sk), self.event_time, sv, self.dims) + ret.append(m) + else: + m = Metric("memory.%s" % k, self.event_time, v, self.dims) + ret.append(m) + return ret + + def evalNet(self, stats): + ret = [] + for dev,dval in stats["networks"].items(): + for k, v in dval.items(): + m = Metric("net.%s.%s" % (dev,k), self.event_time, v, self.dims) + ret.append(m) + return ret + + +def trimContainerName(container): + return container["Names"][0].strip("/") + + + +def main(): + """docker_cpu main loop""" + cli=Client(base_url=DOCKER_SOCK) + metrics = [] + + while True: + for container in cli.containers(): + stats = cli.stats(container['Id'],stream=False) + mtime = parse_date(stats["read"]) + s = Stats(container, mtime) + for m in s.evalCpu(stats["precpu_stats"], stats["cpu_stats"]): + print m.getMetricLines() + for m in s.evalMem(stats): + print m.getMetricLines() + for m in s.evalNet(stats): + print m.getMetricLines() + + if COLLECTION_INTERVAL > 0: + time.sleep(COLLECTION_INTERVAL) + else: + break + +if __name__ == "__main__": + sys.exit(main()) diff --git a/collectors/etc/docker_conf.py b/collectors/etc/docker_containers_conf.py similarity index 60% rename from collectors/etc/docker_conf.py rename to collectors/etc/docker_containers_conf.py index 08e67a40..95e9a041 100644 --- a/collectors/etc/docker_conf.py +++ b/collectors/etc/docker_containers_conf.py @@ -13,28 +13,15 @@ # see . def enabled(): - return False + return True def get_config(): - """Configuration for the Docker collector - - On EL6 distros (CentOS/RHEL/Scientific/OL) the cgroup path should be: - "/cgroup" - """ - import platform - # Scientific Linux says 'redhat' here - # CentOS 5 says 'redhat' - # CentOS >=6 says 'centos' - # CentOS >=7 cgroup is located on /sys/fs/cgroup - if platform.dist()[0] in ['centos', 'redhat'] and not platform.dist()[1].startswith("7."): - cgroup_path = '/cgroup' - else: - cgroup_path = '/sys/fs/cgroup' + """Configuration for the Docker collector""" config = { 'interval': 15, - 'socket_path': '/var/run/docker.sock', - 'cgroup_path': cgroup_path + 'default_dims': [], + 'socket_path': 'unix:///var/run/docker.sock', } return config diff --git a/collectors/test/test_docker_container.py b/collectors/test/test_docker_container.py new file mode 100644 index 00000000..36988261 --- /dev/null +++ b/collectors/test/test_docker_container.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python + +import time +import os +import sys + +class _dummy(object): + pass +me = os.path.split(os.path.realpath(sys.modules[_dummy.__module__].__file__))[0] +sys.path.insert(0, os.path.join(os.path.split(me)[0], '0')) +import docker_containers + +def test_trimContainerName(): + cnt = {"Names": ["/name"]} + assert docker_containers.trimContainerName(cnt) == "name" + +def test_Metric(): + """ Check assembly of metric """ + now = time.gmtime() + m = docker_containers.Metric("metric", now, 10) + assert m.name == "metric" + assert m.value == 10 + assert m.event_time == now + assert m.dims == set([]) + m.add_dims(["hello=world"]) + assert m.dims == set(["hello=world"]) + got = m.getMetricLines() + assert got == "metric %s 10 hello=world" % int(time.mktime(now)) + + +def test_Stats(): + """ Check if the constructor of Stats works correctly """ + now = time.gmtime() + cnt = { + "Names": ["/name"], + "Id": "cntHASH", + "Image": "qnib/test", + "ImageID": "sha256:123" + } + exp_dims = [ + "container_name=name", + "container_id=cntHASH", + "image_name=qnib/test", + "image_id=sha256:123" + ] + s = docker_containers.Stats(cnt, now) + assert s.dims == exp_dims + assert s.event_time == now + # Memory + stats = { + "memory_stats": { + "usage": 368640 + } + } + m = s.evalMem(stats) + got = m[0].getMetricLines() + exp = "memory.usage %s 368640 container_id=cntHASH container_name=name image_id=sha256:123 image_name=qnib/test" % int(time.mktime(now)) + assert got == exp + # Network + stats = { + "networks": { + "eth0": { + "rx_packets": 368640 + } + } + } + m = s.evalNet(stats) + got = m[0].getMetricLines() + exp = "net.eth0.rx_packets %s 368640 container_id=cntHASH container_name=name image_id=sha256:123 image_name=qnib/test" % int(time.mktime(now)) + assert got == exp + # CPU + pre = { + "system_cpu_usage": 500, + "cpu_usage": { + "usage_in_usermode": 200, + "usage_in_kernelmode": 200, + "percpu_usage": [ 100, 100 ] + } + } + cur = { + "system_cpu_usage": 520, + "cpu_usage": { + "usage_in_usermode": 210, + "usage_in_kernelmode": 210, + "percpu_usage": [ 110, 110 ] + } + } + m = s.evalCpu(pre, cur) + got = m[0].getMetricLines() + exp = "cpu.total.kernel %s 50.0 container_id=cntHASH container_name=name image_id=sha256:123 image_name=qnib/test" % int(time.mktime(now)) + assert got == exp