Source code for pandagg.search

# adapted from elasticsearch-dsl/search.py

from __future__ import annotations

import copy
import json
from typing import (
    Optional,
    Union,
    Tuple,
    List,
    Any,
    TypeVar,
    Dict,
    Iterator,
    TYPE_CHECKING,
)

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

from pandagg.node.aggs.abstract import TypeOrAgg
from pandagg.query import Bool
from pandagg.response import SearchResponse, Hit, Aggregations
from pandagg.tree.mappings import _mappings, Mappings
from pandagg.tree.query import (
    Query,
    ADD,
    TypeOrQuery,
    InsertionModes,
    SingleOrMultipleQueryClause,
)
from pandagg.tree.aggs import Aggs, AggsDictOrNode
from pandagg.types import (
    MappingsDict,
    QueryName,
    ClauseBody,
    AggName,
    SearchResponseDict,
    DeleteByQueryResponse,
    SearchDict,
    BucketDict,
    AfterKey,
)
from pandagg.utils import DSLMixin

if TYPE_CHECKING:
    import pandas as pd
    from pandagg.document import DocumentMeta

# Alias of the builtin ``bool`` for use in type annotations: per the original
# note, the ``Search.bool`` method shadows the builtin name inside that class.
bool_ = bool

# Type variable bound to ``Request``: methods annotated ``self: T`` -> ``T``
# are typed as returning the caller's own (sub)class rather than ``Request``.
T = TypeVar("T", bound="Request")


class Request:
    """
    Base class holding what is needed to send a request to Elasticsearch.

    It stores the client used for execution, the targeted index names and
    the extra query parameters. The public configuration methods
    (``params``, ``index``, ``using``) never mutate the instance: each one
    returns a modified clone.
    """

    def __init__(
        self: T,
        using: Optional[Elasticsearch],
        index: Optional[Union[str, Tuple[str, ...], List[str]]] = None,
    ) -> None:
        """
        :arg using: client used to execute the request, or ``None`` when the
            request is only built and never executed.
        :arg index: a single index name, or a list/tuple of index names. A
            falsy value (``None``, ``""``) leaves the request without index.
        """
        self._using: Optional[Elasticsearch] = using
        self._index: Optional[List[str]] = None
        if isinstance(index, (tuple, list)):
            # Copy, so that clones never share their index list.
            self._index = list(index)
        elif index:
            self._index = [index]
        self._params: Dict[str, Any] = {}

    def _get_connection(self) -> Elasticsearch:
        """
        Return the client attached to this request.

        :raises ValueError: if no client has been provided.
        """
        if self._using is None:
            raise ValueError(
                "An Elasticsearch client must be provided in order to execute queries."
            )
        return self._using

    def params(self: T, **kwargs: Any) -> T:
        """
        Specify query params to be used when executing the search. All the
        keyword arguments will override the current values. See
        https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.Elasticsearch.search
        for all available parameters.

        ``from_`` is accepted as a spelling of the ``from`` parameter (``from``
        being a reserved word); a ``from_`` of ``None`` is discarded.

        Example::

            s = Search()
            s = s.params(routing='user-1', preference='local')
        """
        from_ = kwargs.pop("from_", None)
        if from_ is not None:
            kwargs["from"] = from_
        s = self._clone()
        s._params.update(kwargs)
        return s

    def index(self: T, *index: Union[str, List[str], Tuple[str, ...]]) -> T:
        """
        Return a clone targeting additional indices.

        Each argument is either an index name or a list/tuple of index names;
        they are appended, in order, to the indices already set. Arguments of
        any other type are ignored. Called without argument, the index
        information is removed.

        Example::

            s = Search()
            s = s.index('twitter-2015.01.01', 'twitter-2015.01.02')
            s = s.index(['twitter-2015.01.01', 'twitter-2015.01.02'])
        """
        s = self._clone()
        if not index:
            # .index() resets
            s._index = None
            return s

        indexes: List[str] = []
        for i in index:
            if isinstance(i, str):
                indexes.append(i)
            elif isinstance(i, (list, tuple)):
                indexes.extend(i)
        s._index = (self._index or []) + indexes
        return s

    def using(self: T, client: Elasticsearch) -> T:
        """
        Associate the search request with an elasticsearch client. A fresh
        copy will be returned with current instance remaining unchanged.

        :arg client: an instance of ``elasticsearch.Elasticsearch`` to use
        """
        s = self._clone()
        s._using = client
        return s

    def _clone(self: T) -> T:
        """
        Return a new instance of the same class with the same client and
        index names, and a shallow copy of the query params.
        """
        s = self.__class__(using=self._using, index=self._index)
        s._params = self._params.copy()
        return s

    def __copy__(self: T) -> T:
        """Support ``copy.copy`` by delegating to ``_clone``."""
        return self._clone()
class MultiSearch(Request):
    """
    Combine multiple :class:`Search` objects into a single request.
    """

    def __init__(
        self,
        using: Optional[Elasticsearch],
        index: Optional[Union[str, Tuple[str, ...], List[str]]] = None,
    ) -> None:
        """
        :arg using: client used to execute the request, or ``None``.
        :arg index: default index name(s) passed to the multi search call.
        """
        super().__init__(using=using, index=index)
        self._searches: List[Search] = []

    def __getitem__(self, key: int) -> Search:
        """Return the search registered at position ``key``."""
        return self._searches[key]

    def __iter__(self) -> Iterator[Search]:
        """Iterate over the registered searches, in insertion order."""
        return iter(self._searches)

    def _clone(self) -> "MultiSearch":
        """
        Return a new ``MultiSearch`` with the same client and index names, and
        shallow copies of the params and of the list of searches.
        """
        ms = MultiSearch(using=self._using, index=self._index)
        ms._params = self._params.copy()
        ms._searches = self._searches[:]
        return ms

    def add(self: "MultiSearch", search: Search) -> "MultiSearch":
        """
        Return a clone of the request with ``search`` appended to its list of
        searches; the current instance is left unchanged::

            ms = MultiSearch(using=client, index='my-index')
            ms = ms.add(first_search)
            ms = ms.add(second_search)
        """
        ms = self._clone()
        ms._searches.append(search)
        return ms

    def to_dict(self) -> List[Union[Dict, SearchDict]]:
        """
        Serialize the request as a flat list alternating, for each search, a
        header dict (its index names, if any, under ``"index"``, plus its
        params) and the search body returned by its own ``to_dict``.
        """
        out: List[Union[Dict, SearchDict]] = []
        for s in self._searches:
            meta: Dict[str, Any] = {}
            if s._index:
                meta["index"] = s._index
            meta.update(s._params)
            out.extend((meta, s.to_dict()))
        return out

    def execute(self) -> List[SearchResponseDict]:
        """
        Execute the multi search request and return a list of search results.

        :raises ValueError: if no client has been provided.
        """
        es = self._get_connection()
        # NOTE(review): the value returned by ``msearch`` is passed through
        # unchanged; confirm that it matches the annotated list type.
        return es.msearch(index=self._index, body=self.to_dict(), **self._params)

    def __eq__(self, other: Any) -> bool:
        """
        Two multi searches are equal when they target the same index names and
        serialize to the same body.
        """
        # Compare against MultiSearch: checking for ``Search`` made every
        # MultiSearch unequal to any other one, itself included.
        return (
            isinstance(other, MultiSearch)
            and other._index == self._index
            and other.to_dict() == self.to_dict()
        )

    def __repr__(self) -> str:
        """Return the serialized request as indented JSON."""
        return json.dumps(self.to_dict(), indent=2)