Source code for fhirpath.utils

# _*_ coding: utf-8 _*_
import datetime
import inspect
import math
import os
import pkgutil
import re
import sys
import time
import uuid
from importlib import import_module
from inspect import signature
from types import ModuleType
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Match,
    Optional,
    Pattern,
    Text,
    Type,
    Union,
    cast,
)

import pkg_resources
from pydantic.validators import bool_validator
from yarl import URL
from zope.interface import implementer

from fhirpath.thirdparty import Proxy

from .enums import FHIR_VERSION
from .interfaces import IPathInfoContext
from .json import json_dumps, json_loads  # noqa: F401
from .storage import FHIR_RESOURCE_CLASS_STORAGE, PATH_INFO_STORAGE
from .types import PrimitiveDataTypes

if TYPE_CHECKING:
    from fhir.resources.fhirabstractmodel import FHIRAbstractModel
    from fhir.resources.fhirtypes import (  # noqa: F401
        AbstractBaseType,
        AbstractType,
        Primitive,
    )
    from pydantic.fields import ModelField  # noqa: F401
    from pydantic.main import BaseConfig  # noqa: F401


__author__ = "Md Nazrul Islam <email2nazrul@gmail.com>"

LOCAL_TIMEZONE: Optional[datetime.timezone] = None


def fallback_callable(*args, **kwargs):
    """Always return None"""
    return None

def _reraise(tp, value, tb=None):
    if value is None:
        value = tp
    if value.__traceback__ is not tb:
        raise value.with_traceback(tb)
    raise value

def reraise(klass, msg=None, callback=None, **kw):
    """Reraise custom exception class"""
    if not issubclass(klass, Exception):
        raise RuntimeError(f"Class ``{klass}`` must be derived from Exception class.")

    t, v, tb = sys.exc_info()
    msg = msg or str(v)
    try:
        instance = klass(msg, **kw)
        if callable(callback):
            instance = callback(instance)
        _reraise(instance, None, tb)
    finally:
        del t, v, tb

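# A minimal usage sketch for ``reraise`` (illustrative, not part of the
# module): it is designed to be called from inside an ``except`` block, so
# that the active traceback is re-attached to the freshly constructed
# exception instance.
#
#   try:
#       import no_such_module  # hypothetical missing dependency
#   except ImportError:
#       reraise(LookupError, "optional dependency could not be resolved")
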
def force_str(value: Any, allow_non_str: bool = True) -> Text:
    """Coerce a value to ``str``: bytes are decoded as UTF-8, any other
    non-string value is converted via ``str()`` when ``allow_non_str`` is True."""
    if isinstance(value, bytes):
        return value.decode("utf8", "strict")

    if not isinstance(value, str) and allow_non_str:
        value = str(value)
    return value

def force_bytes(
    string: Text, encoding: Text = "utf8", errors: Text = "strict"
) -> bytes:

    if isinstance(string, bytes):
        if encoding == "utf8":
            return string
        else:
            return string.decode("utf8", errors).encode(encoding, errors)

    if not isinstance(string, str):
        string = str(string)

    return string.encode(encoding, errors)

def import_string(dotted_path: Text) -> type:
    """Shameless hack from django utils, please don't mind!"""
    module_path: Text
    class_name: Text
    try:
        module_path, class_name = dotted_path.rsplit(".", 1)
    except (ValueError, AttributeError):
        msg = f"{dotted_path} doesn't look like a module path"
        return reraise(ImportError, msg)

    module: ModuleType = import_module(module_path)

    cls: type
    try:
        cls = getattr(module, class_name)
    except AttributeError:
        msg = (
            f'Module "{module_path}" does not define a "{class_name}" attribute/class'
        )
        return reraise(ImportError, msg)
    return cls

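# A minimal usage sketch for ``import_string``; the dotted paths are just
# examples of the expected "package.module.ClassName" format.
#
#   url_klass = import_string("yarl.URL")   # returns the ``yarl.URL`` class
#   import_string("yarl")                   # reraised as ImportError (no class part)
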
def builder(func):
    """
    Decorator for wrapper "builder" functions. These are functions on the
    Query class or other classes used for building queries which mutate the
    query and return self. To keep the build functions immutable, this
    decorator makes a (shallow) copy of the current instance and works on that.

    This decorator will return the return value of the inner function or the
    new copy of the instance. The inner function does not need to return self.
    """
    import copy

    def _copy(self, *args, **kwargs):
        self_copy = copy.copy(self)
        result = func(self_copy, *args, **kwargs)

        # Return self if the inner function returns None.
        # This way the inner function can return something different
        # (for example when creating joins, a different builder is returned).
        if result is None:
            return self_copy

        return result

    return _copy

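# A minimal sketch of how ``builder`` might be applied to a hypothetical
# query-builder class (the ``Query``/``where`` names below are illustrative,
# not part of fhirpath). Because the decorator uses ``copy.copy`` (a shallow
# copy), builder methods should rebind attributes rather than mutate shared
# containers in place:
#
#   class Query:
#       def __init__(self):
#           self.filters = ()
#
#       @builder
#       def where(self, expr):
#           self.filters = self.filters + (expr,)  # rebind on the copy
#
#   q1 = Query()
#   q2 = q1.where("active = true")
#   # q1.filters == ()  while  q2.filters == ("active = true",)
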
def lookup_all_fhir_domain_resource_classes(
    fhir_release: FHIR_VERSION = FHIR_VERSION.DEFAULT,
) -> Dict[str, str]:
    """Return a mapping of DomainResource class name to dotted class path
    for the given FHIR release."""
    container: Dict[str, str] = {}
    fhir_release = FHIR_VERSION.normalize(fhir_release)
    pkg = "fhir.resources"
    if fhir_release.name != FHIR_VERSION.DEFAULT.value:
        pkg += f".{fhir_release.name}"

    prime_module_type: ModuleType = import_module(pkg)

    for _importer, module_name, ispkg in pkgutil.walk_packages(
        prime_module_type.__path__,  # type: ignore
        prime_module_type.__name__ + ".",
        onerror=lambda x: None,
    ):
        if ispkg or (len(pkg.split(".")) + 1) < len(module_name.split(".")):
            continue
        module_type: ModuleType = import_module(module_name)
        for klass_name, _klass in inspect.getmembers(module_type, inspect.isclass):
            if inspect.getmro(_klass)[1].__name__ != "DomainResource":
                continue
            container[klass_name] = f"{_klass.__module__}.{klass_name}"

    return container

def lookup_fhir_class_path(
    resource_type: Text,
    cache: bool = True,
    fhir_release: FHIR_VERSION = FHIR_VERSION.DEFAULT,
) -> Optional[Text]:  # noqa: E999
    """Find the FHIR resource model class (from ``fhir.resources``) and
    return its dotted path string.

    :arg resource_type: the resource type name (required). i.e Organization

    :arg cache: (default True) whether to serve from the cache if available
        or perform a fresh lookup.

    :arg fhir_release: FHIR Release (version) name.
        i.e FHIR_VERSION.STU3, FHIR_VERSION.R4

    :return dotted full string path. i.e fhir.resources.organization.Organization

    Example::

        >>> from fhirpath.utils import lookup_fhir_class_path
        >>> from zope.interface import Invalid
        >>> dotted_path = lookup_fhir_class_path('Patient')
        >>> 'fhir.resources.patient.Patient' == dotted_path
        True
        >>> dotted_path = lookup_fhir_class_path('FakeResource')
        >>> dotted_path is None
        True
    """
    fhir_release = FHIR_VERSION.normalize(fhir_release)
    storage = FHIR_RESOURCE_CLASS_STORAGE.get(fhir_release.name)

    if storage.exists(resource_type) and cache:
        return storage.get(resource_type)

    # Trying to get from entire modules
    prime_module: List[Text] = ["fhir", "resources"]
    if FHIR_VERSION["DEFAULT"].value != fhir_release.name:
        prime_module.append(fhir_release.name)

    prime_module_level = len(prime_module)
    prime_module_path: Text = ".".join(prime_module)

    prime_module_type: ModuleType = import_module(prime_module_path)

    for _importer, module_name, ispkg in pkgutil.walk_packages(
        prime_module_type.__path__,  # type: ignore
        prime_module_type.__name__ + ".",
        onerror=lambda x: None,
    ):
        if ispkg or (prime_module_level + 1) < len(module_name.split(".")):
            continue
        module_type: ModuleType = import_module(module_name)
        for klass_name, _klass in inspect.getmembers(module_type, inspect.isclass):
            if klass_name == resource_type:
                storage.insert(resource_type, f"{module_name}.{resource_type}")
                return storage.get(resource_type)

    return None

def lookup_fhir_class(
    resource_type: Text, fhir_release: FHIR_VERSION = FHIR_VERSION.DEFAULT
) -> Type["FHIRAbstractModel"]:  # noqa: E999
    factory_paths: List[str] = ["fhir", "resources"]
    if (
        FHIR_VERSION["DEFAULT"].value != fhir_release.name
        and fhir_release != FHIR_VERSION.DEFAULT
    ):
        factory_paths.append(fhir_release.name)
    factory_paths.append("get_fhir_model_class")

    factory: type = import_string(".".join(factory_paths))
    try:
        klass = factory(resource_type)
    except KeyError:
        raise LookupError(f"{resource_type} is not a valid FHIR class")
    return klass

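# A minimal sketch contrasting the two lookup helpers above; the results shown
# assume the default ``fhir.resources`` (R4) release is installed.
#
#   lookup_fhir_class_path("Patient")  # -> "fhir.resources.patient.Patient"
#   lookup_fhir_class("Patient")       # -> the Patient model class itself
#   lookup_fhir_class("FakeResource")  # expected to raise LookupError
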
CONTAINS_PY_PACKAGE: Pattern = re.compile(
    r"^\${(?P<package_name>[0-9a-z._]+)}", re.IGNORECASE
)

def expand_path(path_: Text) -> Text:
    """Path normalizer

    Supports:
    1. Home Path expander
    2. Package path discovery
    """
    pkg_matched: Optional[Match[Text]] = CONTAINS_PY_PACKAGE.match(path_)
    if path_.startswith("~"):
        real_path = os.path.expanduser(path_)

    elif pkg_matched is not None:
        replacement = pkg_matched.group(0)
        package_name = pkg_matched.group("package_name")

        try:
            real_path = path_.replace(
                replacement, pkg_resources.get_distribution(package_name).location
            )
        except pkg_resources.DistributionNotFound:
            msg = "Invalid package `{0}`! as provided in {1}".format(
                package_name, path_
            )
            return reraise(LookupError, msg)

    else:
        real_path = path_

    if real_path.endswith(os.sep):
        real_path = real_path[: -len(os.sep)]

    return real_path

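# A minimal sketch of the two expansions ``expand_path`` understands; the
# paths shown are illustrative and depend on the local environment (the
# ``${package}`` form resolves to the installed distribution's location via
# ``pkg_resources``).
#
#   expand_path("~/fhir-cache")
#   # -> "/home/<user>/fhir-cache"
#   expand_path("${fhirpath}/static")
#   # -> "<distribution location of fhirpath>/static"
#   expand_path("${no_such_package}/static")
#   # -> reraised as LookupError
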
def proxy(obj):
    """Make a proxy of any object"""
    try:
        return obj.__proxy__()
    except AttributeError:
        # fall back to building the proxy ourselves
        p_obj = Proxy()
        p_obj.initialize(obj)
        return p_obj

def unwrap_proxy(proxy_obj):
    """Return the original object wrapped by a ``Proxy``."""
    assert isinstance(proxy_obj, Proxy)

    return proxy_obj.obj

class EmptyPathInfoContext:
    """Empty PathInfoContext for the star (``*``) path!"""

    def __init__(self):
        """ """
        self._parent = None
        self._children = None
        self._path = "*"

        self.fhir_release = None
        self.prop_name = None
        self.prop_original = None
        self.type_name = None
        self.type_class = None
        self.optional = None
        self.multiple = None
        self.type_is_primitive = None

EMPTY_PATH_INFO_CONTEXT = EmptyPathInfoContext()
@implementer(IPathInfoContext)
class PathInfoContext:
    """ """

    def __init__(
        self,
        path: str,
        fhir_release: FHIR_VERSION,
        prop_name: str,
        prop_original: str,
        type_name: str,
        type_class: Union[bool, "AbstractBaseType", "AbstractType", "Primitive"],
        type_field: "ModelField",
        type_model_config: Type["BaseConfig"],
        optional: bool,
        multiple: bool,
        type_is_primitive: bool,
        resource_type: str,
    ):
        """ """
        self._parent: Optional[str] = None
        self._children: List[str] = list()
        self._path: str = path

        self.fhir_release: FHIR_VERSION = fhir_release
        self.prop_name: str = prop_name
        self.prop_original: str = prop_original
        self.type_name: str = type_name
        self.type_class: Union[
            bool, "AbstractBaseType", "AbstractType", "Primitive"
        ] = type_class
        self.type_field: "ModelField" = type_field
        self.type_model_config: Type["BaseConfig"] = type_model_config
        self.optional: bool = optional
        self.multiple: bool = multiple
        self.type_is_primitive: bool = type_is_primitive
        self.resource_type: str = resource_type

    @classmethod
    def context_from_path(
        cls, pathname: Text, fhir_release: FHIR_VERSION
    ) -> Union["PathInfoContext", "EmptyPathInfoContext"]:
        """ """
        if pathname == "*":
            return EMPTY_PATH_INFO_CONTEXT
        fhir_release = FHIR_VERSION.normalize(fhir_release)
        storage = PATH_INFO_STORAGE.get(fhir_release.name)

        if storage.exists(pathname):
            # trying from cache!
            return storage.get(pathname)

        parts = pathname.split(".")
        resource_type = parts[0]
        model_path = lookup_fhir_class_path(resource_type, fhir_release=fhir_release)
        model_class: Type["FHIRAbstractModel"] = cast(
            Type["FHIRAbstractModel"], import_string(cast(Text, model_path))
        )
        new_path: Text = parts[0]
        context: Optional["PathInfoContext"] = None

        for index, part in enumerate(parts[1:], 1):
            new_path = "{0}.{1}".format(new_path, part)
            if storage.exists(new_path):
                context = storage.get(new_path)
                if TYPE_CHECKING:
                    assert context
                if context.type_name in PrimitiveDataTypes:
                    if (index + 1) < len(parts):
                        raise ValueError("Invalid path {0}".format(pathname))
                    break
                else:
                    klass = context.type_class
                    model_class = lookup_fhir_class(
                        klass.__resource_type__,  # type: ignore
                        FHIR_VERSION[klass.__fhir_release__],  # type: ignore
                    )
                    continue

            for field in model_class.element_properties():
                if part != field.alias:
                    continue
                type_model_config = model_class.__config__
                multiple = str(field.outer_type_)[:12] == "typing.List["
                if getattr(field.type_, "__resource_type__", None):  # AbstractModelType
                    model_class = lookup_fhir_class(
                        field.type_.__resource_type__,
                        FHIR_VERSION[field.type_.__fhir_release__],
                    )
                    type_name = field.type_.__resource_type__
                    is_primitive = False
                else:
                    is_primitive = True  # Primitive
                    type_name = getattr(field.type_, "__visit_name__", None)
                    if type_name is None and field.type_ == bool:
                        type_name = "boolean"
                    if type_name is None:
                        raise NotImplementedError

                context = cls(
                    new_path,
                    fhir_release=fhir_release,
                    prop_name=field.name,
                    prop_original=field.alias,
                    type_name=type_name,
                    type_class=field.type_,
                    type_field=field,
                    type_model_config=type_model_config,
                    optional=(not field.required),
                    multiple=multiple,
                    type_is_primitive=is_primitive,
                    resource_type=resource_type,
                )
                if index > 1:
                    context.parent = ".".join(new_path.split(".")[:-1])
                    # the ``parent`` property returns the parent Context
                    # object instead of just the string path
                    parent_context = context.parent
                    parent_context.add_child(new_path)  # type: ignore
                storage.insert(new_path, context)
                if is_primitive:
                    if (index + 1) < len(parts):
                        raise ValueError("Invalid path {0}".format(pathname))
                    break

            # important! if context is still None here, the path (part) is not valid
            if context is None:
                break

        if TYPE_CHECKING:
            assert context
        return context

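    # A minimal sketch of how ``context_from_path`` is typically consumed; the
    # attribute values shown are what the R4 ``fhir.resources`` models are
    # expected to yield for this path.
    #
    #   ctx = PathInfoContext.context_from_path("Patient.name.given", FHIR_VERSION.R4)
    #   ctx.type_name          # "string"    (primitive leaf)
    #   ctx.type_is_primitive  # True
    #   ctx.multiple           # True        ("given" is a list)
    #   ctx.parent.type_name   # "HumanName" (context of "Patient.name")
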
    def __proxy__(self):
        """ """
        return PathInfoContextProxy(self)

    def _set_parent(self, dotted_path: str):
        """ """
        self._parent = dotted_path

    def _get_parent(self) -> "PathInfoContext":
        """ """
        assert self._parent
        parent = PathInfoContext.context_from_path(self._parent, self.fhir_release)
        if TYPE_CHECKING:
            assert isinstance(parent, PathInfoContext)
        return parent

    parent = property(_get_parent, _set_parent)

    def _get_children(self):
        """ """
        return [
            PathInfoContext.context_from_path(child, self.fhir_release)
            for child in self._children
        ]

    def _set_children(self, paths):
        """ """
        if isinstance(paths, str):
            paths = [paths]
        self._children = paths

    children = property(_get_children, _set_children)

    def is_root(self):
        """ """
        return self._parent is None

    def add_child(self, path):
        """ """
        if path not in self._children:
            self._children.append(path)

    def __repr__(self):
        """ """
        return "<{0}.{1}('{2}')>".format(
            self.__class__.__module__, self.__class__.__name__, self._path
        )

    def __str__(self):
        """ """
        return str(self._path)

    def validate_value(self, value):
        """``pydantic`` way to validate value"""
        if self.type_class == bool:
            return bool_validator(value)

        for validator in self.type_class.__get_validators__():
            sig = signature(validator)
            args = list(sig.parameters.keys())
            if len(args) == 1:
                value = validator(value)
            elif len(args) == 2:
                value = validator(value, self.type_field)
            elif len(args) == 3:
                value = validator(value, self.type_field, self.type_model_config)
            else:
                raise NotImplementedError
        return value

class PathInfoContextProxy(Proxy):
    """ """

    def __init__(self, context: PathInfoContext):
        """ """
        super(PathInfoContextProxy, self).__init__()
        self.initialize(context)

class BundleWrapper:
    """ """

    FHIR_REST_SERVER_PATH_PATTERN: Optional[Pattern] = None

    def __init__(
        self,
        engine,
        result,
        includes: List,
        url: URL,
        bundle_type="searchset",
        *,
        base_url: URL = None,
        init_data: Dict[str, Any] = None,
    ):
        """ """
        self.fhir_version = engine.fhir_release
        self.bundle_model = lookup_fhir_class("Bundle", fhir_release=self.fhir_version)
        self.base_url: URL = base_url or BundleWrapper.calculate_fhir_base_url(url)
        if init_data:
            self.data = init_data
        else:
            self.data = BundleWrapper.init_data()
        self.data["type"] = bundle_type
        # our pagination is based on the main query result.
        # fixme: still an issue for _has chaining
        self.data["total"] = result.header.total

        # attach main results
        self.attach_entry(result, "match")

        # attach included results
        for _include in includes:
            self.attach_entry(_include, "include")

        self.attach_links(url, len(result.body))

    @classmethod
    def fhir_rest_server_path_pattern(cls):
        """ """
        if cls.FHIR_REST_SERVER_PATH_PATTERN is None:
            all_resource_domain_types = set(
                list(lookup_all_fhir_domain_resource_classes(FHIR_VERSION.R4).keys())
                + list(
                    lookup_all_fhir_domain_resource_classes(FHIR_VERSION.STU3).keys()
                )
                + list(
                    lookup_all_fhir_domain_resource_classes(FHIR_VERSION.DSTU2).keys()
                )
            )
            all_resources = "|".join(all_resource_domain_types)
            pattern = re.compile(
                r"(?:(/(?P<resource_name>" + all_resources + r")"
                r"(?:"
                r"(?P<search2>/_search)|"
                r"(?P<resource_id>/[A-Za-z0-9\-.]{1,64})(?P<history>/_history)?"
                r")?"
                r")|(?P<search1>/_search))?"
                r"(?P<graphql>/\$graphql)?$"
            )
            cls.FHIR_REST_SERVER_PATH_PATTERN = pattern
        return cls.FHIR_REST_SERVER_PATH_PATTERN

    @staticmethod
    def calculate_fhir_base_url(url: URL) -> URL:
        """https://www.hl7.org/fhir/Bundle.html
        Section: 2.36.4 Resource URL & Uniqueness rules in a bundle.
        Section: 2.36.4.1 Resolving references in Bundles.
        """
        _url = url
        raw_path = url.raw_path
        if len(raw_path) > 1 and raw_path.endswith("/"):
            raw_path = raw_path[:-1]

        matches = BundleWrapper.fhir_rest_server_path_pattern().search(raw_path)
        group_dict = {}
        if matches:
            group_dict = matches.groupdict()

        if group_dict.get("resource_name"):
            _url = _url.parent
            if group_dict.get("resource_id"):
                _url = _url.parent
                if group_dict.get("history"):
                    _url = _url.parent
            elif group_dict.get("search2"):
                _url = _url.parent
        elif group_dict.get("search1"):
            _url = _url.parent

        if group_dict.get("graphql"):
            _url = _url.parent

        return _url

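    # A minimal sketch of the base-URL calculation; the host and the "/r4"
    # prefix below are illustrative, only the trailing path handling matters.
    #
    #   BundleWrapper.calculate_fhir_base_url(
    #       URL("https://example.com/r4/Patient/123/_history")
    #   )  # -> URL("https://example.com/r4")
    #   BundleWrapper.calculate_fhir_base_url(
    #       URL("https://example.com/r4/Patient/_search")
    #   )  # -> URL("https://example.com/r4")
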
    @staticmethod
    def init_data() -> Dict[str, Any]:
        """Initialized Bundle data"""
        data = {"id": str(uuid.uuid4()), "meta": {"lastUpdated": timestamp_utc()}}
        return data

    def attach_entry(self, result, mode="match"):
        """ """
        if "entry" not in self.data:
            self.data["entry"] = list()

        for row in result.body:
            resource = row[0]
            if isinstance(resource, dict):
                resource_id = resource["id"]
                resource_type = resource["resourceType"]
            elif getattr(resource.__class__, "get_resource_type", fallback_callable)():
                resource_id = resource.id
                resource_type = resource.resource_type
            else:
                raise NotImplementedError(
                    f"EngineRowResult must be a dict or FHIRAbstractModel, "
                    f"got: {resource}"
                )

            # entry = BundleEntry
            entry = dict()
            entry["fullUrl"] = "{0}/{1}".format(resource_type, resource_id)
            entry["resource"] = resource

            # search = BundleEntrySearch
            search = {"mode": mode}
            entry["search"] = search

            self.data["entry"].append(entry)

    def __call__(self, as_json=False):
        """ """
        if as_json:
            # if as_json is True, return the bundle as a python dict
            # instead of building a pydantic.BaseModel
            # important!
            self.data["resourceType"] = self.bundle_model.get_resource_type()
            return self.data

        return self.bundle_model.parse_obj(self.data)

    def resolve_absolute_uri(self, relative_path: str) -> URL:
        """ """
        try:
            resource, id = relative_path.split("/")
        except ValueError:
            raise ValueError(
                f"'{relative_path}' is not a valid relative path. "
                "Format: 'ResourceType/ResourceID'"
            )
        return self.base_url / resource / id

    def json(self):
        """ """
        return self.__call__().json()

def get_local_timezone() -> datetime.timezone:
    """Return the local timezone as a fixed-offset ``datetime.timezone``;
    ``LOCAL_TIMEZONE`` takes precedence when set."""
    if LOCAL_TIMEZONE is not None:
        return LOCAL_TIMEZONE

    is_dst = time.daylight and time.localtime().tm_isdst > 0
    seconds = -(time.altzone if is_dst else time.timezone)
    tz = datetime.timezone(datetime.timedelta(seconds=seconds))
    return tz

def timestamp_utc() -> datetime.datetime:
    """UTC datetime with timezone offset"""
    dt_now = datetime.datetime.utcnow()
    return dt_now.replace(tzinfo=datetime.timezone.utc)

def timestamp_local() -> datetime.datetime:
    """Timezone aware datetime with local timezone offset"""
    return datetime.datetime.now(tz=get_local_timezone())