# Source code for pandagg.node.aggs.bucket

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Not implemented aggregations include:
- children agg
- geo-distance
- geo-hash grid
- ipv4
- sampler
- significant terms
"""


from pandagg.node.types import NUMERIC_TYPES
from pandagg.node.aggs.abstract import MultipleBucketAgg, UniqueBucketAgg


class Global(UniqueBucketAgg):
    """``global`` aggregation node.

    It takes no aggregation parameters, only an optional ``meta``.
    """

    KEY = "global"
    VALUE_ATTRS = ["doc_count"]

    def __init__(self, meta=None):
        # NOTE(review): the other aggregations forward their body as plain
        # keywords; confirm the base class handles ``agg_body`` specially
        # rather than treating it as a body key.
        empty_body = {}
        super(Global, self).__init__(agg_body=empty_body, meta=meta)

    def get_filter(self, key):
        """Return ``None``: no filter is associated with the global bucket."""
        return None
class Filter(UniqueBucketAgg):
    """``filter`` aggregation node.

    The filter query is provided either as the ``filter`` dict or as keyword
    arguments (``body``). Exactly one of the two must be given.
    """

    KEY = "filter"
    VALUE_ATTRS = ["doc_count"]

    def __init__(self, filter=None, meta=None, **body):
        has_filter = filter is not None
        has_body = bool(body)
        # Both given, or neither given: reject.
        if has_filter == has_body:
            raise ValueError(
                'Filter aggregation requires exactly one of "filter" or "body"'
            )
        source = filter if filter else body
        query = source.copy()
        self.filter = query
        super(Filter, self).__init__(meta=meta, **query)

    def get_filter(self, key):
        """Return this aggregation's filter query, regardless of ``key``."""
        return self.filter
class MatchAll(Filter):
    """``filter`` aggregation whose query is ``match_all``.

    :param meta: optional metadata, forwarded to ``Filter``.
    :param body: optional ``match_all`` query parameters (e.g. ``boost``).
    """

    def __init__(self, meta=None, **body):
        # ``body`` becomes the ``match_all`` parameters. Forwarding it to
        # ``Filter`` alongside ``filter=`` would always make ``Filter`` raise,
        # since it accepts exactly one of the two.
        super(MatchAll, self).__init__(filter={"match_all": dict(body)}, meta=meta)
class Nested(UniqueBucketAgg):
    """``nested`` aggregation node operating on the nested ``path``."""

    KEY = "nested"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["nested"]

    def __init__(self, path, meta=None, **body):
        self.path = path
        super(Nested, self).__init__(meta=meta, path=path, **body)

    def get_filter(self, key):
        """Return ``None``: no filter is associated with a nested bucket."""
        return None
class ReverseNested(UniqueBucketAgg):
    """``reverse_nested`` aggregation node.

    ``path`` is optional and is only sent in the body when truthy.
    """

    KEY = "reverse_nested"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["nested"]

    def __init__(self, path=None, meta=None, **body):
        self.path = path
        # ``dict(body, path=path)`` keeps body keys first and ``path`` last.
        params = dict(body, path=path) if path else dict(body)
        super(ReverseNested, self).__init__(meta=meta, **params)

    def get_filter(self, key):
        """Return ``None``: no filter is associated with a reverse nested bucket."""
        return None
class Missing(UniqueBucketAgg):
    """``missing`` aggregation node on ``field``.

    :param field: name of the field whose absence defines the bucket.
    :param meta: optional metadata.
    :param body: extra aggregation parameters.
    """

    KEY = "missing"
    VALUE_ATTRS = ["doc_count"]
    BLACKLISTED_MAPPING_TYPES = []

    def __init__(self, field, meta=None, **body):
        # ``get_filter`` reads ``self.field``, so store it explicitly, as the
        # other field-based aggregations of this module do.
        self.field = field
        # Start the MRO lookup at ``Missing``. The previous
        # ``super(UniqueBucketAgg, self)`` skipped any ``__init__`` defined on
        # UniqueBucketAgg itself.
        super(Missing, self).__init__(field=field, meta=meta, **body)

    def get_filter(self, key):
        """Return a filter matching documents that have no ``field``."""
        return {"bool": {"must_not": {"exists": {"field": self.field}}}}
class Terms(MultipleBucketAgg):
    """Terms aggregation.

    :param field: field to bucket on.
    :param missing: value given to documents lacking ``field`` (sent only if
        not ``None``).
    :param size: number of buckets to return (sent only if not ``None``).
    :param meta: optional metadata.
    :param body: extra aggregation parameters.
    """

    KEY = "terms"
    VALUE_ATTRS = ["doc_count", "doc_count_error_upper_bound", "sum_other_doc_count"]
    BLACKLISTED_MAPPING_TYPES = []

    def __init__(self, field, missing=None, size=None, meta=None, **body):
        self.field = field
        self.missing = missing
        self.size = size
        body_kwargs = dict(body)
        if missing is not None:
            body_kwargs["missing"] = missing
        if size is not None:
            body_kwargs["size"] = size
        super(Terms, self).__init__(field=field, meta=meta, **body_kwargs)

    def get_filter(self, key):
        """Provide filter to get documents belonging to the bucket of given key.

        If the aggregation was built with a ``missing`` value and ``key``
        equals it, that bucket holds documents with that value as well as
        documents without ``field``, so both are matched.
        """
        term_filter = {"term": {self.field: {"value": key}}}
        missing_filter = {"bool": {"must_not": {"exists": {"field": self.field}}}}
        if self.missing is not None and key == self.missing:
            return {"bool": {"should": [term_filter, missing_filter]}}
        # NOTE(review): legacy special case kept for backward compatibility.
        # It also captures a genuine bucket value "missing"; confirm whether
        # any caller still relies on it.
        if key == "missing":
            return missing_filter
        return term_filter
class Filters(MultipleBucketAgg):
    """``filters`` aggregation node.

    :param filters: mapping of bucket key to filter query.
    :param other_bucket: whether to request the "other" bucket.
    :param other_bucket_key: custom key of the "other" bucket.
    :param meta: optional metadata.
    :param body: extra aggregation parameters.
    """

    KEY = "filters"
    VALUE_ATTRS = ["doc_count"]
    DEFAULT_OTHER_KEY = "_other_"
    IMPLICIT_KEYED = True

    def __init__(
        self, filters, other_bucket=False, other_bucket_key=None, meta=None, **body
    ):
        self.filters = filters
        self.other_bucket = other_bucket
        self.other_bucket_key = other_bucket_key
        body_kwargs = dict(body)
        if other_bucket:
            body_kwargs["other_bucket"] = other_bucket
        if other_bucket_key:
            body_kwargs["other_bucket_key"] = other_bucket_key
        super(Filters, self).__init__(filters=filters, meta=meta, **body_kwargs)

    def get_filter(self, key):
        """Provide filter to get documents belonging to the bucket of given key.

        :raises ValueError: if ``key`` is neither a filter key nor the key of
            an enabled "other" bucket.
        """
        if key in self.filters:
            return self.filters[key]
        # Setting ``other_bucket_key`` implicitly enables the other bucket in
        # Elasticsearch, even when ``other_bucket`` is left to False.
        if self.other_bucket or self.other_bucket_key:
            if key in (self.DEFAULT_OTHER_KEY, self.other_bucket_key):
                # Documents matching none of the named filters.
                return {
                    "bool": {
                        "must_not": {"bool": {"should": list(self.filters.values())}}
                    }
                }
        raise ValueError("Unknown <%s> key" % key)
class Histogram(MultipleBucketAgg):
    """``histogram`` aggregation on a numeric ``field`` with a given ``interval``."""

    KEY = "histogram"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = NUMERIC_TYPES

    def __init__(self, field, interval, meta=None, **body):
        self.field = field
        self.interval = interval
        super(Histogram, self).__init__(
            field=field, interval=interval, meta=meta, **body
        )

    def get_filter(self, key):
        """Return a ``range`` filter matching ``[key, key + interval)``.

        :raises ValueError: if ``key`` cannot be converted to a float.
        """
        try:
            key = float(key)
        except (TypeError, ValueError):
            raise ValueError(
                "Filter key of a histogram aggregation must be numeric, "
                "got <%s> of type <%s>" % (key, type(key))
            )
        return {"range": {self.field: {"gte": key, "lt": key + self.interval}}}
class DateHistogram(MultipleBucketAgg):
    KEY = "date_histogram"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["date"]

    # Named calendar intervals (and "1q") translated to date-math expressions.
    # Date math only knows the y, M, w, d, h, H, m, s units, so "+month" or
    # "+1q" would be rejected.
    _DATE_MATH_INTERVALS = {
        "minute": "1m",
        "hour": "1h",
        "day": "1d",
        "week": "1w",
        "month": "1M",
        "quarter": "3M",
        "1q": "3M",
        "year": "1y",
    }

    # interval is deprecated from 7.2 in favor of calendar_interval and fixed interval
    def __init__(
        self,
        field,
        interval=None,
        calendar_interval=None,
        fixed_interval=None,
        meta=None,
        keyed=False,
        key_as_string=True,
        **body
    ):
        """Date Histogram aggregation.
        Note: interval is deprecated from 7.2 in favor of calendar_interval and fixed interval

        :param keyed: defines returned buckets format: if True, as dict.
        :param key_as_string: if True extracted key of bucket will be the formatted date (applicable if keyed=False)
        :raises ValueError: if none of interval, calendar_interval or fixed_interval is provided.
        """
        self.field = field
        if not (interval or fixed_interval or calendar_interval):
            raise ValueError(
                'One of "interval", "calendar_interval" or "fixed_interval" must be provided.'
            )
        if interval:
            body["interval"] = interval
        if calendar_interval:
            body["calendar_interval"] = calendar_interval
        if fixed_interval:
            body["fixed_interval"] = fixed_interval
        self.interval = interval or calendar_interval or fixed_interval
        super(DateHistogram, self).__init__(
            field=field,
            meta=meta,
            keyed=keyed,
            key_path="key_as_string" if key_as_string else "key",
            **body
        )

    def get_filter(self, key):
        """Return a ``range`` filter matching ``[key, key + interval)`` via date math."""
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math
        interval = self._DATE_MATH_INTERVALS.get(self.interval, self.interval)
        return {
            "range": {self.field: {"gte": key, "lt": "%s||+%s" % (key, interval)}}
        }
class Range(MultipleBucketAgg):
    """``range`` aggregation node on ``field``.

    Bucket keys are built as ``<from><KEY_SEP><to>``, where ``*`` stands for
    an open bound.
    """

    KEY = "range"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = NUMERIC_TYPES
    KEY_SEP = "-"

    def __init__(self, field, ranges, keyed=False, meta=None, **body):
        self.field = field
        self.ranges = ranges
        body_kwargs = dict(body)
        if keyed:
            self.bucket_key_suffix = "_as_string"
        else:
            self.bucket_key_suffix = None
        super(Range, self).__init__(
            field=field, ranges=ranges, meta=meta, keyed=keyed, **body_kwargs
        )

    @property
    def from_key(self):
        """Name of the bucket entry holding the lower bound."""
        if self.bucket_key_suffix:
            return "from%s" % self.bucket_key_suffix
        return "from"

    @property
    def to_key(self):
        """Name of the bucket entry holding the upper bound."""
        if self.bucket_key_suffix:
            return "to%s" % self.bucket_key_suffix
        return "to"

    def _extract_bucket_key(self, bucket):
        """Build the ``<from><KEY_SEP><to>`` key of a response bucket."""
        if self.from_key in bucket:
            key = "%s%s" % (bucket[self.from_key], self.KEY_SEP)
        else:
            # Use KEY_SEP, not a hard-coded "-", so subclasses with another
            # separator (DateRange uses "::") can split the key back.
            key = "*%s" % self.KEY_SEP
        if self.to_key in bucket:
            key += str(bucket[self.to_key])
        else:
            key += "*"
        return key

    @staticmethod
    def _is_numeric_bound(value):
        """Return True if ``value`` is ``*`` or parses as a float."""
        if value == "*":
            return True
        try:
            float(value)
        except ValueError:
            return False
        return True

    def _split_key(self, key):
        """Split a bucket key into its ``(from, to)`` string parts.

        If the separator occurs exactly once, it is split there. Otherwise
        (e.g. "-10.0--5.0" or "1e-05-1.0" with the "-" separator), the first
        split whose both sides are ``*`` or numbers is used.

        :raises ValueError: if no such split exists.
        """
        sep = self.KEY_SEP
        parts = key.split(sep)
        if len(parts) == 2:
            return parts[0], parts[1]
        idx = key.find(sep)
        while idx != -1:
            from_, to_ = key[:idx], key[idx + len(sep):]
            if self._is_numeric_bound(from_) and self._is_numeric_bound(to_):
                return from_, to_
            # Step by one character so adjacent separators ("--") are tried.
            idx = key.find(sep, idx + 1)
        raise ValueError(
            "Cannot split range key <%s> on separator <%s>" % (key, sep)
        )

    def get_filter(self, key):
        """Return a ``range`` filter (``gte`` from / ``lt`` to) for the bucket of given key.

        :raises ValueError: if ``key`` cannot be split into two bounds.
        """
        from_, to_ = self._split_key(key)
        inner = {}
        if from_ != "*":
            inner["gte"] = from_
        if to_ != "*":
            inner["lt"] = to_
        return {"range": {self.field: inner}}
class DateRange(Range):
    """``date_range`` aggregation node on a date ``field``.

    :param field: date field to bucket on.
    :param key_as_string: if True, bucket keys are built from the
        ``from_as_string``/``to_as_string`` entries, else from ``from``/``to``.
    :param meta: optional metadata.
    :param body: other aggregation parameters (e.g. ``ranges``).
    """

    KEY = "date_range"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["date"]
    # cannot use range '-' separator since some keys contain it
    KEY_SEP = "::"

    def __init__(self, field, key_as_string=True, meta=None, **body):
        self.key_as_string = key_as_string
        super(DateRange, self).__init__(field=field, keyed=True, meta=meta, **body)
        # Range ties the "_as_string" suffix to ``keyed``, which is always True
        # here. Honour ``key_as_string`` so that False actually makes keys use
        # the plain "from"/"to" entries.
        if not key_as_string:
            self.bucket_key_suffix = None
class Composite(MultipleBucketAgg):
    """``composite`` aggregation node (bucket filters are not supported)."""

    KEY = "composite"

    def get_filter(self, key):
        """Bucket filters are not implemented for composite aggregations.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError