Source code for fhirpath.search

# _*_ coding: utf-8 _*_
import logging
import re
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Optional,
    Pattern,
    Set,
    Text,
    Tuple,
    Type,
    Union,
    cast,
)
from urllib.parse import unquote_plus
from warnings import warn

from multidict import MultiDict, MultiDictProxy
from zope.interface import implementer

from fhirpath.engine import EngineResult, EngineResultBody, EngineResultHeader
from fhirpath.enums import (
    FHIR_VERSION,
    GroupType,
    MatchType,
    SortOrderType,
    WhereConstraintType,
)
from fhirpath.exceptions import ValidationError
from fhirpath.fhirspec import (
    FHIRSearchSpecFactory,
    ResourceSearchParameterDefinition,
    SearchParameter,
    lookup_fhir_resource_spec,
    search_param_prefixes,
)
from fhirpath.fql import (
    G_,
    T_,
    V_,
    contains_,
    eb_,
    exact_,
    exists_,
    not_,
    not_exists_,
    sa_,
    sort_,
)
from fhirpath.fql.types import ElementPath
from fhirpath.interfaces import IGroupTerm, ISearch, ISearchContext
from fhirpath.query import Q_, QueryResult
from fhirpath.storage import SEARCH_PARAMETERS_STORAGE

__author__ = "Md Nazrul Islam <email2nazrul@gmail.com>"

escape_comma_replacer: Text = "_ESCAPE_COMMA_"
uri_scheme: Pattern = re.compile(r"^https?://", re.I)
value_prefixes: Set[str] = {"eq", "ne", "gt", "lt", "ge", "le", "sa", "eb", "ap"}
has_dot_as: Pattern = re.compile(r"\.as\([a-z]+\)$", re.I ^ re.U)
has_dot_is: Pattern = re.compile(r"\.is\([a-z]+\)$", re.I ^ re.U)
has_dot_where: Pattern = re.compile(r"\.where\([a-z=\'\"()]+\)", re.I ^ re.U)
parentheses_wrapped: Pattern = re.compile(r"^\(.+\)$")
logger = logging.getLogger("fhirpath.search")

DEFAULT_RESULT_COUNT = 100


def has_escape_comma(val):
    """Return True when *val* contains an escaped comma (``\\,``)."""
    return val.find("\\,") != -1
@implementer(ISearchContext)
class SearchContext(object):
    """Scope of a FHIR search: the query engine, the resource type(s)
    being searched, and the matching search-parameter definitions."""

    __slots__ = (
        "resource_types",
        "engine",
        "unrestricted",
        "async_result",
        "definitions",
        "search_params_intersection",
    )

    # one ResourceSearchParameterDefinition per resource type in scope
    definitions: List[ResourceSearchParameterDefinition]

    def __init__(self, engine, resource_type, unrestricted=False, async_result=None):
        """Initialize the context for *resource_type* (may be None/empty,
        in which case the context starts with no concrete type and the
        generic "Resource" definitions are used).

        :param engine: the query engine; its class decides async-ness.
        :param async_result: deprecated — ignored; derived from the engine.
        """
        self.engine = engine
        self.resource_types = [resource_type] if resource_type else []
        self.unrestricted = unrestricted
        # always derived from the engine; the explicit parameter is only
        # kept for backward compatibility and triggers a warning below.
        self.async_result = self.engine.__class__.is_async()
        if async_result is not None:
            warn(
                "'async_result' is no longer used, as Engine has that info already. "
                "this parameter will be removed in future release.",
                category=DeprecationWarning,
            )
        self.definitions = self.get_parameters_definition(self.engine.fhir_release)
[docs] def get_parameters_definition( self, fhir_release: FHIR_VERSION, ) -> List[ResourceSearchParameterDefinition]: """ """ fhir_release = FHIR_VERSION.normalize(fhir_release) storage = SEARCH_PARAMETERS_STORAGE.get(fhir_release.name) if storage.empty(): spec = FHIRSearchSpecFactory.from_release(fhir_release.name) spec.write() # if self.resource_types is empty, return the searchparams # definitions of the generic "Resource" type. return [ storage.get(resource_type) for resource_type in (self.resource_types or ["Resource"]) ]
[docs] def augment_with_types(self, resource_types: List[str]): if len(resource_types) == 0: return self.resource_types.extend(resource_types) self.definitions = self.get_parameters_definition(self.engine.fhir_release) self.search_params_intersection = [ sp for sp in self.definitions[0] if all(sp in d for d in self.definitions) ]
[docs] def resolve_path_context(self, search_param: SearchParameter): """ """ if search_param.expression is None: raise NotImplementedError # Some Safeguards if search_param.type == "composite": raise NotImplementedError if search_param.type in ("token", "composite") and search_param.code.startswith( "combo-" ): raise NotImplementedError dotted_path = search_param.expression if parentheses_wrapped.match(dotted_path): dotted_path = dotted_path[1:-1] return self._dotted_path_to_path_context(dotted_path)
[docs] def normalize_param( self, param_name, raw_value ) -> List[Tuple[ElementPath, str, Optional[str]]]: """ """ try: parts = param_name.split(":") param_name_ = parts[0] modifier_ = parts[1] except IndexError: modifier_ = None normalized_params: List[Tuple[ElementPath, str, Optional[str]]] = [] search_params_def = self._get_search_param_definitions(param_name_) for sp in search_params_def: # Look out for any composite or combo type parameter if sp.type == "composite": normalized_params.extend( self._normalize_composite_param( raw_value, param_def=sp, modifier=modifier_ ) ) continue if len(raw_value) == 0: raw_value = None elif len(raw_value) == 1: raw_value = raw_value[0] values: List = self.normalize_param_value(raw_value, sp) if len(values) == 0: # empty parameters are not considered an error, they should be ignored continue if len(values) == 1: param_value_ = values[0] else: param_value_ = values Search.validate_normalized_value(param_name_, param_value_, modifier_) _path = self.resolve_path_context(sp) normalized_params.append((_path, param_value_, modifier_)) return normalized_params
[docs] def normalize_param_value( self, raw_value: Union[List, str], search_param: SearchParameter ): normalized_values: List[Any] = [] if not raw_value: return [] elif isinstance(raw_value, list): bucket: List[str] = list() for rv in raw_value: bucket.extend(self.normalize_param_value(rv, search_param)) if len(bucket) == 1: normalized_values.append(bucket[0]) else: normalized_values.append(bucket) else: escape_ = has_escape_comma(raw_value) if escape_: param_value = raw_value.replace("\\,", escape_comma_replacer) else: param_value = raw_value value_parts = param_value.split(",") bucket_ = list() for val in value_parts: comparison_operator = "eq" if escape_: val_ = val.replace(escape_comma_replacer, "\\,") else: val_ = val for prefix in search_param_prefixes: if val_.startswith(prefix) and search_param.support_prefix(): comparison_operator = prefix val_ = val_[2:] break bucket_.append((comparison_operator, val_)) if len(bucket_) == 1: normalized_values.append(bucket_[0]) else: normalized_values.append((None, bucket_)) return normalized_values
def _get_search_param_definitions(self, param_name) -> List[SearchParameter]: """ """ params_def = [] for definition in self.definitions: search_param = getattr(definition, param_name, None) if search_param is None: if param_name in ("_format", "_pretty"): continue raise ValidationError( "No search definition is available for search parameter " f"``{param_name}`` on Resource ``{definition.resource_type}``." ) params_def.append(search_param) return params_def def _dotted_path_to_path_context(self, dotted_path): """ """ if len(dotted_path.split(".")) == 1: raise ValidationError("Invalid dotted path ´{0}´".format(dotted_path)) path_ = ElementPath.from_el_path(dotted_path) path_.finalize(self.engine) return path_ def _normalize_composite_param( self, raw_value, param_def, modifier ) -> List[Tuple[ElementPath, str, Optional[str]]]: """ """ if len(raw_value) < 1: raise NotImplementedError( "Currently duplicate composite type params are not allowed or supported" ) value_parts = raw_value[0].split("$") if len(value_parts) != len(param_def.component): raise ValueError( f"Composite search param {param_def.name} expects {len(param_def.component)} " f"values separated by a '$', got {len(value_parts)}." ) results: List[Tuple[ElementPath, str, Optional[str]]] = [ self.parse_composite_parameter_component( component, value_part, param_def, modifier ) for component, value_part in zip(param_def.component, value_parts) ] return results
[docs] def parse_composite_parameter_component( self, component, raw_value, param_def, modifier ): result = [] for expr in component["expression"].split("|"): component_dotted_path = ".".join([param_def.expression, expr.strip()]) component_param_value = self.normalize_param_value(raw_value, param_def) if len(component_param_value) == 1: component_param_value = component_param_value[0] result.append( ( self._dotted_path_to_path_context(component_dotted_path), component_param_value, modifier, ) ) if len(result) == 1: return result[0] return result
class AsyncSearch(Search):
    """Asynchronous flavour of ``Search``: same pipeline (reverse
    chaining, main query, includes), with awaited engine calls."""

    async def __call__(self, as_json=False):
        """Execute the search and return the response bundle.

        :param as_json: when True, the response is serialized to JSON.
        """
        # TODO: chaining
        # reverse chaining (_has)
        if self.result_params.get("_has"):
            has_queries = self.has()
            # compute the intersection of referenced resources' ID
            # from the result of _has queries.
            self.reverse_chaining_results = {}
            for ref_param, q in has_queries:
                res = await q.fetchall()
                self.reverse_chaining_results = {
                    r_type: set(ids).intersection(self.reverse_chaining_results[r_type])
                    if self.reverse_chaining_results.get(r_type)
                    else set(ids)
                    for r_type, ids in res.extract_references(ref_param).items()
                    if r_type in self.context.resource_types
                }
            # if the _has predicates did not match any documents, return an empty result
            # FIXME: we use the result of the last _has query to build the empty bundle,
            # but we should be more explicit about the query context.
            # NOTE(review): if has_queries is empty, ``res`` is unbound here
            # and this raises NameError — presumably has() never returns an
            # empty iterable when "_has" is present; confirm.
            if not self.reverse_chaining_results:
                return self.response(res, [], as_json)

        # MAIN QUERY
        self.main_query = self.build()
        # TODO handle count with _includes
        if self.result_params.get("_summary") == "count":
            main_result = await self.main_query.count_raw()
        else:
            main_result = await self.main_query.fetchall()
        assert main_result is not None

        # _include
        self.include_queries = self.include(main_result)
        include_results: List[EngineResult] = [
            await q.fetchall() for q in self.include_queries
        ]

        # _revinclude
        self.rev_include_queries = self.rev_include(main_result)
        rev_include_results: List[EngineResult] = [
            await q.fetchall() for q in self.rev_include_queries
        ]

        all_includes = [*include_results, *rev_include_results]
        return self.response(main_result, all_includes, as_json)