[timeseries] Add initial support for elasticsearch #99
Closes #99
nepython authored and devkapilbansal committed Apr 8, 2021
1 parent 7b2a410 commit 044a29f
Showing 26 changed files with 1,356 additions and 155 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ WORKDIR /opt/openwisp/tests/
ENV NAME=openwisp-monitoring \
PYTHONBUFFERED=1 \
INFLUXDB_HOST=influxdb \
REDIS_HOST=redis
REDIS_HOST=redis \
ELASTICSEARCH_HOST=es01
CMD ["sh", "docker-entrypoint.sh"]
EXPOSE 8000
41 changes: 39 additions & 2 deletions README.rst
@@ -74,6 +74,7 @@ Available Features
* Collects and displays `device status <#device-status>`_ information like uptime, RAM status, CPU load averages,
Interface properties and addresses, WiFi interface status and associated clients,
Neighbors information, DHCP Leases, Disk/Flash status
* Collection of monitoring information in a timeseries database (`InfluxDB <https://www.influxdata.com/>`_ and `Elasticsearch <https://www.elastic.co/elasticsearch/>`_ are currently supported)
* Monitoring charts for uptime, packet loss, round trip time (latency), associated wifi clients, interface traffic,
RAM usage, CPU load, flash/disk usage
* Charts can be viewed at resolutions of 1 day, 3 days, a week, a month and a year
@@ -108,6 +109,8 @@ beforehand.
In case you prefer not to use Docker you can `install InfluxDB <https://docs.influxdata.com/influxdb/v1.8/introduction/install/>`_
and Redis from your repositories, but keep in mind that the version packaged by your distribution may be different.

If you wish to use ``Elasticsearch`` for storing and retrieving timeseries data, then `install Elasticsearch <https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html>`_.

Install spatialite and sqlite:

.. code-block:: shell
@@ -165,6 +168,20 @@ Follow the setup instructions of `openwisp-controller
'PORT': '8086',
}
In case you wish to use ``Elasticsearch`` for timeseries data storage and
retrieval, use the following settings:

.. code-block:: python
TIMESERIES_DATABASE = {
'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
'USER': 'openwisp',
'PASSWORD': 'openwisp',
'NAME': 'openwisp2',
'HOST': 'localhost',
'PORT': '9200',
}
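To see how these settings come together, here is a minimal sketch of building the connection URL a client would use from the ``TIMESERIES_DATABASE`` dictionary above (the ``connection_url`` helper is illustrative only, not part of openwisp-monitoring):

```python
# settings as shown above
TIMESERIES_DATABASE = {
    'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
    'USER': 'openwisp',
    'PASSWORD': 'openwisp',
    'NAME': 'openwisp2',
    'HOST': 'localhost',
    'PORT': '9200',
}


def connection_url(config):
    # user:password@host:port, the usual shape of an
    # Elasticsearch HTTP connection string
    return (
        f"http://{config['USER']}:{config['PASSWORD']}"
        f"@{config['HOST']}:{config['PORT']}"
    )


print(connection_url(TIMESERIES_DATABASE))
```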
``urls.py``:

.. code-block:: python
@@ -461,6 +478,9 @@ This data is only used to assess the recent status of devices, keeping
it for a long time would not add much benefit and would cost a lot more
in terms of disk space.

**Note**: In case you use ``Elasticsearch``, the time is rounded down to an integral multiple of a day.
That means a time of ``36h0m0s`` will be interpreted as ``24h0m0s`` (an integral multiple of a day).
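The rounding described in the note can be sketched as follows (a minimal illustration mirroring the ``// 24`` conversion used in the backend's retention policy tests; the helper name is hypothetical):

```python
def round_retention_to_days(retention):
    """Round an 'NNh0m0s' retention string down to a whole number of days."""
    hours = int(retention.split('h')[0])
    days = hours // 24  # integral multiple of a day; the remainder is discarded
    return f'{days * 24}h0m0s'


# e.g. 36 hours is rounded down to one full day
print(round_retention_to_days('36h0m0s'))
```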

``OPENWISP_MONITORING_AUTO_PING``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

@@ -764,18 +784,30 @@ MB (megabytes) instead of GB (Gigabytes) you can use:
"SUM(rx_bytes) / 1000000 AS download FROM {key} "
"WHERE time >= '{time}' AND content_type = '{content_type}' "
"AND object_id = '{object_id}' GROUP BY time(1d)"
)
),
'elasticsearch': _make_query({
'upload': {'sum': {'field': 'points.fields.tx_bytes'}},
'download': {'avg': {'field': 'points.fields.rx_bytes'}},
})
},
}
}
# In case you use Elasticsearch, declare the operations separately as shown below
OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS = {
'upload': {'operator': '/', 'value': 1000000},
'download': {'operator': '/', 'value': 1000000},
}
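The Elasticsearch backend applies these operations to each point through a plain operator lookup (see ``operator_lookup`` in ``queries.py`` below); a minimal sketch of what the ``{'operator': '/', 'value': 1000000}`` entry does to a raw byte count:

```python
import operator

# mirrors the operator_lookup table in
# openwisp_monitoring/db/backends/elasticsearch/queries.py
operator_lookup = {
    '+': operator.add,
    '-': operator.sub,
    '*': operator.mul,
    '/': operator.truediv,
}

op = {'operator': '/', 'value': 1000000}
raw_tx_bytes = 5_000_000  # illustrative value
# divide bytes by 1,000,000 to obtain megabytes
print(operator_lookup[op['operator']](raw_tx_bytes, op['value']))
```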
Or if you want to define a new chart configuration, which you can then
call in your custom code (e.g. a custom check class), you can do so as follows:

.. code-block:: python
from django.utils.translation import gettext_lazy as _
from openwisp_monitoring.db.backends.elasticsearch import _make_query
OPENWISP_MONITORING_CHARTS = {
'ram': {
'type': 'line',
@@ -789,7 +821,12 @@ call in your custom code (eg: a custom check class), you can do so as follows:
"MEAN(buffered) AS buffered FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}' "
"GROUP BY time(1d)"
)
),
'elasticsearch': _make_query({
'total': {'avg': {'field': 'points.fields.total'}},
'free': {'avg': {'field': 'points.fields.free'}},
'buffered': {'avg': {'field': 'points.fields.buffered'}},
})
},
}
}
48 changes: 48 additions & 0 deletions docker-compose.yml
@@ -11,6 +11,8 @@ services:
depends_on:
- influxdb
- redis
- es01
- es02

influxdb:
image: influxdb:1.8-alpine
@@ -22,6 +24,45 @@ services:
INFLUXDB_DB: openwisp2
INFLUXDB_USER: openwisp
INFLUXDB_USER_PASSWORD: openwisp
# a clustered Elasticsearch setup is used since that is what is typically used in production
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
container_name: es01
environment:
- "node.name=es01"
- "discovery.seed_hosts=es02"
- "cluster.initial_master_nodes=es01,es02"
- "cluster.name=openwisp2"
- "bootstrap.memory_lock=true"
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- esdata01:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
- esnet
es02:
image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
container_name: es02
environment:
- "node.name=es02"
- "discovery.seed_hosts=es01"
- "cluster.initial_master_nodes=es01,es02"
- "cluster.name=openwisp2"
- "bootstrap.memory_lock=true"
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- esdata02:/usr/share/elasticsearch/data
networks:
- esnet

redis:
image: redis:5.0-alpine
@@ -31,3 +72,10 @@ services:

volumes:
influxdb-data: {}
esdata01:
driver: local
esdata02:
driver: local

networks:
esnet:
4 changes: 1 addition & 3 deletions openwisp_monitoring/db/__init__.py
@@ -1,7 +1,5 @@
from .backends import timeseries_db

chart_query = timeseries_db.queries.chart_query
default_chart_query = timeseries_db.queries.default_chart_query
device_data_query = timeseries_db.queries.device_data_query

__all__ = ['timeseries_db', 'chart_query', 'default_chart_query', 'device_data_query']
__all__ = ['timeseries_db', 'chart_query']
3 changes: 2 additions & 1 deletion openwisp_monitoring/db/backends/__init__.py
@@ -48,7 +48,8 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
except ImportError as e:
# The database backend wasn't found. Display a helpful error message
# listing all built-in database backends.
builtin_backends = ['influxdb']
builtin_backends = ['influxdb', 'elasticsearch']
raise e
if backend_name not in [
f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends
]:
3 changes: 3 additions & 0 deletions openwisp_monitoring/db/backends/elasticsearch/__init__.py
@@ -0,0 +1,3 @@
from .queries import _make_query

__all__ = ['_make_query']
466 changes: 466 additions & 0 deletions openwisp_monitoring/db/backends/elasticsearch/client.py

Large diffs are not rendered by default.

92 changes: 92 additions & 0 deletions openwisp_monitoring/db/backends/elasticsearch/index.py
@@ -0,0 +1,92 @@
import uuid

from django.conf import settings
from elasticsearch.exceptions import NotFoundError
from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search


class Point(InnerDoc):
time = Date(required=True, default_timezone=settings.TIME_ZONE)
fields = Nested(dynamic=True, required=True, multi=True)


class MetricDocument(Document):
tags = Nested(dynamic=True, required=False, multi=True)
points = Nested(Point)

class Index:
name = 'metric'
settings = {
'number_of_shards': 1,
'number_of_replicas': 0,
'lifecycle.name': 'default',
'lifecycle.rollover_alias': 'metric',
}


def find_metric(client, index, tags, retention_policy=None, add=False):
search = Search(using=client, index=index)
if tags:
tags_dict = dict()
for key, value in tags.items():
tags_dict[f'tags.{key}'] = value
q = Q(
'nested',
path='tags',
query=Q(
'bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()]
),
)
else:
q = Q()
try:
result = list(search.query(q).execute())[0].meta
return result['id'], result['index']
except (NotFoundError, AttributeError, IndexError):
if add:
document = create_document(
client, index, tags, retention_policy=retention_policy
)
return document['_id'], document['_index']
return None


def create_document(client, key, tags, _id=None, retention_policy=None):
"""
Adds document to relevant index using ``keys``, ``tags`` and ``id`` provided.
If no ``id`` is provided a random ``uuid`` would be used.
"""
_id = str(_id or uuid.uuid1())
# If index exists, create the document and return
try:
index_aliases = client.indices.get_alias(index=key)
for k, v in index_aliases.items():
if v['aliases'][key]['is_write_index']:
break
client.create(index=k, id=_id, body={'tags': tags})
return {'_id': _id, '_index': k}
except NotFoundError:
pass
# Create a new index if it doesn't exist
name = f'{key}-000001'
document = MetricDocument(meta={'id': _id})
document._index = document._index.clone(name)
# Create a new index template if it doesn't exist
if not client.indices.exists_template(name=key):
document._index.settings(**{'lifecycle.rollover_alias': key})
if retention_policy:
document._index.settings(**{'lifecycle.name': retention_policy})
# an index pattern is added for Index Lifecycle Management
document._index.as_template(key, f'{key}-*').save(using=client)
document.init(using=client, index=name)
document.meta.index = name
document.tags = tags
document.save(using=client, index=name)
client.indices.put_alias(index=name, name=key, body={'is_write_index': True})
if retention_policy:
client.indices.put_settings(
body={'lifecycle.name': retention_policy}, index=name
)
client.indices.put_settings(body={'lifecycle.rollover_alias': key}, index=name)
client.indices.refresh(index=key)
return document.to_dict(include_meta=True)
130 changes: 130 additions & 0 deletions openwisp_monitoring/db/backends/elasticsearch/queries.py
@@ -0,0 +1,130 @@
import operator
from copy import deepcopy

from openwisp_utils.utils import deep_merge_dicts

from .settings import ADDITIONAL_CHART_OPERATIONS

default_chart_query = {
'query': {
'nested': {
'path': 'tags',
'query': {
'bool': {
'must': [
{'match': {'tags.object_id': {'query': '{object_id}'}}},
{'match': {'tags.content_type': {'query': '{content_type}'}}},
]
}
},
},
},
'_source': False,
'size': 0,
'aggs': {
'GroupByTime': {
'nested': {
'path': 'points',
'aggs': {
'set_range': {
'filter': {
'range': {
'points.time': {'from': 'now-1d/d', 'to': 'now/d'}
}
},
'aggs': {
'time': {
'date_histogram': {
'field': 'points.time',
'fixed_interval': '10m',
'format': 'date_time_no_millis',
'order': {'_key': 'desc'},
},
'aggs': {
'nest': {
'nested': {
'path': 'points.fields',
'aggs': {
'{field_name}': {
'avg': {
'field': 'points.fields.{field_name}'
}
}
},
}
},
},
},
},
}
},
}
}
},
}

math_map = {
'uptime': {'operator': '*', 'value': 100},
'memory_usage': {'operator': '*', 'value': 100},
'CPU_load': {'operator': '*', 'value': 100},
'disk_usage': {'operator': '*', 'value': 100},
'upload': {'operator': '/', 'value': 1000000000},
'download': {'operator': '/', 'value': 1000000000},
}

operator_lookup = {
'+': operator.add,
'-': operator.sub,
'*': operator.mul,
'/': operator.truediv,
}

if ADDITIONAL_CHART_OPERATIONS:
assert isinstance(ADDITIONAL_CHART_OPERATIONS, dict)
for value in ADDITIONAL_CHART_OPERATIONS.values():
assert value['operator'] in operator_lookup
assert isinstance(value['value'], (int, float))
math_map = deep_merge_dicts(math_map, ADDITIONAL_CHART_OPERATIONS)


def _make_query(aggregation=None):
query = deepcopy(default_chart_query)
if aggregation:
query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs']['time'][
'aggs'
]['nest']['nested']['aggs'] = aggregation
return query


def _get_chart_query():
aggregation_dict = {
'uptime': {'uptime': {'avg': {'field': 'points.fields.reachable'}}},
'packet_loss': {'packet_loss': {'avg': {'field': 'points.fields.loss'}}},
'rtt': {
'RTT_average': {'avg': {'field': 'points.fields.rtt_avg'}},
'RTT_max': {'avg': {'field': 'points.fields.rtt_max'}},
'RTT_min': {'avg': {'field': 'points.fields.rtt_min'}},
},
'traffic': {
'upload': {'sum': {'field': 'points.fields.tx_bytes'}},
'download': {'sum': {'field': 'points.fields.rx_bytes'}},
},
'wifi_clients': {
'wifi_clients': {
'cardinality': {
'field': 'points.fields.{field_name}.keyword',
'missing': 0,
}
}
},
'memory': {'memory_usage': {'avg': {'field': 'points.fields.percent_used'}}},
'cpu': {'CPU_load': {'avg': {'field': 'points.fields.cpu_usage'}}},
'disk': {'disk_usage': {'avg': {'field': 'points.fields.used_disk'}}},
}
query = {}
for key, value in aggregation_dict.items():
query[key] = {'elasticsearch': _make_query(value)}
return query


chart_query = _get_chart_query()
@@ -0,0 +1,41 @@
# By default, the age is calculated from the date the index was created, but if the
# index has been rolled over, then the rollover date is used to calculate the age

default_rp_policy = {
'policy': {
'phases': {
'hot': {
'actions': {
'rollover': {'max_age': '30d', 'max_size': '90G'},
'set_priority': {'priority': 100},
}
},
'warm': {
'min_age': '30d',
'actions': {
'forcemerge': {'max_num_segments': 1},
'allocate': {'number_of_replicas': 0},
'set_priority': {'priority': 50},
},
},
'cold': {'min_age': '150d', 'actions': {'freeze': {}}},
'delete': {'min_age': '335d', 'actions': {'delete': {}}},
}
}
}


def _make_policy(max_age):
return {
'policy': {
'phases': {
'hot': {
'actions': {
'rollover': {'max_age': max_age},
'set_priority': {'priority': 100},
}
},
'delete': {'actions': {'delete': {}}},
}
}
}
5 changes: 5 additions & 0 deletions openwisp_monitoring/db/backends/elasticsearch/settings.py
@@ -0,0 +1,5 @@
from django.conf import settings

ADDITIONAL_CHART_OPERATIONS = getattr(
settings, 'OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS', {}
)
File renamed without changes.
296 changes: 296 additions & 0 deletions openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py
@@ -0,0 +1,296 @@
from datetime import datetime, timedelta
from importlib import reload
from unittest.mock import patch

from celery.exceptions import Retry
from django.conf import settings
from django.core.exceptions import ValidationError
from django.utils.timezone import now
from elasticsearch.exceptions import ElasticsearchException
from freezegun import freeze_time
from pytz import timezone as tz

from openwisp_monitoring.device.settings import SHORT_RETENTION_POLICY
from openwisp_monitoring.device.tests import DeviceMonitoringTestCase
from openwisp_monitoring.device.utils import SHORT_RP, manage_short_retention_policy

from ....exceptions import TimeseriesWriteException
from ... import timeseries_db
from .. import queries as queries_module
from ..index import MetricDocument


class TestDatabaseClient(DeviceMonitoringTestCase):
def test_get_query_fields_function(self):
c = self._create_chart(test_data=None, configuration='histogram')
q = c.get_query(fields=['ssh', 'http2', 'apple-music'])
self.assertIn("'ssh': {'sum': {'field': 'points.fields.ssh'}}", str(q))
self.assertIn("'http2': {'sum': {'field': 'points.fields.http2'}}", str(q))
self.assertIn(
"'apple-music': {'sum': {'field': 'points.fields.apple-music'}}", str(q)
)

def test_default_query(self):
c = self._create_chart(test_data=False)
q = timeseries_db.default_chart_query(tags=True)
self.assertEqual(c.query, q)

def test_write(self):
timeseries_db.write('test_write', dict(value=2))
measurement = timeseries_db.read(key='test_write', fields='value')[0]
self.assertEqual(measurement['value'], 2)

def test_general_write(self):
m = self._create_general_metric(name='Sync test')
m.write(1)
measurement = timeseries_db.read(key='sync_test', fields='value')[0]
self.assertEqual(measurement['value'], 1)

def test_object_write(self):
om = self._create_object_metric()
om.write(3)
measurement = timeseries_db.read(
key='test_metric', fields='value', tags=om.tags
)[0]
self.assertEqual(measurement['value'], 3)

def test_general_same_key_different_fields(self):
down = self._create_general_metric(
name='traffic (download)', key='traffic', field_name='download'
)
down.write(200)
up = self._create_general_metric(
name='traffic (upload)', key='traffic', field_name='upload'
)
up.write(100)
measurement = timeseries_db.read(key='traffic', fields='download')[0]
self.assertEqual(measurement['download'], 200)
measurement = timeseries_db.read(key='traffic', fields='upload')[0]
self.assertEqual(measurement['upload'], 100)

def test_object_same_key_different_fields(self):
user = self._create_user()
user_down = self._create_object_metric(
name='traffic (download)',
key='traffic',
field_name='download',
content_object=user,
)
user_down.write(200)
user_up = self._create_object_metric(
name='traffic (upload)',
key='traffic',
field_name='upload',
content_object=user,
)
user_up.write(100)
measurement = timeseries_db.read(
key='traffic', fields='download', tags=user_down.tags
)[0]
self.assertEqual(measurement['download'], 200)
measurement = timeseries_db.read(
key='traffic', fields='upload', tags=user_up.tags
)[0]
self.assertEqual(measurement['upload'], 100)

def test_get_query_1d(self):
c = self._create_chart(test_data=None, configuration='uptime')
q = c.get_query(time='1d')
time_map = c.GROUP_MAP['1d']
self.assertIn(
"{'range': {'points.time': {'from': 'now-1d/d', 'to': 'now/d'}}}", str(q)
)
self.assertIn(f"'fixed_interval': '{time_map}'", str(q))

def test_get_query_30d(self):
c = self._create_chart(test_data=None, configuration='uptime')
q = c.get_query(time='30d')
time_map = c.GROUP_MAP['30d']
self.assertIn(
"{'range': {'points.time': {'from': 'now-30d/d', 'to': 'now/d'}}}", str(q)
)
self.assertIn(f"'fixed_interval': '{time_map}'", str(q))

def test_retention_policy(self):
manage_short_retention_policy()
rp = timeseries_db.get_list_retention_policies()
assert 'default' in rp
assert SHORT_RP in rp
days = f'{int(SHORT_RETENTION_POLICY.split("h")[0]) // 24}d'
self.assertEqual(
rp['short']['policy']['phases']['hot']['actions']['rollover']['max_age'],
days,
)

def test_get_query(self):
c = self._create_chart(test_data=False)
m = c.metric
params = dict(
field_name=m.field_name,
key=m.key,
content_type=m.content_type_key,
object_id=m.object_id,
time=c.DEFAULT_TIME,
)
expected = timeseries_db.get_query(
c.type,
params,
c.DEFAULT_TIME,
c.GROUP_MAP,
query=c.query,
timezone=settings.TIME_ZONE,
)
self.assertEqual(c.get_query(), expected)

def test_query_no_index(self):
timeseries_db.delete_metric_data(key='ping')
c = self._create_chart(test_data=False)
q = c.get_query()
self.assertEqual(timeseries_db.query(q, index='ping'), {})
self.assertEqual(timeseries_db.get_list_query(q), [])

def test_1d_chart_data(self):
c = self._create_chart()
data = c.read(time='1d')
self.assertIn('x', data)
self.assertEqual(len(data['x']), 144)
self.assertIn('traces', data)
self.assertEqual(9.0, data['traces'][0][1][-1])
# Test chart with old data has same length
m = self._create_general_metric(name='dummy')
c = self._create_chart(metric=m, test_data=False)
m.write(6.0, time=now() - timedelta(hours=23))
data = c.read(time='1d')
self.assertIn('x', data)
self.assertEqual(len(data['x']), 144)
self.assertIn('traces', data)
self.assertIn(6.0, data['traces'][0][1])

def test_delete_metric_data(self):
obj = self._create_user()
om = self._create_object_metric(name='Logins', content_object=obj)
om.write(100)
self.assertEqual(om.read()[0]['value'], 100)
timeseries_db.delete_metric_data(key=om.key, tags=om.tags)

def test_invalid_query(self):
q = timeseries_db.default_chart_query()
q['query']['nested']['query']['must'] = 'invalid'
try:
timeseries_db.validate_query(q)
except ValidationError as e:
self.assertIn('ParsingException: [bool] malformed query', str(e))

def test_non_aggregation_query(self):
q = {'query': timeseries_db.default_chart_query()['query']}
self.assertEqual(timeseries_db.get_list_query(q), [])

def test_timestamp_precision(self):
c = self._create_chart()
points = timeseries_db.get_list_query(c.get_query(), precision='ms')
self.assertIsInstance(points[0]['time'], float)
points = timeseries_db.get_list_query(c.get_query(), precision='s')
self.assertIsInstance(points[0]['time'], int)

def create_docs_single_index(self):
m = self._create_object_metric(name='dummy')
m.write(1)
d = self._create_device(organization=self._create_org())
m2 = self._create_object_metric(name='dummy', content_object=d)
m2.write(1)
self.assertEqual(len(timeseries_db.get_db.indices.get_alias(name='dummy')), 1)

def test_additional_chart_operations_setting(self):
modify_operators = {
'upload': {'operator': '/', 'value': 1000000},
'download': {'operator': '/', 'value': 1000000},
}
path = 'openwisp_monitoring.db.backends.elasticsearch.queries.ADDITIONAL_CHART_OPERATIONS'
with patch.dict(path, modify_operators, clear=True):
queries = reload(queries_module)
self.assertEqual(queries.ADDITIONAL_CHART_OPERATIONS, modify_operators)
self.assertEqual(queries.math_map['upload'], modify_operators['upload'])
self.assertEqual(queries.math_map['download'], modify_operators['download'])

def test_read(self):
c = self._create_chart()
data = c.read()
key = c.metric.field_name
self.assertIn('x', data)
self.assertIn('traces', data)
self.assertEqual(len(data['x']), 168)
charts = data['traces']
self.assertEqual(charts[0][0], key)
self.assertEqual(len(charts[0][1]), 168)
self.assertTrue(all(elem in charts[0][1] for elem in [3, 6, 9]))

def test_read_multiple(self):
c = self._create_chart(test_data=None, configuration='multiple_test')
m1 = c.metric
m2 = self._create_object_metric(
name='test metric 2',
key='test_metric',
field_name='value2',
content_object=m1.content_object,
)
now_ = now()
for n in range(0, 3):
time = now_ - timedelta(days=n)
m1.write(n + 1, time=time)
m2.write(n + 2, time=time)
data = c.read()
f1 = m1.field_name
f2 = 'value2'
self.assertIn('x', data)
self.assertIn('traces', data)
self.assertEqual(len(data['x']), 168)
charts = data['traces']
self.assertIn(f1, charts[0][0])
self.assertIn(f2, charts[1][0])
self.assertEqual(len(charts[0][1]), 168)
self.assertEqual(len(charts[1][1]), 168)
self.assertTrue(all(elem in charts[0][1] for elem in [3, 2, 1]))
self.assertTrue(all(elem in charts[1][1] for elem in [4, 3, 2]))

def test_ilm_disabled(self):
with patch.object(timeseries_db, 'ilm_enabled', False):
self.assertFalse(timeseries_db.ilm_enabled)
self.assertIsNone(
timeseries_db.create_or_alter_retention_policy(name='default')
)
self.assertIsNone(timeseries_db.get_list_retention_policies())

@patch.object(MetricDocument, 'get', side_effect=ElasticsearchException)
def test_write_retry(self, mock_write):
with self.assertRaises(TimeseriesWriteException):
timeseries_db.write('test_write', {'value': 1})
m = self._create_general_metric(name='Test metric')
with self.assertRaises(Retry):
m.write(1)

@patch.object(MetricDocument, 'get', side_effect=ElasticsearchException)
def test_timeseries_write_params(self, mock_write):
with freeze_time('Jan 14th, 2020') as frozen_datetime:
m = self._create_general_metric(name='Test metric')
with self.assertRaises(Retry) as e:
m.write(1)
frozen_datetime.tick(delta=timedelta(minutes=10))
self.assertEqual(
now(), datetime(2020, 1, 14, tzinfo=tz('UTC')) + timedelta(minutes=10)
)
task_signature = e.exception.sig
with patch.object(timeseries_db, 'write') as mock_write:
self._retry_task(task_signature)
mock_write.assert_called_with(
'test_metric',
{'value': 1},
database=None,
retention_policy=None,
tags={},
# this should be the original time at the moment of first failure
timestamp='2020-01-14T00:00:00Z',
)

def _retry_task(self, task_signature):
task_kwargs = task_signature.kwargs
task_signature.type.run(**task_kwargs)
43 changes: 38 additions & 5 deletions openwisp_monitoring/db/backends/influxdb/client.py
@@ -52,7 +52,6 @@ class DatabaseClient(object):
backend_name = 'influxdb'

def __init__(self, db_name=None):
self._db = None
self.db_name = db_name or TIMESERIES_DB['NAME']
self.client_error = InfluxDBClientError

@@ -164,7 +163,7 @@ def read(self, key, fields, tags, **kwargs):
q = f'{q} LIMIT {limit}'
return list(self.query(q, precision='s').get_points())

def get_list_query(self, query, precision='s'):
def get_list_query(self, query, precision='s', **kwargs):
return list(self.query(query, precision=precision).get_points())

def get_list_retention_policies(self):
@@ -269,16 +268,23 @@ def __transform_field(self, field, function, operation=None):

def _get_top_fields(
self,
default_query,
query,
params,
chart_type,
group_map,
number,
time,
timezone=settings.TIME_ZONE,
get_fields=True,
):
"""
Returns the top fields if ``get_fields`` is set to ``True`` (the default);
otherwise it returns the points containing the top fields.
"""
q = default_query.replace('{field_name}', '{fields}')
q = self.get_query(
query=query,
query=q,
params=params,
chart_type=chart_type,
group_map=group_map,
@@ -287,7 +293,7 @@ def _get_top_fields(
time=time,
timezone=timezone,
)
res = list(self.query(q, precision='s').get_points())
res = self.get_list_query(q)
if not res:
return []
res = res[0]
@@ -297,4 +303,31 @@ def _get_top_fields(
keys = list(sorted_dict.keys())
keys.reverse()
top = keys[0:number]
return [item.replace('sum_', '') for item in top]
top_fields = [item.replace('sum_', '') for item in top]
if get_fields:
return top_fields
query = self.get_query(
query=query,
params=params,
chart_type=chart_type,
group_map=group_map,
summary=True,
fields=top_fields,
time=time,
timezone=timezone,
)
return self.get_list_query(query)

def default_chart_query(self, tags):
q = "SELECT {field_name} FROM {key} WHERE time >= '{time}'"
if tags:
q += " AND content_type = '{content_type}' AND object_id = '{object_id}'"
return q

def _device_data(self, key, tags, rp, **kwargs):
""" returns last snapshot of ``device_data`` """
query = (
f"SELECT data FROM {rp}.{key} WHERE pk = '{tags['pk']}' "
"ORDER BY time DESC LIMIT 1"
)
return self.get_list_query(query, precision=None)
9 changes: 0 additions & 9 deletions openwisp_monitoring/db/backends/influxdb/queries.py
@@ -58,12 +58,3 @@
)
},
}

default_chart_query = [
"SELECT {field_name} FROM {key} WHERE time >= '{time}'",
" AND content_type = '{content_type}' AND object_id = '{object_id}'",
]

device_data_query = (
"SELECT data FROM {0}.{1} WHERE pk = '{2}' " "ORDER BY time DESC LIMIT 1"
)
Empty file.
@@ -1,7 +1,8 @@
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from unittest.mock import patch

from celery.exceptions import Retry
from django.conf import settings
from django.core.exceptions import ValidationError
from django.test import TestCase
from django.utils.timezone import now
@@ -15,8 +16,8 @@
from openwisp_monitoring.device.utils import SHORT_RP, manage_short_retention_policy
from openwisp_monitoring.monitoring.tests import TestMonitoringMixin

from ...exceptions import TimeseriesWriteException
from .. import timeseries_db
from ....exceptions import TimeseriesWriteException
from ... import timeseries_db

Chart = load_model('monitoring', 'Chart')
Notification = load_model('openwisp_notifications', 'Notification')
@@ -46,16 +47,6 @@ def test_get_custom_query(self):
q = c.get_query(query=custom_q, fields=['SUM(*)'])
self.assertIn('SELECT SUM(*) FROM', q)

def test_is_aggregate_bug(self):
m = self._create_object_metric(name='summary_avg')
c = Chart(metric=m, configuration='dummy')
self.assertFalse(timeseries_db._is_aggregate(c.query))

def test_is_aggregate_fields_function(self):
m = self._create_object_metric(name='is_aggregate_func')
c = Chart(metric=m, configuration='uptime')
self.assertTrue(timeseries_db._is_aggregate(c.query))

def test_get_query_fields_function(self):
c = self._create_chart(test_data=None, configuration='histogram')
q = c.get_query(fields=['ssh', 'http2', 'apple-music'])
@@ -150,21 +141,6 @@ def test_object_same_key_different_fields(self):
measurement = timeseries_db.get_list_query(q)[0]
self.assertEqual(measurement['upload'], 100)

def test_delete_metric_data(self):
m = self._create_general_metric(name='test_metric')
m.write(100)
self.assertEqual(m.read()[0]['value'], 100)
timeseries_db.delete_metric_data(key=m.key)
self.assertEqual(m.read(), [])
om = self._create_object_metric(name='dummy')
om.write(50)
m.write(100)
self.assertEqual(m.read()[0]['value'], 100)
self.assertEqual(om.read()[0]['value'], 50)
timeseries_db.delete_metric_data()
self.assertEqual(m.read(), [])
self.assertEqual(om.read(), [])

def test_get_query_1d(self):
c = self._create_chart(test_data=None, configuration='uptime')
q = c.get_query(time='1d')
@@ -197,27 +173,11 @@ def test_query_set(self):
)
self.assertEqual(c.query, expected)
self.assertEqual(
''.join(timeseries_db.queries.default_chart_query[0:2]), c._default_query
''.join(timeseries_db.default_chart_query(tags=c.metric.tags)),
c._default_query,
)
c.metric.object_id = None
self.assertEqual(timeseries_db.queries.default_chart_query[0], c._default_query)

def test_read_order(self):
m = self._create_general_metric(name='dummy')
m.write(30)
m.write(40, time=now() - timedelta(days=2))
with self.subTest('Test ascending read order'):
metric_data = m.read(limit=2, order='time')
self.assertEqual(metric_data[0]['value'], 40)
self.assertEqual(metric_data[1]['value'], 30)
with self.subTest('Test descending read order'):
metric_data = m.read(limit=2, order='-time')
self.assertEqual(metric_data[0]['value'], 30)
self.assertEqual(metric_data[1]['value'], 40)
with self.subTest('Test invalid read order'):
with self.assertRaises(timeseries_db.client_error) as e:
metric_data = m.read(limit=2, order='invalid')
self.assertIn('Invalid order "invalid" passed.', str(e))
self.assertEqual(timeseries_db.default_chart_query(tags=None), c._default_query)

def test_read_with_rp(self):
self._create_admin()
@@ -293,3 +253,59 @@ def test_timeseries_write_params(self, mock_write):
def _retry_task(self, task_signature):
task_kwargs = task_signature.kwargs
task_signature.type.run(**task_kwargs)

def test_get_query(self):
c = self._create_chart(test_data=False)
m = c.metric
now_ = now()
today = date(now_.year, now_.month, now_.day)
time = today - timedelta(days=6)
expected = c.query.format(
field_name=m.field_name,
key=m.key,
content_type=m.content_type_key,
object_id=m.object_id,
time=str(time),
)
expected = "{0} tz('{1}')".format(expected, settings.TIME_ZONE)
self.assertEqual(c.get_query(), expected)

def test_read(self):
c = self._create_chart()
data = c.read()
key = c.metric.field_name
self.assertIn('x', data)
self.assertIn('traces', data)
self.assertEqual(len(data['x']), 3)
charts = data['traces']
self.assertEqual(charts[0][0], key)
self.assertEqual(len(charts[0][1]), 3)
self.assertEqual(charts[0][1], [3, 6, 9])

def test_read_multiple(self):
c = self._create_chart(test_data=None, configuration='multiple_test')
m1 = c.metric
m2 = self._create_object_metric(
name='test metric 2',
key='test_metric',
field_name='value2',
content_object=m1.content_object,
)
now_ = now()
for n in range(0, 3):
time = now_ - timedelta(days=n)
m1.write(n + 1, time=time)
m2.write(n + 2, time=time)
data = c.read()
f1 = m1.field_name
f2 = 'value2'
self.assertIn('x', data)
self.assertIn('traces', data)
self.assertEqual(len(data['x']), 3)
charts = data['traces']
self.assertIn(f1, charts[0][0])
self.assertIn(f2, charts[1][0])
self.assertEqual(len(charts[0][1]), 3)
self.assertEqual(len(charts[1][1]), 3)
self.assertEqual(charts[0][1], [3, 2, 1])
self.assertEqual(charts[1][1], [4, 3, 2])
55 changes: 55 additions & 0 deletions openwisp_monitoring/db/tests.py
@@ -0,0 +1,55 @@
from datetime import timedelta

from django.utils.timezone import now
from swapper import load_model

from . import timeseries_db
from .backends import load_backend_module

Chart = load_model('monitoring', 'Chart')
tests = load_backend_module(module='tests.client_tests')


class TestDatabaseClient(tests.TestDatabaseClient):
def test_is_aggregate_bug(self):
m = self._create_object_metric(name='summary_avg')
c = Chart(metric=m, configuration='dummy')
self.assertFalse(timeseries_db._is_aggregate(c.query))

def test_is_aggregate_fields_function(self):
m = self._create_object_metric(name='is_aggregate_func')
c = Chart(metric=m, configuration='uptime')
self.assertTrue(timeseries_db._is_aggregate(c.query))

def test_delete_metric_data(self):
m = self._create_general_metric(name='test_metric')
m.write(100)
self.assertEqual(m.read()[0]['value'], 100)
timeseries_db.delete_metric_data(key=m.key)
self.assertEqual(m.read(), [])
om = self._create_object_metric(name='dummy')
om.write(50)
m.write(100)
self.assertEqual(m.read()[0]['value'], 100)
self.assertEqual(om.read()[0]['value'], 50)
timeseries_db.delete_metric_data()
self.assertEqual(m.read(), [])
self.assertEqual(om.read(), [])

def test_read_order(self):
timeseries_db.delete_metric_data()
m = self._create_general_metric(name='dummy')
m.write(40, time=now() - timedelta(days=2))
m.write(30)
with self.subTest('Test ascending read order'):
metric_data = m.read(limit=2, order='time')
self.assertEqual(metric_data[0]['value'], 40)
self.assertEqual(metric_data[1]['value'], 30)
with self.subTest('Test descending read order'):
metric_data = m.read(limit=2, order='-time')
self.assertEqual(metric_data[0]['value'], 30)
self.assertEqual(metric_data[1]['value'], 40)
with self.subTest('Test invalid read order'):
with self.assertRaises(timeseries_db.client_error) as e:
metric_data = m.read(limit=2, order='invalid')
self.assertIn('Invalid order "invalid" passed.', str(e.exception))
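The ordering semantics exercised by `test_read_order` above can be illustrated with a small standalone sketch (plain Python, independent of any timeseries backend; `read_points` is a hypothetical helper, not the project's API):

```python
from datetime import datetime, timedelta


def read_points(points, limit=None, order='time'):
    """Return points sorted by time: 'time' ascending, '-time' descending."""
    if order not in ('time', '-time'):
        raise ValueError(f'Invalid order "{order}" passed.')
    ordered = sorted(points, key=lambda p: p['time'], reverse=order == '-time')
    return ordered[:limit] if limit else ordered


now = datetime(2021, 4, 8, 12, 0)
points = [
    {'time': now - timedelta(days=2), 'value': 40},
    {'time': now, 'value': 30},
]
# ascending: oldest first; descending: newest first
assert [p['value'] for p in read_points(points, order='time')] == [40, 30]
assert [p['value'] for p in read_points(points, order='-time')] == [30, 40]
```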
7 changes: 4 additions & 3 deletions openwisp_monitoring/device/base/models.py
@@ -22,7 +22,7 @@

from openwisp_utils.base import TimeStampedEditableModel

from ...db import device_data_query, timeseries_db
from ...db import timeseries_db
from ...monitoring.signals import threshold_crossed
from ...monitoring.tasks import timeseries_write
from .. import settings as app_settings
@@ -104,11 +104,12 @@ def data(self):
"""
if self.__data:
return self.__data
q = device_data_query.format(SHORT_RP, self.__key, self.pk)
cache_key = get_device_cache_key(device=self, context='current-data')
points = cache.get(cache_key)
if not points:
points = timeseries_db.get_list_query(q, precision=None)
points = timeseries_db._device_data(
rp=SHORT_RP, tags={'pk': self.pk}, key=self.__key, fields='data'
)
if not points:
return None
self.data_timestamp = points[0]['time']
6 changes: 3 additions & 3 deletions openwisp_monitoring/device/tests/test_api.py
@@ -289,10 +289,10 @@ def test_get_device_metrics_csv(self):
'2',
'0.4',
'0.1',
'2.0',
'1.0',
'2',
'1',
'9.73',
'0.0',
'0',
'8.27',
],
)
37 changes: 23 additions & 14 deletions openwisp_monitoring/monitoring/base/models.py
@@ -20,7 +20,7 @@

from openwisp_utils.base import TimeStampedEditableModel

from ...db import default_chart_query, timeseries_db
from ...db import timeseries_db
from ..configuration import (
CHART_CONFIGURATION_CHOICES,
DEFAULT_COLORS,
@@ -338,10 +338,8 @@ def top_fields(self):

@property
def _default_query(self):
q = default_chart_query[0]
if self.metric.object_id:
q += default_chart_query[1]
return q
tags = bool(self.metric.object_id)
return timeseries_db.default_chart_query(tags)

def get_query(
self,
@@ -362,10 +360,10 @@ def get_top_fields(self, number):
Returns list of top ``number`` of fields (highest sum) of a
measurement in the specified time range (descending order).
"""
q = self._default_query.replace('{field_name}', '{fields}')
params = self._get_query_params(self.DEFAULT_TIME)
return timeseries_db._get_top_fields(
query=q,
default_query=self._default_query,
query=self.get_query(),
chart_type=self.type,
group_map=self.GROUP_MAP,
number=number,
@@ -410,16 +408,23 @@ def read(
try:
query_kwargs = dict(time=time, timezone=timezone)
if self.top_fields:
fields = self.get_top_fields(self.top_fields)
data_query = self.get_query(fields=fields, **query_kwargs)
summary_query = self.get_query(
fields=fields, summary=True, **query_kwargs
points = summary = timeseries_db._get_top_fields(
default_query=self._default_query,
chart_type=self.type,
group_map=self.GROUP_MAP,
number=self.top_fields,
params=self._get_query_params(self.DEFAULT_TIME),
time=time,
query=self.query,
get_fields=False,
)
else:
data_query = self.get_query(**query_kwargs)
summary_query = self.get_query(summary=True, **query_kwargs)
points = timeseries_db.get_list_query(data_query)
summary = timeseries_db.get_list_query(summary_query)
points = timeseries_db.get_list_query(data_query, key=self.metric.key)
summary = timeseries_db.get_list_query(
summary_query, key=self.metric.key
)
except timeseries_db.client_error as e:
logging.error(e, exc_info=True)
raise e
@@ -471,7 +476,11 @@ def _round(value, decimal_places):
control = 1.0 / 10 ** decimal_places
if value < control:
decimal_places += 2
return round(value, decimal_places)
value = round(value, decimal_places)
# added for Elasticsearch division outputs (traffic metric)
if isinstance(value, float) and value.is_integer():
value = int(value)
return value


class AbstractAlertSettings(TimeStampedEditableModel):
30 changes: 23 additions & 7 deletions openwisp_monitoring/monitoring/tests/__init__.py
@@ -7,6 +7,7 @@

from ...db import timeseries_db
from ...db.backends import TIMESERIES_DB
from ...db.backends.elasticsearch.queries import _make_query
from ..configuration import (
register_chart,
register_metric,
@@ -80,7 +81,10 @@
"SELECT {fields|SUM|/ 1} FROM {key} "
"WHERE time >= '{time}' AND content_type = "
"'{content_type}' AND object_id = '{object_id}'"
)
),
'elasticsearch': _make_query(
{'{field_name}': {'sum': {'field': 'points.fields.{field_name}'}}}
),
},
},
'dummy': {
@@ -97,7 +101,7 @@
'description': 'Bugged chart for testing purposes.',
'unit': 'bugs',
'order': 999,
'query': {'influxdb': "BAD"},
'query': {'influxdb': "BAD", 'elasticsearch': "BAD"},
},
'default': {
'type': 'line',
@@ -109,7 +113,8 @@
'influxdb': (
"SELECT {field_name} FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
)
),
'elasticsearch': _make_query(),
},
},
'multiple_test': {
@@ -122,7 +127,13 @@
'influxdb': (
"SELECT {field_name}, value2 FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
)
),
'elasticsearch': _make_query(
{
'{field_name}': {'sum': {'field': 'points.fields.{field_name}'}},
'value2': {'sum': {'field': 'points.fields.value2'}},
}
),
},
},
'mean_test': {
@@ -135,7 +146,8 @@
'influxdb': (
"SELECT MEAN({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
)
),
'elasticsearch': _make_query(),
},
},
'sum_test': {
@@ -148,7 +160,10 @@
'influxdb': (
"SELECT SUM({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
)
),
'elasticsearch': _make_query(
{'{field_name}': {'sum': {'field': 'points.fields.{field_name}'}}}
),
},
},
'top_fields_mean': {
@@ -162,7 +177,8 @@
"SELECT {fields|MEAN} FROM {key} "
"WHERE time >= '{time}' AND content_type = "
"'{content_type}' AND object_id = '{object_id}'"
)
),
'elasticsearch': _make_query(),
},
},
}
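Each chart configuration above now carries one query per supported backend. Resolving the right one is a simple keyed lookup; a minimal sketch (hypothetical helper, not the project's actual code):

```python
chart_config = {
    'query': {
        'influxdb': (
            "SELECT {field_name} FROM {key} WHERE time >= '{time}' AND "
            "content_type = '{content_type}' AND object_id = '{object_id}'"
        ),
        # Elasticsearch queries are aggregation dicts rather than SQL-like strings
        'elasticsearch': {
            'aggs': {'{field_name}': {'sum': {'field': 'points.fields.{field_name}'}}}
        },
    }
}


def get_backend_query(config, backend_name):
    """Pick the query matching the configured timeseries backend."""
    try:
        return config['query'][backend_name]
    except KeyError:
        raise NotImplementedError(f'No query defined for backend {backend_name!r}')


assert get_backend_query(chart_config, 'influxdb').startswith('SELECT {field_name}')
```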
57 changes: 0 additions & 57 deletions openwisp_monitoring/monitoring/tests/test_charts.py
@@ -2,7 +2,6 @@
from datetime import date, timedelta
from unittest.mock import patch

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.test import TestCase
from django.utils.timezone import now
@@ -25,18 +24,6 @@ class TestCharts(TestMonitoringMixin, TestCase):
Tests for functionalities related to charts
"""

def test_read(self):
c = self._create_chart()
data = c.read()
key = c.metric.field_name
self.assertIn('x', data)
self.assertIn('traces', data)
self.assertEqual(len(data['x']), 3)
charts = data['traces']
self.assertEqual(charts[0][0], key)
self.assertEqual(len(charts[0][1]), 3)
self.assertEqual(charts[0][1], [3, 6, 9])

def test_read_summary_avg(self):
m = self._create_object_metric(name='summary_avg')
c = self._create_chart(metric=m, test_data=False, configuration='mean_test')
@@ -133,34 +120,6 @@ def test_read_summary_top_fields_acid(self):
self.assertEqual(data['summary'], {'google': 87500000, 'facebook': 37503000})
self.assertEqual(c.get_top_fields(2), ['google', 'facebook'])

def test_read_multiple(self):
c = self._create_chart(test_data=None, configuration='multiple_test')
m1 = c.metric
m2 = self._create_object_metric(
name='test metric 2',
key='test_metric',
field_name='value2',
content_object=m1.content_object,
)
now_ = now()
for n in range(0, 3):
time = now_ - timedelta(days=n)
m1.write(n + 1, time=time)
m2.write(n + 2, time=time)
data = c.read()
f1 = m1.field_name
f2 = 'value2'
self.assertIn('x', data)
self.assertIn('traces', data)
self.assertEqual(len(data['x']), 3)
charts = data['traces']
self.assertIn(f1, charts[0][0])
self.assertIn(f2, charts[1][0])
self.assertEqual(len(charts[0][1]), 3)
self.assertEqual(len(charts[1][1]), 3)
self.assertEqual(charts[0][1], [3, 2, 1])
self.assertEqual(charts[1][1], [4, 3, 2])

def test_json(self):
c = self._create_chart()
data = c.read()
@@ -180,22 +139,6 @@ def test_read_bad_query(self):
else:
self.fail('ValidationError not raised')

def test_get_query(self):
c = self._create_chart(test_data=False)
m = c.metric
now_ = now()
today = date(now_.year, now_.month, now_.day)
time = today - timedelta(days=6)
expected = c.query.format(
field_name=m.field_name,
key=m.key,
content_type=m.content_type_key,
object_id=m.object_id,
time=str(time),
)
expected = "{0} tz('{1}')".format(expected, settings.TIME_ZONE)
self.assertEqual(c.get_query(), expected)

def test_description(self):
c = self._create_chart(test_data=False)
self.assertEqual(c.description, 'Dummy chart for testing purposes.')
1 change: 1 addition & 0 deletions requirements.txt
@@ -5,6 +5,7 @@ openwisp-controller @ https://github.com/openwisp/openwisp-controller/tarball/ma
# is the one dictated by openwisp-controller
influxdb~=5.3
django-celery-email~=3.0.0
mac-vendor-lookup~=0.1
django-cache-memoize~=0.1
django-nested-admin~=3.3.2
swapper~=1.1
8 changes: 6 additions & 2 deletions setup.py
@@ -55,6 +55,10 @@ def get_install_requires():
include_package_data=True,
zip_safe=False,
install_requires=get_install_requires(),
extras_require={
'elasticsearch': ['elasticsearch-dsl>=7.0.0,<8.0.0'],
'influxdb': ['influxdb>=5.2,<5.3'],
},
classifiers=[
'Development Status :: 3 - Alpha',
'Environment :: Web Environment',
@@ -64,7 +68,7 @@ def get_install_requires():
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
'Operating System :: OS Independent',
'Framework :: Django',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
],
)
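With the `extras_require` entry above, the timeseries backend client becomes an optional extra at install time (package name as used by this repository):

```shell
# install with the Elasticsearch client (elasticsearch-dsl 7.x)
pip install openwisp-monitoring[elasticsearch]
# or with the InfluxDB client
pip install openwisp-monitoring[influxdb]
```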
16 changes: 15 additions & 1 deletion tests/openwisp2/settings.py
@@ -19,7 +19,7 @@
}
}

TIMESERIES_DATABASE = {
INFLUXDB_SETTINGS = {
'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
'USER': 'openwisp',
'PASSWORD': 'openwisp',
@@ -28,6 +28,20 @@
'PORT': '8086',
}

ELASTICSEARCH_SETTINGS = {
'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
'USER': 'openwisp',
'PASSWORD': 'openwisp',
'NAME': 'openwisp2',
'HOST': os.getenv('ELASTICSEARCH_HOST', 'localhost'),
'PORT': '9200',
}

if os.environ.get('elasticsearch', False):
TIMESERIES_DATABASE = ELASTICSEARCH_SETTINGS
else:
TIMESERIES_DATABASE = INFLUXDB_SETTINGS

SECRET_KEY = 'fn)t*+$)ugeyip6-#txyy$5wf2ervc0d2n#h)qb)y5@ly$t*@w'

INSTALLED_APPS = [
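The backend switch added to the test settings above keys off an `elasticsearch` environment variable; the same selection pattern can be reproduced in isolation (a sketch with trimmed settings dicts, not the actual settings module):

```python
INFLUXDB_SETTINGS = {
    'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
    'PORT': '8086',
}
ELASTICSEARCH_SETTINGS = {
    'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
    'PORT': '9200',
}


def select_timeseries_database(environ):
    """Use Elasticsearch when the 'elasticsearch' env variable is set,
    otherwise fall back to the default InfluxDB backend."""
    if environ.get('elasticsearch', False):
        return ELASTICSEARCH_SETTINGS
    return INFLUXDB_SETTINGS


assert select_timeseries_database({})['PORT'] == '8086'
assert select_timeseries_database({'elasticsearch': '1'})['PORT'] == '9200'
```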
