Source code for pandagg.search

# adapted from elasticsearch-dsl/search.py
import copy
import json

from elasticsearch.helpers import scan

from pandagg.connections import get_connection
from pandagg.query import Bool
from pandagg.response import Response
from pandagg.tree.mappings import _mappings
from pandagg.tree.query import Query, ADD
from pandagg.tree.aggs import Aggs
from pandagg.utils import DSLMixin


class Request(object):
    """
    Base class holding the state shared by search-like requests: the
    client (or connection alias) to use, the target indices and the query
    params forwarded at execution time.

    Every public mutator returns a modified copy; the instance it is called
    on is left untouched.
    """

    def __init__(self, using, index=None):
        """
        :arg using: client, or connection alias, the request is bound to
        :arg index: a single index, or a list/tuple of indices; any other
            falsy value means "no index set"
        """
        self._using = using
        self._params = {}
        if isinstance(index, (tuple, list)):
            # always store a private list, never the caller's own sequence
            self._index = list(index)
        elif index:
            self._index = [index]
        else:
            self._index = None

    def params(self, **kwargs):
        """
        Specify query params to be used when executing the search. All the
        keyword arguments will override the current values. See
        https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.Elasticsearch.search
        for all available parameters.

        ``from`` is a reserved word in Python, so it is passed as ``from_``
        and stored under the ``from`` key. A ``from_`` of ``None`` is dropped.

        Example::

            s = Search()
            s = s.params(routing='user-1', preference='local')
        """
        offset = kwargs.pop("from_", None)
        if offset is not None:
            kwargs["from"] = offset

        clone = self._clone()
        clone._params.update(kwargs)
        return clone

    def index(self, *index):
        """
        Add indices to the request. Called without arguments, it removes all
        index information instead.

        Each argument may be a string, a list or a tuple of index names; the
        names are appended to the indices already set. Arguments of any other
        type are ignored.

        Example::

            s = Search()
            s = s.index('twitter-2015.01.01', 'twitter-2015.01.02')
            s = s.index(['twitter-2015.01.01', 'twitter-2015.01.02'])
        """
        clone = self._clone()

        # .index() with no argument resets
        if not index:
            clone._index = None
            return clone

        names = list(self._index or [])
        for item in index:
            if isinstance(item, str):
                names.append(item)
            elif isinstance(item, (list, tuple)):
                names.extend(item)
        clone._index = names
        return clone

    def using(self, client):
        """
        Associate the request with a client. A fresh copy is returned, the
        current instance remains unchanged.

        :arg client: client instance, or connection alias, to use
        """
        clone = self._clone()
        clone._using = client
        return clone

    def _clone(self):
        """Return a new instance of the same class with copied index and params."""
        clone = type(self)(using=self._using, index=self._index)
        clone._params = dict(self._params)
        return clone

    def __copy__(self):
        """Support ``copy.copy`` by delegating to ``_clone``."""
        return self._clone()
class MultiSearch(Request):
    """
    Combine multiple ``Search`` objects into a single multi-search request.

    Like its parent, the object is immutable from the caller's point of
    view: :meth:`add` returns a new instance.

    Example::

        ms = MultiSearch(using=client, index='my-index')
        ms = ms.add(search_a)
        ms = ms.add(search_b)
    """

    def __init__(self, **kwargs):
        """
        :arg kwargs: forwarded unchanged to ``Request.__init__`` (``using``
            is required there, ``index`` is optional)
        """
        super(MultiSearch, self).__init__(**kwargs)
        self._searches = []

    def __getitem__(self, key):
        """Return the search(es) stored at position/slice ``key``."""
        return self._searches[key]

    def __iter__(self):
        """Iterate over the searches in the order they were added."""
        return iter(self._searches)

    def _clone(self):
        """Return a copy carrying its own (shallow-copied) list of searches."""
        ms = super(MultiSearch, self)._clone()
        ms._searches = self._searches[:]
        return ms

    def add(self, search):
        """
        Return a copy of this request with ``search`` appended.

        :arg search: search object to add; it must expose ``_index``,
            ``_params`` and ``to_dict()``, which :meth:`to_dict` reads
        """
        ms = self._clone()
        ms._searches.append(search)
        return ms

    def to_dict(self):
        """
        Serialize the request as a flat list alternating, for every search,
        a header dict (its index, if any, updated with its params) and the
        search body returned by its ``to_dict()``.
        """
        out = []
        for s in self._searches:
            meta = {}
            if s._index:
                meta["index"] = s._index
            meta.update(s._params)

            out.append(meta)
            out.append(s.to_dict())
        return out

    def execute(self):
        """
        Execute the multi search request and return the result of the
        client's ``msearch`` call.
        """
        es = get_connection(self._using)
        return es.msearch(index=self._index, body=self.to_dict(), **self._params)

    def __eq__(self, other):
        """
        Two multi-search requests are equal when they target the same indices
        and serialize to the same list of headers and bodies.
        """
        # Compare against MultiSearch: this class derives from Request, so a
        # check against Search made equal MultiSearch objects compare unequal.
        return (
            isinstance(other, MultiSearch)
            and other._index == self._index
            and other.to_dict() == self.to_dict()
        )

    def __repr__(self):
        """Return the serialized request as indented JSON."""
        return json.dumps(self.to_dict(), indent=2)