#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Not implemented aggregations include:
- children agg
- geo-distance
- geo-hash grid
- ipv4
- sampler
- significant terms
"""
from pandagg.node.types import NUMERIC_TYPES
from pandagg.node.aggs.abstract import MultipleBucketAgg, UniqueBucketAgg
class Global(UniqueBucketAgg):
    """Global aggregation: a single bucket that applies no filter of its own."""

    KEY = "global"
    VALUE_ATTRS = ["doc_count"]

    def __init__(self, meta=None):
        # The global aggregation has an empty body. Like every other
        # aggregation in this module, body entries are forwarded to the parent
        # as keyword arguments, so nothing is passed here; the former
        # ``agg_body={}`` would presumably have been kept as a literal
        # "agg_body" body key.
        super(Global, self).__init__(meta=meta)

    def get_filter(self, key):
        """Return None: the global bucket has no filter to reproduce."""
        return None
class Filter(UniqueBucketAgg):
    """Single-bucket aggregation restricted to documents matching a query.

    The query is given either through the ``filter`` dict or as keyword
    arguments (e.g. ``Filter(term={...})``) -- exactly one of the two.

    :param filter: query dict, mutually exclusive with keyword arguments.
    :param meta: optional metadata forwarded to the parent node.
    :raises ValueError: if both or neither of ``filter`` / keyword body are given.
    """

    KEY = "filter"
    VALUE_ATTRS = ["doc_count"]

    def __init__(self, filter=None, meta=None, **body):
        has_filter = filter is not None
        has_body = bool(body)
        if has_filter == has_body:
            raise ValueError(
                'Filter aggregation requires exactly one of "filter" or "body"'
            )
        # A falsy ``filter`` falls back to the (possibly empty) keyword body.
        query = (filter or body).copy()
        self.filter = query
        super(Filter, self).__init__(meta=meta, **query)

    def get_filter(self, key):
        """Return the query dict this bucket was built from (``key`` is ignored)."""
        return self.filter
class MatchAll(Filter):
    """Filter aggregation whose query is ``{"match_all": {}}``."""

    def __init__(self, meta=None, **body):
        # Extra keyword arguments are forwarded to Filter, which rejects them
        # since the query is already supplied through ``filter``.
        match_all_query = {"match_all": {}}
        super(MatchAll, self).__init__(filter=match_all_query, meta=meta, **body)
class Nested(UniqueBucketAgg):
    """Single-bucket aggregation over the nested documents found at ``path``.

    :param path: nested field path, stored on the instance and sent in the body.
    :param meta: optional metadata forwarded to the parent node.
    """

    KEY = "nested"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["nested"]

    def __init__(self, path, meta=None, **body):
        self.path = path
        super(Nested, self).__init__(path=path, meta=meta, **body)

    def get_filter(self, key):
        """No filter is associated with this bucket; always returns None."""
        return None
class ReverseNested(UniqueBucketAgg):
    """Single-bucket reverse-nested aggregation, with an optional ``path``.

    :param path: optional target path; only added to the body when truthy.
    :param meta: optional metadata forwarded to the parent node.
    """

    KEY = "reverse_nested"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["nested"]

    def __init__(self, path=None, meta=None, **body):
        self.path = path
        if path:
            body = dict(body, path=path)
        super(ReverseNested, self).__init__(meta=meta, **body)

    def get_filter(self, key):
        """No filter is associated with this bucket; always returns None."""
        return None
class Missing(UniqueBucketAgg):
    """Single-bucket aggregation of documents that have no value for ``field``.

    :param field: field whose absence defines the bucket.
    :param meta: optional metadata forwarded to the parent node.
    """

    KEY = "missing"
    VALUE_ATTRS = ["doc_count"]
    BLACKLISTED_MAPPING_TYPES = []

    def __init__(self, field, meta=None, **body):
        # ``get_filter`` reads ``self.field``; it was previously never set here.
        self.field = field
        # ``super(Missing, self)`` (not ``super(UniqueBucketAgg, self)``) so that
        # UniqueBucketAgg.__init__ is not skipped in the MRO.
        super(Missing, self).__init__(field=field, meta=meta, **body)

    def get_filter(self, key):
        """Return a query matching documents where ``field`` does not exist."""
        return {"bool": {"must_not": {"exists": {"field": self.field}}}}
class Terms(MultipleBucketAgg):
    """Terms aggregation: one bucket per distinct value of ``field``.

    :param field: field to bucket on.
    :param missing: value assigned to documents lacking ``field`` (sent in the
        body only when not None).
    :param size: number of buckets requested (sent in the body only when not None).
    :param meta: optional metadata forwarded to the parent node.
    """

    KEY = "terms"
    VALUE_ATTRS = ["doc_count", "doc_count_error_upper_bound", "sum_other_doc_count"]
    BLACKLISTED_MAPPING_TYPES = []

    def __init__(self, field, missing=None, size=None, meta=None, **body):
        self.field = field
        self.missing = missing
        self.size = size
        body_kwargs = dict(body)
        if missing is not None:
            body_kwargs["missing"] = missing
        if size is not None:
            body_kwargs["size"] = size
        super(Terms, self).__init__(field=field, meta=meta, **body_kwargs)

    def get_filter(self, key):
        """Provide filter to get documents belonging to bucket of given key.

        When ``missing`` was configured, the bucket keyed by that value holds
        both documents that really have this value and documents lacking the
        field, so the returned filter matches either case.
        """
        missing_filter = {"bool": {"must_not": {"exists": {"field": self.field}}}}
        term_filter = {"term": {self.field: {"value": key}}}
        if self.missing is not None and key == self.missing:
            return {
                "bool": {
                    "should": [term_filter, missing_filter],
                    "minimum_should_match": 1,
                }
            }
        if key == "missing":
            return missing_filter
        return term_filter
class Filters(MultipleBucketAgg):
    """Multi-bucket aggregation with one bucket per named filter.

    :param filters: mapping of bucket key -> query dict.
    :param other_bucket: whether to add a bucket for documents matching no filter.
    :param other_bucket_key: custom key for that extra bucket; setting it also
        enables the extra bucket.
    :param meta: optional metadata forwarded to the parent node.
    """

    KEY = "filters"
    VALUE_ATTRS = ["doc_count"]
    DEFAULT_OTHER_KEY = "_other_"
    IMPLICIT_KEYED = True

    def __init__(
        self, filters, other_bucket=False, other_bucket_key=None, meta=None, **body
    ):
        self.filters = filters
        self.other_bucket = other_bucket
        self.other_bucket_key = other_bucket_key
        body_kwargs = dict(body)
        if other_bucket:
            body_kwargs["other_bucket"] = other_bucket
        if other_bucket_key:
            body_kwargs["other_bucket_key"] = other_bucket_key
        super(Filters, self).__init__(filters=filters, meta=meta, **body_kwargs)

    def get_filter(self, key):
        """Provide filter to get documents belonging to bucket of given key.

        :raises ValueError: if ``key`` is neither a filter name nor the key of
            an enabled "other" bucket.
        """
        if key in self.filters:
            return self.filters[key]
        # Providing other_bucket_key implicitly enables the "other" bucket.
        if self.other_bucket or self.other_bucket_key:
            if key == self.DEFAULT_OTHER_KEY or key == self.other_bucket_key:
                return {
                    "bool": {
                        "must_not": {"bool": {"should": list(self.filters.values())}}
                    }
                }
        raise ValueError("Unknown <%s> key" % (key,))
class Histogram(MultipleBucketAgg):
    """Numeric histogram aggregation with fixed-width buckets.

    :param field: numeric field to bucket on.
    :param interval: bucket width; used to compute each bucket's upper bound.
    :param meta: optional metadata forwarded to the parent node.
    """

    KEY = "histogram"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = NUMERIC_TYPES

    def __init__(self, field, interval, meta=None, **body):
        self.field = field
        self.interval = interval
        super(Histogram, self).__init__(
            field=field, interval=interval, meta=meta, **body
        )

    def get_filter(self, key):
        """Return a range query ``[key, key + interval)`` for the given bucket.

        :raises ValueError: if ``key`` cannot be converted to a float.
        """
        try:
            key = float(key)
        except (TypeError, ValueError):
            raise ValueError(
                "Filter key of an histogram aggregation must be numeric, got <%s> of type <%s>"
                % (key, type(key))
            )
        return {"range": {self.field: {"gte": key, "lt": key + self.interval}}}
class DateHistogram(MultipleBucketAgg):
    KEY = "date_histogram"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["date"]
    # Interval spellings accepted by the aggregation but not by Elasticsearch
    # date math (which only knows y, M, w, d, h, H, m, s), mapped to
    # equivalent date-math durations used when building bucket filters.
    _DATE_MATH_INTERVALS = {
        "minute": "1m",
        "hour": "1h",
        "day": "1d",
        "week": "1w",
        "month": "1M",
        "quarter": "3M",
        "1q": "3M",
        "year": "1y",
    }

    # interval is deprecated from 7.2 in favor of calendar_interval and fixed interval
    def __init__(
        self,
        field,
        interval=None,
        calendar_interval=None,
        fixed_interval=None,
        meta=None,
        keyed=False,
        key_as_string=True,
        **body
    ):
        """Date Histogram aggregation.
        Note: interval is deprecated from 7.2 in favor of calendar_interval and fixed interval
        :param keyed: defines returned buckets format: if True, as dict.
        :param key_as_string: if True extracted key of bucket will be the formatted date (applicable if keyed=False)
        :raises ValueError: if none of interval, calendar_interval, fixed_interval is given.
        """
        self.field = field
        if not (interval or fixed_interval or calendar_interval):
            raise ValueError(
                'One of "interval", "calendar_interval" or "fixed_interval" must be provided.'
            )
        if interval:
            body["interval"] = interval
        if calendar_interval:
            body["calendar_interval"] = calendar_interval
        if fixed_interval:
            body["fixed_interval"] = fixed_interval
        self.interval = interval or calendar_interval or fixed_interval
        super(DateHistogram, self).__init__(
            field=field,
            meta=meta,
            keyed=keyed,
            key_path="key_as_string" if key_as_string else "key",
            **body
        )

    def get_filter(self, key):
        """Return a range query covering the bucket starting at ``key``.

        The upper bound is expressed with date math (``key||+<interval>``);
        named calendar units such as "month" are translated first since date
        math does not accept them.
        """
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math
        step = self._DATE_MATH_INTERVALS.get(self.interval, self.interval)
        return {"range": {self.field: {"gte": key, "lt": "%s||+%s" % (key, step)}}}
class Range(MultipleBucketAgg):
    """Range aggregation over a numeric field.

    Bucket keys are built as ``<from><KEY_SEP><to>`` with ``*`` for an open
    bound, and parsed back by ``get_filter``.

    :param field: field to bucket on.
    :param ranges: list of range definitions sent in the body.
    :param keyed: if True, bucket bounds are read from ``from_as_string`` /
        ``to_as_string`` instead of ``from`` / ``to``.
    :param meta: optional metadata forwarded to the parent node.
    """

    KEY = "range"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = NUMERIC_TYPES
    KEY_SEP = "-"

    def __init__(self, field, ranges, keyed=False, meta=None, **body):
        self.field = field
        self.ranges = ranges
        body_kwargs = dict(body)
        if keyed:
            self.bucket_key_suffix = "_as_string"
        else:
            self.bucket_key_suffix = None
        super(Range, self).__init__(
            field=field, ranges=ranges, meta=meta, keyed=keyed, **body_kwargs
        )

    @property
    def from_key(self):
        if self.bucket_key_suffix:
            return "from%s" % self.bucket_key_suffix
        return "from"

    @property
    def to_key(self):
        if self.bucket_key_suffix:
            return "to%s" % self.bucket_key_suffix
        return "to"

    def _extract_bucket_key(self, bucket):
        """Build the ``<from><KEY_SEP><to>`` key of a response bucket."""
        lower = bucket[self.from_key] if self.from_key in bucket else "*"
        upper = bucket[self.to_key] if self.to_key in bucket else "*"
        # Use KEY_SEP for open lower bounds too (was hard-coded "-", which
        # broke subclasses with another separator).
        return "%s%s%s" % (lower, self.KEY_SEP, upper)

    @staticmethod
    def _is_numeric_bound(text):
        """Return True if ``text`` is ``*`` or parses as a float."""
        if text == "*":
            return True
        try:
            float(text)
        except ValueError:
            return False
        return True

    def _split_key(self, key):
        """Split a bucket key into its (from, to) string parts.

        Keys with several separator occurrences (e.g. negative or scientific
        notation bounds such as "-10.0-0.0" with "-") are resolved by picking
        the first split whose two sides are both valid bounds.

        :raises ValueError: if no valid split exists.
        """
        parts = key.split(self.KEY_SEP)
        if len(parts) == 2:
            return parts[0], parts[1]
        sep_len = len(self.KEY_SEP)
        idx = key.find(self.KEY_SEP)
        while idx != -1:
            lower, upper = key[:idx], key[idx + sep_len:]
            if self._is_numeric_bound(lower) and self._is_numeric_bound(upper):
                return lower, upper
            idx = key.find(self.KEY_SEP, idx + 1)
        raise ValueError(
            "Cannot split range key <%s> on separator <%s>" % (key, self.KEY_SEP)
        )

    def get_filter(self, key):
        """Return a range query (``gte`` from, ``lt`` to) for the given bucket key."""
        from_, to_ = self._split_key(key)
        inner = {}
        if from_ != "*":
            inner["gte"] = from_
        if to_ != "*":
            inner["lt"] = to_
        return {"range": {self.field: inner}}
class DateRange(Range):
    """Range aggregation over a date field; always requested as keyed.

    :param field: date field to bucket on.
    :param key_as_string: if True (default), bucket keys are built from the
        ``from_as_string`` / ``to_as_string`` values, otherwise from the raw
        ``from`` / ``to`` values.
    :param meta: optional metadata forwarded to the parent node.
    """

    KEY = "date_range"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["date"]
    # cannot use range '-' separator since some keys contain it
    KEY_SEP = "::"

    def __init__(self, field, key_as_string=True, meta=None, **body):
        self.key_as_string = key_as_string
        super(DateRange, self).__init__(field=field, keyed=True, meta=meta, **body)
        # Range selects the "_as_string" bounds whenever keyed=True, which made
        # ``key_as_string`` a no-op; honour it explicitly (default unchanged).
        self.bucket_key_suffix = "_as_string" if key_as_string else None
class Composite(MultipleBucketAgg):
    """Composite aggregation; deriving a filter from its bucket keys is unsupported."""

    KEY = "composite"

    def get_filter(self, key):
        """Not supported for composite aggregations.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError