Skip to content

Commit

Permalink
Merge pull request #67 from alk-lbinet/clarifications
Browse files Browse the repository at this point in the history
Clarifications
  • Loading branch information
alk-lbinet authored Apr 19, 2021
2 parents b49cda3 + 75699d1 commit 6b9a393
Show file tree
Hide file tree
Showing 23 changed files with 396 additions and 481 deletions.
9 changes: 7 additions & 2 deletions pandagg/aggs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from pandagg.tree.aggs.aggs import Aggs

from pandagg.node.aggs.bucket import (
Terms,
Filters,
Expand All @@ -11,8 +9,11 @@
ReverseNested,
Range,
Missing,
MatchAll,
)

from pandagg.node.aggs.composite import Composite

from pandagg.node.aggs.metric import (
Avg,
Max,
Expand Down Expand Up @@ -46,6 +47,8 @@
SerialDiff,
)

from pandagg.tree.aggs import Aggs

__all__ = [
"Aggs",
"Terms",
Expand Down Expand Up @@ -85,4 +88,6 @@
"BucketSelector",
"BucketSort",
"SerialDiff",
"MatchAll",
"Composite",
]
4 changes: 2 additions & 2 deletions pandagg/interactive/_field_agg_factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pandagg.node.aggs.abstract import AggNode
from pandagg.node.aggs.abstract import AggClause

from pandagg.node.types import MAPPING_TYPES

Expand All @@ -13,7 +13,7 @@ def list_available_aggs_on_field(field_type):
"""
return [
agg_class
for agg_class in AggNode._classes.values()
for agg_class in AggClause._classes.values()
if agg_class.valid_on_field_type(field_type)
]

Expand Down
2 changes: 1 addition & 1 deletion pandagg/interactive/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from lighttree import TreeBasedObj

from pandagg.tree.aggs.aggs import Aggs
from pandagg.tree.aggs import Aggs


class IResponse(TreeBasedObj):
Expand Down
2 changes: 1 addition & 1 deletion pandagg/node/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .query import *
from .aggs import *
from .mapping import *
from .query import *
1 change: 1 addition & 0 deletions pandagg/node/aggs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .bucket import *
from .composite import *
from .metric import *
from .pipeline import *
68 changes: 58 additions & 10 deletions pandagg/node/aggs/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,57 @@

from __future__ import unicode_literals
from builtins import str as text
from six import text_type

import json

from pandagg.node._node import Node


class AggNode(Node):
def A(name, type_or_agg=None, **body):
    """Build an ``AggClause`` node, accepting multiple declaration syntaxes.

    :param name: aggregation clause name; only used as the default ``field``
        when ``type_or_agg`` is not provided.
    :param type_or_agg: aggregation type name (str), an ``AggClause``
        instance, a dict declaration, or ``None``.
    :param body: aggregation clause body parameters; forbidden when
        ``type_or_agg`` already carries a body (instance or dict syntaxes).
    :return: AggClause
    """
    if isinstance(type_or_agg, text_type):
        # A("per_user", "terms", field="user")
        return AggClause._get_dsl_class(type_or_agg)(**body)
    if isinstance(type_or_agg, AggClause):
        # A("per_user", Terms(field='user'))
        if body:
            raise ValueError(
                'Body cannot be added using "AggClause" declaration, got %s.' % body
            )
        return type_or_agg
    if isinstance(type_or_agg, dict):
        # A("per_user", {"terms": {"field": "user"}})
        if body:
            raise ValueError(
                'Body cannot be added using "dict" agg declaration, got %s.' % body
            )
        # work on a copy so the caller's dict is left untouched
        type_or_agg = type_or_agg.copy()
        children_aggs = (
            type_or_agg.pop("aggs", None) or type_or_agg.pop("aggregations", None) or {}
        )
        # after children extraction, exactly one key must remain: the agg type
        if len(type_or_agg) != 1:
            raise ValueError(
                "Invalid aggregation declaration (too many keys): got <%s>"
                % type_or_agg
            )
        type_, body_ = type_or_agg.popitem()
        body_ = body_.copy()
        if children_aggs:
            body_["aggs"] = children_aggs
        return AggClause._get_dsl_class(type_)(**body_)
    if type_or_agg is None:
        # if type_or_agg is not provided, by default execute a terms
        # aggregation on the field named like the clause:
        # A("per_user")
        return AggClause._get_dsl_class("terms")(field=name, **body)
    raise ValueError('"type_or_agg" must be among "dict", "AggClause", "str"')


class AggClause(Node):
"""Wrapper around elasticsearch aggregation concept.
https://www.elastic.co/guide/en/elasticsearch/reference/2.3/search-aggregations.html
Expand All @@ -29,7 +73,7 @@ def __init__(self, meta=None, **body):
self.body = body
self.meta = meta
self._children = {}
super(AggNode, self).__init__(identifier=identifier)
super(AggClause, self).__init__(identifier=identifier)

def line_repr(self, depth, **kwargs):
# root node
Expand Down Expand Up @@ -108,13 +152,13 @@ def __str__(self):
)

def __eq__(self, other):
if isinstance(other, AggNode):
if isinstance(other, AggClause):
return other.to_dict() == self.to_dict()
# make sure we still equal to a dict with the same data
return other == self.to_dict()


class Root(AggNode):
class Root(AggClause):
"""Not a real aggregation. Just the initial empty dict."""

KEY = "_root"
Expand All @@ -130,7 +174,7 @@ def extract_bucket_value(cls, response, value_as_dict=False):
return None


class MetricAgg(AggNode):
class MetricAgg(AggClause):
"""Metric aggregation are aggregations providing a single bucket, with value attributes to be extracted."""

VALUE_ATTRS = None
Expand All @@ -142,7 +186,7 @@ def get_filter(self, key):
return None


class BucketAggNode(AggNode):
class BucketAggNode(AggClause):
"""Bucket aggregation have special abilities: they can encapsulate other aggregations as children.
Each time, the extracted value is a 'doc_count'.
Expand All @@ -168,7 +212,7 @@ def __init__(self, meta=None, **body):
self.body = body
self.meta = meta
self._children = body.pop("aggs", None) or body.pop("aggregations", None) or {}
super(AggNode, self).__init__(identifier=identifier)
super(AggClause, self).__init__(identifier=identifier)

def extract_buckets(self, response_value):
raise NotImplementedError()
Expand Down Expand Up @@ -233,9 +277,13 @@ class FieldOrScriptMetricAgg(MetricAgg):

VALUE_ATTRS = None

def __init__(self, meta=None, **body):
self.field = body.get("field")
self.script = body.get("script")
def __init__(self, field=None, script=None, meta=None, **body):
self.field = field
self.script = script
if field is not None:
body["field"] = field
if script is not None:
body["script"] = script
super(FieldOrScriptMetricAgg, self).__init__(meta=meta, **body)


Expand Down
12 changes: 6 additions & 6 deletions pandagg/node/aggs/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ class Filter(UniqueBucketAgg):
KEY = "filter"
VALUE_ATTRS = ["doc_count"]

def __init__(self, filter=None, meta=None, **kwargs):
if (filter is not None) != (not kwargs):
def __init__(self, filter=None, meta=None, **body):
if (filter is not None) != (not body):
raise ValueError(
'Filter aggregation requires exactly one of "filter" or "kwargs"'
'Filter aggregation requires exactly one of "filter" or "body"'
)
if filter:
filter_ = filter.copy()
else:
filter_ = kwargs.copy()
filter_ = body.copy()
self.filter = filter_
super(Filter, self).__init__(meta=meta, **filter_)

Expand All @@ -49,8 +49,8 @@ def get_filter(self, key):


class MatchAll(Filter):
def __init__(self, meta=None):
super(MatchAll, self).__init__(filter={"match_all": {}}, meta=meta)
def __init__(self, meta=None, **body):
super(MatchAll, self).__init__(filter={"match_all": {}}, meta=meta, **body)


class Nested(UniqueBucketAgg):
Expand Down
41 changes: 41 additions & 0 deletions pandagg/node/aggs/composite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from .abstract import BucketAggNode


class Composite(BucketAggNode):
    def __init__(self, sources, size=None, after_key=None, meta=None, **body):
        """Composite aggregation: paginated multi-source bucket aggregation.

        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-composite-aggregation.html

        :param sources: list of single-entry dicts, each mapping a source name
            to its agg declaration, e.g. ``[{"date": {"date_histogram": {...}}}]``
        :param size: number of composite buckets returned per request
        :param after_key: key of the last bucket of the previous response,
            used to paginate
        :param meta: optional clause metadata
        :param body: extra body clauses, notably "aggs"/"aggregations" children
            (extracted into ``self._children`` by the parent constructor)
        """
        self._sources = sources
        self._size = size
        self._after_key = after_key
        if size is not None:
            body["size"] = size
        if after_key is not None:
            # the request pagination parameter is named "after"; "after_key"
            # is the name this value carries in responses
            body["after"] = after_key
        # children "aggs"/"aggregations" are deliberately left in body:
        # BucketAggNode.__init__ pops them into self._children (popping them
        # here as well would leave _children overwritten by an empty dict)
        super(Composite, self).__init__(meta=meta, sources=sources, **body)

    def extract_buckets(self, response_value):
        """Yield (key, bucket) pairs from a composite aggregation response."""
        for bucket in response_value["buckets"]:
            yield bucket["key"], bucket

    def get_filter(self, key):
        """Build a filter query matching the bucket identified by ``key``.

        In composite aggregation, key is a map, source name -> value.
        """
        if not key:
            return
        conditions = []
        for source in self._sources:
            # each source is a single-entry dict: {name: {agg_type: agg_body}};
            # read it without popitem() so sources are not destructively
            # mutated, and read the agg declaration from the source *value*
            # (popping the same single-entry dict twice raised KeyError)
            for name, agg in source.items():
                if name not in key:
                    continue
                for agg_type, agg_body in agg.items():
                    agg_instance = self._get_dsl_class(agg_type)(**agg_body)
                    conditions.append(agg_instance.get_filter(key=key[name]))
        if not conditions:
            return
        return {"bool": {"filter": conditions}}
36 changes: 36 additions & 0 deletions pandagg/node/query/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,45 @@
from builtins import str as text
import json

from six import text_type

from pandagg.node._node import Node


def Q(type_or_query=None, **body):
    """Build a ``QueryClause`` node, accepting multiple declaration syntaxes.

    :param type_or_query: query clause type name (str), a ``QueryClause``
        instance, or a dict declaration.
    :param body: query clause body parameters; forbidden when
        ``type_or_query`` already carries a body (instance or dict syntaxes).
    :return: QueryClause
    """
    if isinstance(type_or_query, QueryClause):
        if body:
            raise ValueError(
                'Body cannot be added using "QueryClause" declaration, got %s.' % body
            )
        return type_or_query

    if isinstance(type_or_query, dict):
        if body:
            raise ValueError(
                'Body cannot be added using "dict" query clause declaration, got %s.'
                % body
            )
        # work on a copy so the caller's dict is left untouched
        type_or_query = type_or_query.copy()
        # {"term": {"some_field": 1}}
        # {"bool": {"filter": [{"term": {"some_field": 1}}]}}
        if len(type_or_query) != 1:
            raise ValueError(
                "Invalid query clause declaration (too many keys): got <%s>"
                % type_or_query
            )
        type_, body_ = type_or_query.popitem()
        return QueryClause._get_dsl_class(type_)(**body_)
    if isinstance(type_or_query, text_type):
        return QueryClause._get_dsl_class(type_or_query)(**body)
    raise ValueError('"type_or_query" must be among "dict", "QueryClause", "str"')


class QueryClause(Node):
_type_name = "query"
KEY = None
Expand Down
2 changes: 1 addition & 1 deletion pandagg/query.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from pandagg.tree.query.abstract import Query
from pandagg.node.query.shape import Shape
from pandagg.node.query.term_level import (
Exists,
Expand Down Expand Up @@ -42,6 +41,7 @@
Wrapper,
)
from pandagg.node.query.specialized_compound import ScriptScore, PinnedQuery
from pandagg.tree.query import Query

__all__ = [
"Query",
Expand Down
6 changes: 1 addition & 5 deletions pandagg/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,7 @@ def _grouping_agg(self, name=None):
return child_key, child_node
return key, node
# if use of groupby method in Aggs class, use groupby pointer
if self._aggs._groupby_ptr is not None:
return self._aggs.get(self._aggs._groupby_ptr)

id_ = self._aggs.deepest_linear_bucket_agg
return self._aggs.get(id_)
return self._aggs.get(self._aggs._groupby_ptr)

def to_tabular(
self,
Expand Down
6 changes: 3 additions & 3 deletions pandagg/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from pandagg.query import Bool
from pandagg.response import Response
from pandagg.tree.mapping import _mapping
from pandagg.tree.query.abstract import Query, ADD
from pandagg.tree.aggs.aggs import Aggs
from pandagg.tree.query import Query, ADD
from pandagg.tree.aggs import Aggs
from pandagg.utils import DSLMixin


Expand Down Expand Up @@ -264,7 +264,7 @@ def exclude(self, type_or_query, insert_below=None, on=None, mode=ADD, **body):
"""Must not wrapped in filter context."""
s = self._clone()
s._query = s._query.filter(
Bool(must_not=Query._translate_query(type_or_query=type_or_query, **body)),
Bool(must_not=Query._q(type_or_query=type_or_query, **body)),
insert_below=insert_below,
on=on,
mode=mode,
Expand Down
Loading

0 comments on commit 6b9a393

Please sign in to comment.