Source code for pandagg.node.aggs.bucket

# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket.html

from typing import Any, Optional, Dict, Union, List

from pandagg.node.types import NUMERIC_TYPES
from pandagg.node.aggs.abstract import MultipleBucketAgg, UniqueBucketAgg
from pandagg.types import Meta, QueryClauseDict, RangeDict, DistanceType, ExecutionHint


class Global(UniqueBucketAgg):
    """Node for the Elasticsearch ``global`` bucket aggregation.

    Every keyword argument is handed to the parent constructor untouched.
    """

    KEY = "global"
    VALUE_ATTRS = ["doc_count"]

    def __init__(self, **body: Any) -> None:
        super().__init__(**body)
class Filter(UniqueBucketAgg):
    """Node for the Elasticsearch ``filter`` bucket aggregation.

    The query clause is supplied either through the ``filter`` argument or as
    free keyword arguments, never both at once.
    """

    KEY = "filter"
    VALUE_ATTRS = ["doc_count"]

    def __init__(
        self,
        filter: Optional[QueryClauseDict] = None,
        meta: Optional[Meta] = None,
        **body: Any
    ):
        """
        :param filter: query clause used as the filter.
        :param meta: forwarded to the parent constructor as ``meta``.
        :param body: alternative spelling of the query clause, as keyword arguments.
        :raises ValueError: if both, or neither, of ``filter`` and ``body`` are provided.
        """
        filter_given = filter is not None
        body_given = bool(body)
        # Exactly one of the two sources must be present.
        if filter_given == body_given:
            raise ValueError(
                'Filter aggregation requires exactly one of "filter" or "body"'
            )
        # Work on a shallow copy of whichever mapping carries the clause.
        clause = (filter if filter else body).copy()
        super().__init__(meta=meta, **clause)
class MatchAll(Filter):
    """``Filter`` aggregation whose clause is fixed to ``{"match_all": {}}``."""

    def __init__(self, **body: Any):
        match_everything = {"match_all": {}}
        super().__init__(filter=match_everything, **body)
class Nested(UniqueBucketAgg):
    """Node for the Elasticsearch ``nested`` bucket aggregation."""

    KEY = "nested"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["nested"]

    def __init__(self, path: str, **body: Any):
        """
        :param path: stored on the instance and sent as the ``path`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        # Keep the path on the instance before delegating to the parent.
        self.path: str = path
        super().__init__(path=path, **body)
class ReverseNested(UniqueBucketAgg):
    """Node for the Elasticsearch ``reverse_nested`` bucket aggregation."""

    KEY = "reverse_nested"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["nested"]

    def __init__(self, path: Optional[str] = None, **body: Any) -> None:
        """
        :param path: optional path; stored on the instance and passed on as ``path``
            (``None`` included, when omitted).
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        self.path: Optional[str] = path
        super().__init__(path=path, **body)
class Missing(UniqueBucketAgg):
    """Node for the Elasticsearch ``missing`` bucket aggregation."""

    KEY = "missing"
    VALUE_ATTRS = ["doc_count"]

    def __init__(self, field: str, **body: Any) -> None:
        """
        :param field: stored on the instance and sent as the ``field`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        self.field: str = field
        super().__init__(field=field, **body)
class Sampler(UniqueBucketAgg):
    """Node for the Elasticsearch ``sampler`` bucket aggregation."""

    KEY = "sampler"
    VALUE_ATTRS = ["doc_count"]

    def __init__(self, shard_size: Optional[int] = None, **body: Any) -> None:
        """
        :param shard_size: passed on as the ``shard_size`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        super().__init__(shard_size=shard_size, **body)
class DiversifiedSampler(UniqueBucketAgg):
    """Node for the Elasticsearch ``diversified_sampler`` bucket aggregation."""

    KEY = "diversified_sampler"
    VALUE_ATTRS = ["doc_count"]

    def __init__(
        self,
        field: str,
        shard_size: Optional[int] = None,
        max_docs_per_value: Optional[int] = None,
        execution_hint: Optional[ExecutionHint] = None,
        **body: Any
    ) -> None:
        """
        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-diversified-sampler-aggregation.html

        :param field: stored on the instance and sent as the ``field`` body entry.
        :param shard_size: passed on as ``shard_size``. It is optional like the
            other tuning parameters, hence the ``None`` default (callers that
            already pass it positionally or by keyword are unaffected).
        :param max_docs_per_value: passed on as ``max_docs_per_value``.
        :param execution_hint: passed on as ``execution_hint``.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        self.field = field
        super(DiversifiedSampler, self).__init__(
            shard_size=shard_size,
            field=field,
            max_docs_per_value=max_docs_per_value,
            execution_hint=execution_hint,
            **body
        )
class Children(UniqueBucketAgg):
    """Node for the Elasticsearch ``children`` bucket aggregation."""

    KEY = "children"
    VALUE_ATTRS = ["doc_count"]

    def __init__(self, type: str, **body: Any) -> None:
        """
        :param type: passed on as the ``type`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        super().__init__(type=type, **body)
class Parent(UniqueBucketAgg):
    """Node for the Elasticsearch ``parent`` bucket aggregation."""

    KEY = "parent"
    VALUE_ATTRS = ["doc_count"]

    def __init__(self, type: str, **body: Any) -> None:
        """
        :param type: passed on as the ``type`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        super().__init__(type=type, **body)
class Terms(MultipleBucketAgg):
    """Terms aggregation."""

    KEY = "terms"
    VALUE_ATTRS = ["doc_count", "doc_count_error_upper_bound", "sum_other_doc_count"]

    def __init__(
        self,
        field: str,
        missing: Optional[Union[int, str]] = None,
        size: Optional[int] = None,
        **body: Any
    ) -> None:
        """
        :param field: stored on the instance and sent as the ``field`` body entry.
        :param missing: passed on as the ``missing`` body entry.
        :param size: passed on as the ``size`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        self.field: str = field
        super().__init__(field=field, missing=missing, size=size, **body)

    def is_convertible_to_composite_source(self) -> bool:
        """Return False when the body holds an ``include`` or ``exclude`` clause, True otherwise."""
        # TODO: the Elasticsearch documentation is unclear about which body clauses are
        # accepted in a composite source; for now we are only sure that 'include' and
        # 'exclude' are not supported:
        # https://github.com/elastic/elasticsearch/issues/50368
        unsupported_clauses = ("include", "exclude")
        return not any(clause in self.body for clause in unsupported_clauses)
class Filters(MultipleBucketAgg):
    """Node for the Elasticsearch ``filters`` bucket aggregation."""

    KEY = "filters"
    VALUE_ATTRS = ["doc_count"]
    DEFAULT_OTHER_KEY = "_other_"
    IMPLICIT_KEYED = True

    def __init__(
        self,
        filters: Dict[str, QueryClauseDict],
        other_bucket: bool = False,
        other_bucket_key: Optional[str] = None,
        **body: Any
    ) -> None:
        """
        :param filters: mapping of name to query clause, passed on as ``filters``.
        :param other_bucket: passed on as ``other_bucket``.
        :param other_bucket_key: passed on as ``other_bucket_key``.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        clause = dict(
            filters=filters,
            other_bucket=other_bucket,
            other_bucket_key=other_bucket_key,
        )
        # Named parameters can never reappear in ``body``, so no key is overwritten.
        clause.update(body)
        super().__init__(**clause)
class AdjacencyMatrix(MultipleBucketAgg):
    """Node for the Elasticsearch ``adjacency_matrix`` bucket aggregation."""

    KEY = "adjacency_matrix"
    VALUE_ATTRS = ["doc_count"]

    def __init__(
        self,
        filters: Dict[str, QueryClauseDict],
        separator: Optional[str] = None,
        **body: Any
    ) -> None:
        """
        :param filters: mapping of name to query clause, passed on as ``filters``.
        :param separator: passed on as ``separator``.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        super().__init__(filters=filters, separator=separator, **body)
class Histogram(MultipleBucketAgg):
    """Node for the Elasticsearch ``histogram`` bucket aggregation."""

    KEY = "histogram"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = NUMERIC_TYPES

    def __init__(
        self, field: str, interval: Union[int, float], **body: Any
    ) -> None:
        """
        :param field: stored on the instance and sent as the ``field`` body entry.
        :param interval: bucket width, passed on as ``interval``. Elasticsearch
            accepts any positive decimal here, so floats are allowed as well
            as integers.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        self.field: str = field
        super(Histogram, self).__init__(field=field, interval=interval, **body)

    def is_convertible_to_composite_source(self) -> bool:
        """Always return True."""
        return True
class DateHistogram(MultipleBucketAgg):
    """Node for the Elasticsearch ``date_histogram`` bucket aggregation."""

    KEY = "date_histogram"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["date"]

    def __init__(
        self,
        field: str,
        interval: Optional[str] = None,
        calendar_interval: Optional[str] = None,
        fixed_interval: Optional[str] = None,
        key_as_string: bool = True,
        **body: Any
    ) -> None:
        """Date Histogram aggregation.

        Note: interval is deprecated from 7.2 in favor of calendar_interval and
        fixed_interval.

        :param field: stored on the instance and sent as the ``field`` body entry.
        :param interval: passed on as ``interval``.
        :param calendar_interval: passed on as ``calendar_interval``.
        :param fixed_interval: passed on as ``fixed_interval``.
        :param key_as_string: if True extracted key of bucket will be the formatted date
        :param body: extra keyword arguments forwarded to the parent constructor.
        :raises ValueError: if none of the three interval arguments is truthy.
        """
        self.field: str = field
        # At least one way of expressing the bucket width is mandatory.
        if not (interval or fixed_interval or calendar_interval):
            raise ValueError(
                'One of "interval", "calendar_interval" or "fixed_interval" must be provided.'
            )
        super(DateHistogram, self).__init__(
            field=field,
            interval=interval,
            calendar_interval=calendar_interval,
            fixed_interval=fixed_interval,
            key_as_string=key_as_string,
            **body
        )

    def is_convertible_to_composite_source(self) -> bool:
        """Always return True."""
        return True
class VariableWidthHistogram(MultipleBucketAgg):
    """Node for the Elasticsearch ``variable_width_histogram`` bucket aggregation."""

    KEY = "variable_width_histogram"
    VALUE_ATTRS = ["doc_count", "min", "max"]

    def __init__(self, field: str, buckets: int, **body: Any) -> None:
        """
        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-variablewidthhistogram-aggregation.html

        Note: This aggregation cannot currently be nested under any aggregation that collects from more than a single
        bucket.

        :param field: stored on the instance and sent as the ``field`` body entry.
        :param buckets: passed on as the ``buckets`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        self.field = field
        super().__init__(field=field, buckets=buckets, **body)
class AutoDateHistogram(MultipleBucketAgg):
    """Node for the Elasticsearch ``auto_date_histogram`` bucket aggregation."""

    KEY = "auto_date_histogram"
    VALUE_ATTRS = ["doc_count"]

    def __init__(
        self,
        field: str,
        buckets: Optional[int] = None,
        format: Optional[str] = None,
        time_zone: Optional[str] = None,
        minimum_interval: Optional[str] = None,
        missing: Optional[str] = None,
        key_as_string: bool = True,
        **body: Any
    ) -> None:
        """
        :param field: stored on the instance and sent as the ``field`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.

        Every other named parameter is passed to the parent constructor under
        its own name.
        """
        self.field: str = field
        clause = dict(
            field=field,
            buckets=buckets,
            format=format,
            time_zone=time_zone,
            minimum_interval=minimum_interval,
            missing=missing,
            key_as_string=key_as_string,
        )
        # Named parameters can never reappear in ``body``, so no key is overwritten.
        clause.update(body)
        super().__init__(**clause)
class Range(MultipleBucketAgg):
    """Node for the Elasticsearch ``range`` bucket aggregation."""

    KEY = "range"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = NUMERIC_TYPES

    def __init__(
        self,
        field: str,
        ranges: List[RangeDict],
        keyed: bool = False,
        **body: Any
    ) -> None:
        """
        :param field: stored on the instance and sent as the ``field`` body entry.
        :param ranges: passed on as the ``ranges`` body entry.
        :param keyed: passed on as the ``keyed`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        self.field: str = field
        super().__init__(field=field, ranges=ranges, keyed=keyed, **body)
class DateRange(Range):
    """``Range`` variant using the ``date_range`` key, whitelisted for ``date`` mappings."""

    KEY = "date_range"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["date"]
class IPRange(Range):
    """``Range`` variant using the ``ip_range`` key, whitelisted for ``ip`` mappings."""

    KEY = "ip_range"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["ip"]
class GeoDistance(Range):
    """Node for the Elasticsearch ``geo_distance`` bucket aggregation."""

    KEY = "geo_distance"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["geo_point"]

    def __init__(
        self,
        field: str,
        origin: str,
        ranges: List[RangeDict],
        unit: Optional[str] = None,
        distance_type: Optional[DistanceType] = None,
        keyed: bool = False,
        **body: Any
    ) -> None:
        """
        :param field: stored on the instance and sent as the ``field`` body entry.
        :param origin: passed on as the ``origin`` body entry.
        :param ranges: passed on as the ``ranges`` body entry.
        :param unit: passed on as the ``unit`` body entry.
        :param distance_type: passed on as the ``distance_type`` body entry.
        :param keyed: passed on as the ``keyed`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        # ``super(Range, self)`` resolves to the constructor that follows ``Range``
        # in the MRO, so ``Range.__init__`` (which assigns ``self.field``) never
        # runs. Record the field here so instances expose it like ``Range`` does.
        self.field: str = field
        super(Range, self).__init__(
            field=field,
            origin=origin,
            ranges=ranges,
            unit=unit,
            distance_type=distance_type,
            keyed=keyed,
            **body
        )
class GeoHashGrid(MultipleBucketAgg):
    """Node for the Elasticsearch ``geohash_grid`` bucket aggregation."""

    KEY = "geohash_grid"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["geo_point", "geo_shape"]

    def __init__(
        self,
        field: str,
        precision: Optional[int] = None,
        bounds: Optional[Dict] = None,
        size: Optional[int] = None,
        shard_size: Optional[int] = None,
        **body: Any
    ) -> None:
        """
        :param field: stored on the instance and sent as the ``field`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.

        ``precision``, ``bounds``, ``size`` and ``shard_size`` are passed to the
        parent constructor under their own names.
        """
        self.field = field
        clause = dict(
            field=field,
            precision=precision,
            bounds=bounds,
            size=size,
            shard_size=shard_size,
        )
        # Named parameters can never reappear in ``body``, so no key is overwritten.
        clause.update(body)
        super().__init__(**clause)
class GeoTileGrid(MultipleBucketAgg):
    """Node for the Elasticsearch ``geotile_grid`` bucket aggregation."""

    KEY = "geotile_grid"
    VALUE_ATTRS = ["doc_count"]
    WHITELISTED_MAPPING_TYPES = ["geo_point", "geo_shape"]

    def __init__(
        self,
        field: str,
        precision: Optional[int] = None,
        bounds: Optional[Dict] = None,
        size: Optional[int] = None,
        shard_size: Optional[int] = None,
        **body: Any
    ) -> None:
        """
        :param field: stored on the instance and sent as the ``field`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.

        ``precision``, ``bounds``, ``size`` and ``shard_size`` are passed to the
        parent constructor under their own names.
        """
        self.field = field
        clause = dict(
            field=field,
            precision=precision,
            bounds=bounds,
            size=size,
            shard_size=shard_size,
        )
        # Named parameters can never reappear in ``body``, so no key is overwritten.
        clause.update(body)
        super().__init__(**clause)
class SignificantTerms(MultipleBucketAgg):
    """Node for the Elasticsearch ``significant_terms`` bucket aggregation."""

    KEY = "significant_terms"
    VALUE_ATTRS = ["doc_count", "score", "bg_count"]

    def __init__(self, field: str, **body: Any) -> None:
        """
        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-significantterms-aggregation.html

        :param field: stored on the instance and sent as the ``field`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        self.field = field
        super().__init__(field=field, **body)
class SignificantText(MultipleBucketAgg):
    """Node for the Elasticsearch ``significant_text`` bucket aggregation."""

    KEY = "significant_text"
    VALUE_ATTRS = ["doc_count", "score", "bg_count"]
    WHITELISTED_MAPPING_TYPES = ["text"]

    def __init__(self, field: str, **body: Any) -> None:
        """
        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-significanttext-aggregation.html

        :param field: stored on the instance and sent as the ``field`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        self.field = field
        super().__init__(field=field, **body)
class RareTerms(MultipleBucketAgg):
    """Node for the Elasticsearch ``rare_terms`` bucket aggregation."""

    KEY = "rare_terms"
    VALUE_ATTRS = ["doc_count"]

    def __init__(
        self,
        field: str,
        max_doc_count: Optional[int] = None,
        precision: Optional[float] = None,
        include: Optional[Union[str, List[str]]] = None,
        exclude: Optional[Union[str, List[str]]] = None,
        missing: Optional[Any] = None,
        **body: Any
    ) -> None:
        """
        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-rare-terms-aggregation.html

        :param field: stored on the instance and sent as the ``field`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.

        ``max_doc_count``, ``precision``, ``include``, ``exclude`` and ``missing``
        are passed to the parent constructor under their own names.
        """
        self.field = field
        clause = dict(
            field=field,
            max_doc_count=max_doc_count,
            precision=precision,
            include=include,
            exclude=exclude,
            missing=missing,
        )
        # Named parameters can never reappear in ``body``, so no key is overwritten.
        clause.update(body)
        super().__init__(**clause)
class MultiTerms(MultipleBucketAgg):
    """Node for the Elasticsearch ``multi_terms`` bucket aggregation."""

    KEY = "multi_terms"
    VALUE_ATTRS = ["doc_count", "doc_count_error_upper_bound", "sum_other_doc_count"]

    def __init__(self, terms: List[Dict], **body: Any) -> None:
        """
        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-multi-terms-aggregation.html

        :param terms: passed on as the ``terms`` body entry.
        :param body: extra keyword arguments forwarded to the parent constructor.
        """
        # ``key_as_string`` is always sent as True for this aggregation.
        super().__init__(terms=terms, key_as_string=True, **body)