#!/usr/bin/env python
# -*- coding: utf-8 -*-
import copy
from future.utils import iterkeys, iteritems
from pandagg.interactive.response import IResponse
from pandagg.node.aggs.abstract import UniqueBucketAgg, MetricAgg, Root
from pandagg.node.aggs.bucket import Nested, ReverseNested
from pandagg.tree.response import AggsResponseTree
[docs]class Response:
def __init__(self, data, search):
self.data = data
self.__search = search
self.took = data["took"]
self.timed_out = data["timed_out"]
self._shards = data["_shards"]
self.hits = Hits(data["hits"])
self.aggregations = Aggregations(
data.get("aggregations", {}), search=self.__search
)
self.profile = data.get("profile")
def __iter__(self):
return iter(self.hits)
@property
def success(self):
return (
self._shards["total"] == self._shards["successful"] and not self.timed_out
)
def __len__(self):
return len(self.hits)
def __repr__(self):
return (
"<Response> took %dms, success: %s, total result %s, contains %s hits"
% (self.took, self.success, self.hits._total_repr(), len(self.hits))
)
[docs]class Hits:
def __init__(self, hits):
self.data = hits
self.total = hits["total"]
self.hits = [Hit(hit) for hit in hits.get("hits", [])]
self.max_score = hits["max_score"]
def __len__(self):
return len(self.hits)
def __iter__(self):
return iter(self.hits)
def _total_repr(self):
if not isinstance(self.total, dict):
return str(self.total)
if self.total.get("relation") == "eq":
return str(self.total["value"])
if self.total.get("relation") == "gte":
return ">=%d" % self.total["value"]
raise ValueError("Invalid total %s" % self.total)
[docs] def to_dataframe(self, expand_source=True, source_only=True):
"""
Return hits as pandas dataframe.
Requires pandas dependency.
:param expand_source: if True, `_source` sub-fields are expanded as columns
:param source_only: if True, doesn't include hit metadata (except id which is used as dataframe index)
"""
try:
import pandas as pd
except ImportError:
raise ImportError(
'Using dataframe output format requires to install pandas. Please install "pandas" or '
"use another output format."
)
hits = self.data.get("hits", [])
if not hits:
return pd.DataFrame()
if not expand_source:
return pd.DataFrame(hits).set_index("_id")
flattened_hits = []
for hit in hits:
hit_metadata = hit.copy()
hit_source = hit_metadata.pop("_source")
if source_only:
hit_source["_id"] = hit_metadata["_id"]
else:
hit_source.update(hit_metadata)
flattened_hits.append(hit_source)
return pd.DataFrame(flattened_hits).set_index("_id")
def __repr__(self):
if not isinstance(self.total, dict):
total_repr = str(self.total)
elif self.total.get("relation") == "eq":
total_repr = str(self.total["value"])
elif self.total.get("relation") == "gte":
total_repr = ">%d" % self.total["value"]
else:
raise ValueError("Invalid total %s" % self.total)
return "<Hits> total: %s, contains %d hits" % (total_repr, len(self.hits))
[docs]class Hit:
def __init__(self, data):
self.data = data
self._source = data.get("_source")
self._score = data.get("_score")
self._id = data.get("_id")
self._type = data.get("_type")
self._index = data.get("_index")
def __repr__(self):
return "<Hit %s> score=%.2f" % (self._id, self._score)
[docs]class Aggregations:
def __init__(self, data, search):
self.data = data
self.__search = search
@property
def _aggs(self):
return self.__search._aggs
@property
def _query(self):
return self.__search._query
@property
def _client(self):
return self.__search._using
@property
def _index(self):
return self.__search._index
[docs] def keys(self):
return self.data.keys()
[docs] def get(self, key):
return self.data[key]
def _parse_group_by(
self,
response,
until,
row=None,
agg_name=None,
ancestors=None,
row_as_tuple=False,
with_single_bucket_groups=False,
):
"""
Recursive parsing of succession of grouping aggregation clauses.
Yields each row for which last bucket aggregation generated buckets.
"""
# initialization: cache ancestors once for faster computation, that will be passed as arguments to downside
# recursive calls
if ancestors is None:
until_id = self._aggs.id_from_key(until)
ancestors = self._aggs.ancestors(until_id, include_current=True)
# remove root (not an aggregation clause)
ancestors = [(k, n) for k, n in ancestors[:-1]]
agg_name, agg_node = ancestors[-1]
if agg_name not in response:
return
if not row:
row = [] if row_as_tuple else {}
agg_node = [n for k, n in ancestors if k == agg_name][0]
for key, raw_bucket in agg_node.extract_buckets(response[agg_name]):
sub_row = copy.copy(row)
if not isinstance(agg_node, UniqueBucketAgg) or with_single_bucket_groups:
if row_as_tuple:
sub_row.append(key)
else:
sub_row[agg_name] = key
if agg_name == until:
# end real yield
if row_as_tuple:
yield tuple(sub_row), raw_bucket
else:
yield sub_row, raw_bucket
elif agg_name in {k for k, _ in ancestors}:
# yield children
for child_key, _ in self._aggs.children(agg_node.identifier):
for nrow, nraw_bucket in self._parse_group_by(
row=sub_row,
response=raw_bucket,
agg_name=child_key,
until=until,
row_as_tuple=row_as_tuple,
ancestors=ancestors,
):
yield nrow, nraw_bucket
def _normalize_buckets(self, agg_response, agg_name=None):
"""
Recursive function to parse aggregation response as a normalized entities.
Each response bucket is represented as a dict with keys (key, level, value, children)::
{
"level": "owner.id",
"key": 35,
"value": 235,
"children": [
]
}
"""
agg_name = agg_name or self._aggs.root
id_ = self._aggs.id_from_key(agg_name)
agg_key, agg_node = self._aggs.get(id_)
agg_children = self._aggs.children(agg_node.identifier)
for key, raw_bucket in agg_node.extract_buckets(agg_response[agg_name]):
result = {
"level": agg_name,
"key": key,
"value": agg_node.extract_bucket_value(raw_bucket),
}
normalized_children = [
normalized_child
for child_key, child in agg_children
for normalized_child in self._normalize_buckets(
agg_name=child_key, agg_response=raw_bucket
)
]
if normalized_children:
result["children"] = normalized_children
yield result
def _grouping_agg(self, name=None):
"""
Return aggregation node that used as grouping node.
Note: in case there is only a nested aggregation below that node, group-by nested clause.
"""
if name is not None:
# override existing groupby_ptr
id_ = self._aggs.id_from_key(name)
if not self._aggs._is_eligible_grouping_node(id_):
raise ValueError(
"Cannot group by <%s>, not a valid grouping aggregation" % name
)
key, node = self._aggs.get(id_)
else:
key, node = self._aggs.get(self._aggs._groupby_ptr)
# if parent of single nested clause and nested_autocorrect
if self._aggs.nested_autocorrect:
children = self._aggs.children(node.identifier)
if len(children) == 1:
child_key, child_node = children[0]
if isinstance(child_node, (Nested, ReverseNested)):
return child_key, child_node
return key, node
[docs] def to_tabular(
self,
index_orient=True,
grouped_by=None,
expand_columns=True,
expand_sep="|",
normalize=True,
with_single_bucket_groups=False,
):
"""
Build tabular view of ES response grouping levels (rows) until 'grouped_by' aggregation node included is
reached, and using children aggregations of grouping level as values for each of generated groups (columns).
Suppose an aggregation of this shape (A & B bucket aggregations)::
A──> B──> C1
├──> C2
└──> C3
With grouped_by='B', breakdown ElasticSearch response (tree structure), into a tabular structure of this shape::
C1 C2 C3
A B
wood blue 10 4 0
red 7 5 2
steel blue 1 9 0
red 23 4 2
:param index_orient: if True, level-key samples are returned as tuples, else in a dictionnary
:param grouped_by: name of the aggregation node used as last grouping level
:param normalize: if True, normalize columns buckets
:return: index_names, values
"""
grouping_key, grouping_agg = self._grouping_agg(grouped_by)
if grouping_key is None:
index_values = [(tuple() if index_orient else dict(), self.data)]
index_names = []
else:
index_names = [
k
for k, a in self._aggs.ancestors(
grouping_agg.identifier, from_root=True, include_current=True
)
if (not isinstance(a, UniqueBucketAgg) or with_single_bucket_groups)
and k is not None
]
index_values = list(
self._parse_group_by(
response=self.data,
row_as_tuple=index_orient,
until=grouping_key,
with_single_bucket_groups=with_single_bucket_groups,
)
)
if not index_values:
return [], []
if index_orient:
rows = {
row_index: self._serialize_columns(
row_values,
normalize=normalize,
total_agg=grouping_agg,
expand_columns=expand_columns,
expand_sep=expand_sep,
)
for row_index, row_values in index_values
}
else:
rows = [
dict(
row_index,
**self._serialize_columns(
row_values,
normalize=normalize,
total_agg=grouping_agg,
expand_columns=expand_columns,
expand_sep=expand_sep,
)
)
for row_index, row_values in index_values
]
return index_names, rows
def _serialize_columns(
self, row_data, normalize, expand_columns, expand_sep, total_agg=None
):
# extract value (usually 'doc_count') of grouping agg node
result = {}
if total_agg is not None and not isinstance(total_agg, Root):
result[total_agg.VALUE_ATTRS[0]] = total_agg.extract_bucket_value(row_data)
grouping_agg_children = self._aggs.children(total_agg.identifier)
else:
grouping_agg_children = self._aggs.children(self._aggs.root)
# extract values of children, one columns per child
for child_key, child in grouping_agg_children:
if isinstance(child, (UniqueBucketAgg, MetricAgg)):
result[child_key] = child.extract_bucket_value(row_data[child_key])
elif expand_columns:
for key, bucket in child.extract_buckets(row_data[child_key]):
result[
"%s%s%s" % (child_key, expand_sep, key)
] = child.extract_bucket_value(bucket)
elif normalize:
result[child_key] = next(
self._normalize_buckets(row_data, child_key), None
)
else:
result[child_key] = row_data[child_key]
return result
[docs] def to_dataframe(
self, grouped_by=None, normalize_children=True, with_single_bucket_groups=False
):
try:
import pandas as pd
except ImportError:
raise ImportError(
'Using dataframe output format requires to install pandas. Please install "pandas" or '
"use another output format."
)
index_names, rows = self.to_tabular(
index_orient=True,
grouped_by=grouped_by,
normalize=normalize_children,
with_single_bucket_groups=with_single_bucket_groups,
)
index, values = zip(*iteritems(rows))
if not index:
return pd.DataFrame()
if len(index[0]) == 0:
index = (None,) * len(index)
else:
index = pd.MultiIndex.from_tuples(index, names=index_names)
return pd.DataFrame(index=index, data=list(values))
[docs] def to_normalized(self):
children = []
for k in sorted(iterkeys(self.data)):
for child in self._normalize_buckets(self.data, k):
children.append(child)
return {"level": "root", "key": None, "value": None, "children": children}
[docs] def to_tree(self):
return AggsResponseTree(aggs=self._aggs).parse(self.data)
[docs] def to_interactive_tree(self):
return IResponse(tree=self.to_tree(), search=self.__search, depth=1)
[docs] def serialize(self, output="tabular", **kwargs):
"""
:param output: output format, one of "raw", "tree", "interactive_tree", "normalized", "tabular", "dataframe"
:param kwargs: tabular serialization kwargs
:return:
"""
if output == "raw":
return self.data
elif output == "tree":
return self.to_tree()
elif output == "interactive_tree":
return self.to_interactive_tree()
elif output == "normalized":
return self.to_normalized()
elif output == "tabular":
return self.to_tabular(**kwargs)
elif output == "dataframe":
return self.to_dataframe(**kwargs)
else:
raise NotImplementedError("Unknown %s output format." % output)
def __repr__(self):
if not self.keys():
return "<Aggregations> empty"
return "<Aggregations> %s" % list(map(str, self.keys()))