from __future__ import annotations
from typing_extensions import TypedDict
from typing import Optional, Union, Any, List, Dict, TYPE_CHECKING
from lighttree.node import NodeId
from lighttree import Tree
from pandagg.node.aggs.abstract import AggClause
from pandagg.node.mappings import Object, Nested
from pandagg.node.mappings.abstract import Field, RegularField, ComplexField, Root
from pandagg.exceptions import (
AbsentMappingFieldError,
InvalidOperationMappingFieldError,
)
from pandagg.tree._tree import TreeReprMixin
from pandagg.types import DocSource, MappingsDict, FieldName, FieldClauseDict
if TYPE_CHECKING:
from pandagg.document import DocumentSource
FieldPropertiesDictOrNode = Dict[FieldName, Union[FieldClauseDict, Field]]
[docs]class MappingsDictOrNode(TypedDict, total=False):
properties: FieldPropertiesDictOrNode
dynamic: bool
def _mappings(
m: Optional[Union[MappingsDict, MappingsDictOrNode, "Mappings"]]
) -> Optional["Mappings"]:
if m is None:
return None
if isinstance(m, dict):
return Mappings(**m)
if isinstance(m, Mappings):
return m
raise TypeError("Unsupported %s type for Mappings" % type(m))
[docs]class Mappings(TreeReprMixin, Tree[Field]):
def __init__(
self,
properties: Optional[FieldPropertiesDictOrNode] = None,
dynamic: Optional[bool] = None,
**body: Any
) -> None:
super(Mappings, self).__init__()
# a Mappings always has a root after __init__
self.root: str
root_node = Root(dynamic=dynamic, **body)
self.insert_node(node=root_node)
if properties:
self._insert(
pid=root_node.identifier, properties=properties, is_subfield=False
)
[docs] def to_dict(
self, from_: Optional[NodeId] = None, depth: Optional[int] = None
) -> MappingsDict:
"""
Serialize Mappings as dict.
:param from_: identifier of a field, if provided, limits serialization to this field and its
children (used for recursion, shouldn't be useful)
:param depth: integer, if provided, limit the serialization to a given depth
:return: dict
"""
from_ = self.root if from_ is None else from_
key, node = self.get(from_)
children_queries = {}
if depth is None or depth > 0:
if depth is not None:
depth -= 1
for child_key, child_node in self.children(node.identifier):
if child_node._source_only:
continue
children_queries[child_key] = self.to_dict(
from_=child_node.identifier, depth=depth
)
serialized_node = node.to_dict()
if children_queries:
if isinstance(node, Root) or node.KEY in ("object", "nested"):
serialized_node["properties"] = children_queries
else:
serialized_node["fields"] = children_queries
return serialized_node
[docs] def validate_agg_clause(self, agg_clause: AggClause, exc: bool = True) -> bool:
"""
Ensure that if aggregation clause relates to a field (`field` or `path`) this field exists in mappings, and that
required aggregation type is allowed on this kind of field.
:param agg_clause: AggClause you want to validate on these mappings
:param exc: boolean, if set to True raise exception if invalid
:rtype: boolean
"""
if hasattr(agg_clause, "path"):
agg_path: Optional[str] = agg_clause.path # type: ignore
if agg_path is None:
# reverse nested
return True
try:
# nested
self.get_node_id_by_path(agg_path.split("."))
return True
except Exception:
return False
if not hasattr(agg_clause, "field"):
return True
agg_field: str = agg_clause.field # type: ignore
# TODO take into account flattened data type
try:
nid = self.get_node_id_by_path(agg_field.split("."))
except Exception:
raise AbsentMappingFieldError(
u"Agg of type <%s> on non-existing field <%s>."
% (agg_clause.KEY, agg_field)
)
_, field_node = self.get(nid)
field_type = field_node.KEY
if not agg_clause.valid_on_field_type(field_type):
if not exc:
return False
raise InvalidOperationMappingFieldError(
u"Agg of type <%s> not possible on field of type <%s>."
% (agg_clause.KEY, field_type)
)
return True
[docs] def mapping_type_of_field(self, field_path: str) -> str:
"""
Return field type of provided field path.
>>> mappings = Mappings(dynamic=False, properties={
>>> 'id': {'type': 'keyword'},
>>> 'comments': {'type': 'nested', 'properties': {
>>> 'comment_text': {'type': 'text'},
>>> 'date': {'type': 'date'}
>>> }}
>>> })
>>> mappings.mapping_type_of_field('id')
'keyword'
>>> mappings.mapping_type_of_field('comments')
'nested'
>>> mappings.mapping_type_of_field('comments.comment_text')
'text'
"""
try:
nid = self.get_node_id_by_path(field_path.split("."))
except ValueError:
raise AbsentMappingFieldError(
u"<%s field is not present in mappings>" % field_path
)
_, node = self.get(nid)
return node.KEY
[docs] def nested_at_field(self, field_path: str) -> Optional[str]:
"""
Return nested path applied on a given path. Return `None` is none applies.
>>> mappings = Mappings(dynamic=False, properties={
>>> 'id': {'type': 'keyword'},
>>> 'comments': {'type': 'nested', 'properties': {
>>> 'comment_text': {'type': 'text'},
>>> 'date': {'type': 'date'}
>>> }}
>>> })
>>> mappings.nested_at_field('id')
None
>>> mappings.nested_at_field('comments')
'comments'
>>> mappings.nested_at_field('comments.comment_text')
'comments'
"""
nesteds = self.list_nesteds_at_field(field_path)
if nesteds:
return nesteds[0]
return None
[docs] def list_nesteds_at_field(self, field_path: str) -> List[str]:
"""
List nested paths that apply at a given path.
>>> mappings = Mappings(dynamic=False, properties={
>>> 'id': {'type': 'keyword'},
>>> 'comments': {'type': 'nested', 'properties': {
>>> 'comment_text': {'type': 'text'},
>>> 'date': {'type': 'date'}
>>> }}
>>> })
>>> mappings.list_nesteds_at_field('id')
[]
>>> mappings.list_nesteds_at_field('comments')
['comments']
>>> mappings.list_nesteds_at_field('comments.comment_text')
['comments']
"""
path_nid = self.get_node_id_by_path(field_path.split("."))
# from deepest to highest
return [
# all path items are strings
".".join(self.get_path(nid)) # type: ignore
for nid in self.ancestors_ids(path_nid, include_current=True)
if self.get(nid)[1].KEY == "nested"
]
def _insert(
self, pid: NodeId, properties: FieldPropertiesDictOrNode, is_subfield: bool
) -> None:
"""
Recursive method to insert properties in current mappings.
:param pid: parent field identifier
:param properties: fields definitions that are inserted below pid
:param is_subfield: are provided properties `fields` mappings parameter, cf
https://www.elastic.co/guide/en/elasticsearch/reference/current/multi-fields.html
"""
if not isinstance(properties, dict):
raise ValueError("Wrong declaration, got %s" % properties)
field: Field
for field_name, field_ in properties.items():
if isinstance(field_, dict):
field_ = field_.copy()
field = Field.get_dsl_class(field_.pop("type", "object"))(
_subfield=is_subfield, **field_
)
elif isinstance(field_, Field):
field = field_
field._subfield = is_subfield
pass
else:
raise ValueError("Unsupported type %s" % type(field_))
self.insert_node(field, key=field_name, parent_id=pid)
if isinstance(field, ComplexField) and field.properties:
self._insert(field.identifier, field.properties, False)
if isinstance(field, RegularField) and field.fields:
if is_subfield:
raise ValueError(
"Cannot insert subfields into a subfield on field %s"
% field_name
)
self._insert(field.identifier, field.fields, True)
[docs] def validate_document(self, d: Union[DocSource, DocumentSource]) -> None:
# if Document
if not isinstance(d, dict) and hasattr(d, "_to_dict_"):
d = d._to_dict_()
self._validate_document(d, pid=self.root)
def _validate_document(self, d: Any, pid: NodeId, path: str = "") -> None:
if d is None:
d = {}
if not isinstance(d, dict):
raise ValueError(
"Invalid document type, expected dict, got <%s> at '%s'"
% (type(d), path)
)
field_name: str
for field_name, field in self.children(pid): # type: ignore
full_path = ".".join([path, field_name]) if path else field_name
field_value = d.get(field_name)
if field._required and not field_value:
raise ValueError("Field <%s> is required" % full_path)
if field._multiple is True:
if field_value is not None:
if not isinstance(field_value, list):
raise ValueError("Field <%s> should be a array" % full_path)
field_value_list = field_value
else:
field_value_list = []
if field._required and not any(field_value_list):
# deal with case: [None]
raise ValueError("Field <%s> is required" % full_path)
elif field._multiple is False:
if isinstance(field_value, list):
raise ValueError("Field <%s> should not be an array" % full_path)
field_value_list = [field_value] if field_value else []
else:
# field._multiple is None -> no restriction
if isinstance(field_value, list):
field_value_list = field_value
else:
field_value_list = [field_value]
for value in field_value_list:
# nullable check has been done beforehands
if value:
if not field.is_valid_value(value):
raise ValueError(
"Field <%s> value <%s> is not compatible with field of type %s"
% (full_path, value, field.KEY)
)
if isinstance(field, (Object, Nested)):
self._validate_document(value, field.identifier, path=full_path)