import json
from pandagg.node._node import Node
from typing import Optional, List, Union, Dict, Any, Tuple, Iterator, Type
from pandagg.types import (
Meta,
BucketKey,
BucketDict,
AggClauseDict,
AggType,
Script,
GapPolicy,
AggName,
AggClauseResponseDict,
BucketsWrapperDict,
BucketKeyAtom,
BucketsDict,
)
[docs]class AggClause(Node):
"""
Wrapper around elasticsearch aggregation concept.
https://www.elastic.co/guide/en/elasticsearch/reference/2.3/search-aggregations.html
Each aggregation can be seen both a Node that can be encapsulated in a parent agg.
Define a method to build aggregation request.
"""
_classes: Dict[AggType, Type["AggClause"]]
_type_name = "agg"
KEY: str
VALUE_ATTRS: List[str]
WHITELISTED_MAPPING_TYPES: List[str]
BLACKLISTED_MAPPING_TYPES: List[str]
def __init__(
self, meta: Optional[Meta] = None, identifier: Optional[str] = None, **body: Any
) -> None:
# remove empty keys from body, make __init__ clearer
self.body: Dict[str, Any] = {k: v for k, v in body.items() if v is not None}
self.meta: Optional[Meta] = meta
self._children: Dict[AggName, Any] = {}
super(AggClause, self).__init__(identifier=identifier)
[docs] def line_repr(self, depth: int, **kwargs: Any) -> Tuple[str, str]:
# root node
if not self.KEY:
return "_", ""
repr_args = [str(self.KEY)]
if self.body:
repr_args.append(self._params_repr(self.body))
unnamed = "<%s>" % ", ".join(repr_args)
return "", unnamed
@staticmethod
def _params_repr(params: Dict[str, Any]) -> str:
params = params or {}
return ", ".join(
"%s=%s" % (str(k), str(json.dumps(params[k], sort_keys=True)))
for k in sorted(params.keys())
)
[docs] @classmethod
def valid_on_field_type(cls, field_type: str) -> bool:
if hasattr(cls, "WHITELISTED_MAPPING_TYPES"):
return field_type in cls.WHITELISTED_MAPPING_TYPES
if hasattr(cls, "BLACKLISTED_MAPPING_TYPES"):
return field_type not in cls.BLACKLISTED_MAPPING_TYPES
# by default laxist
# TODO - constraint to only allowed types
return True
[docs] def to_dict(self) -> AggClauseDict:
"""
ElasticSearch aggregation queries follow this formatting::
{
"<aggregation_name>" : {
"<aggregation_type>" : {
<aggregation_body>
}
[,"meta" : { [<meta_data_body>] } ]?
}
}
to_dict() returns the following part (without aggregation name)::
{
"<aggregation_type>" : {
<aggregation_body>
}
[,"meta" : { [<meta_data_body>] } ]?
}
"""
if self.KEY is None:
raise ValueError("For typing only")
aggs = {self.KEY: self.body}
if self.meta:
aggs["meta"] = self.meta
return aggs
[docs] def is_convertible_to_composite_source(self) -> bool:
return False
def __str__(self) -> str:
return "<{class_}, type={type}, body={body}>".format(
class_=str(self.__class__.__name__),
type=str(self.KEY),
body=json.dumps(self.body),
)
def __eq__(self, other: Any) -> bool:
if isinstance(other, AggClause):
return other.to_dict() == self.to_dict()
# make sure we still equal to a dict with the same data
return other == self.to_dict()
TypeOrAgg = Union[AggType, AggClauseDict, AggClause]
[docs]def A(name: str, type_or_agg: Optional[TypeOrAgg] = None, **body: Any) -> AggClause:
"""
Accept multiple syntaxes, return a AggNode instance.
:param name: aggregation clause name
:param type_or_agg:
:param body:
:return: AggNode
"""
if isinstance(type_or_agg, str):
# _translate_agg("per_user", "terms", field="user")
return AggClause.get_dsl_class(type_or_agg)(**body)
if isinstance(type_or_agg, AggClause):
# _translate_agg("per_user", Terms(field='user'))
if body:
raise ValueError(
'Body cannot be added using "AggNode" declaration, got %s.' % body
)
return type_or_agg
if isinstance(type_or_agg, dict):
# _translate_agg("per_user", {"terms": {"field": "user"}})
if body:
raise ValueError(
'Body cannot be added using "dict" agg declaration, got %s.' % body
)
type_or_agg = type_or_agg.copy()
children_aggs = (
type_or_agg.pop("aggs", None) or type_or_agg.pop("aggregations", None) or {}
)
if len(type_or_agg) != 1:
raise ValueError(
"Invalid aggregation declaration (two many keys): got <%s>"
% type_or_agg
)
type_, body_ = type_or_agg.popitem()
body_ = body_.copy()
if children_aggs:
body_["aggs"] = children_aggs
return AggClause.get_dsl_class(type_)(**body_)
if type_or_agg is None:
# if type_or_agg is not provided, by default execute a terms aggregation
# _translate_agg("per_user")
return AggClause.get_dsl_class("terms")(field=name, **body)
raise ValueError('"type_or_agg" must be among "dict", "AggNode", "str"')
[docs]class Root(AggClause):
"""
Not a real aggregation. Just the initial empty dict (used as lighttree.Tree root).
"""
KEY: str = "_root"
[docs] def line_repr(self, depth: int, **kwargs: Any) -> Tuple[str, str]:
return "_", ""
[docs]class MetricAgg(AggClause):
"""
Metric aggregation are aggregations providing a single bucket, with value attributes to be extracted.
"""
[docs]class BucketAggClause(AggClause):
"""
Bucket aggregation have special abilities: they can encapsulate other aggregations as children.
Each time, the extracted value is a 'doc_count'.
Provide methods:
- to build aggregation request (with children aggregations)
- to to extract buckets from raw response
- to build query to filter documents belonging to that bucket
Note: the aggs attribute's only purpose is for children initiation with the following syntax:
>>> from pandagg.aggs import Terms, Avg
>>> agg = Terms(
>>> field='some_path',
>>> aggs={
>>> 'avg_agg': Avg(field='some_other_path')
>>> }
>>> )
"""
def __init__(self, **body: Any) -> None:
identifier: Optional[str] = body.pop("identifier", None)
aggs = body.pop("aggs", None) or body.pop("aggregations", None)
super(BucketAggClause, self).__init__(identifier=identifier, **body)
self._children: Dict[AggName, Any] = aggs or {} # type: ignore
[docs]class UniqueBucketAgg(BucketAggClause):
"""Aggregations providing a single bucket."""
[docs]class MultipleBucketAgg(BucketAggClause):
IMPLICIT_KEYED: bool = False
def __init__(
self, keyed: bool = False, key_as_string: bool = False, **body: Any
) -> None:
"""
Aggregation that return either a list or a map of buckets.
If keyed, ES buckets are expected as dict, else as list (in this case key_path is used to extract key from each
list item).
"""
# keyed has another meaning in lighttree Node
self.keyed_: bool = keyed or self.IMPLICIT_KEYED
self.key_path: str = "key_as_string" if key_as_string else "key"
if keyed and not self.IMPLICIT_KEYED:
body["keyed"] = keyed
super(MultipleBucketAgg, self).__init__(**body)
def _extract_bucket_key(self, bucket: BucketDict) -> BucketKey:
return bucket[self.key_path]
[docs]class FieldOrScriptMetricAgg(MetricAgg):
"""
Metric aggregation based on single field.
"""
def __init__(
self, field: Optional[str] = None, script: Optional[Script] = None, **body: Any
) -> None:
self.field: Optional[str] = field
super(FieldOrScriptMetricAgg, self).__init__(field=field, script=script, **body)
[docs]class Pipeline(UniqueBucketAgg):
def __init__(
self, buckets_path: str, gap_policy: Optional[GapPolicy] = None, **body: Any
) -> None:
super(Pipeline, self).__init__(
buckets_path=buckets_path, gap_policy=gap_policy, **body
)
[docs]class ScriptPipeline(Pipeline):
VALUE_ATTRS: List[str] = ["value"]
def __init__(
self,
script: Script,
buckets_path: str,
gap_policy: Optional[GapPolicy] = None,
**body: Any
) -> None:
super(ScriptPipeline, self).__init__(
buckets_path=buckets_path, gap_policy=gap_policy, script=script, **body
)