Source code for WDL.Env

# pyre-strict
"""
Environments, for identifier resolution during WDL typechecking and evaluation.
"""
from typing import Optional, TypeVar, Generic, Any, Callable, Set, Iterator

T = TypeVar("T")
S = TypeVar("S")


[docs]class Binding(Generic[T]): """ An individual, immutable binding of a possibly-namespaced name to a right-hand-side value of type ``T``. ``T`` is typically ``Value.Base`` (value environments) or ``Type.Base`` (type environments). The binding may also reference an additional informational value of arbitrary type. """ _name: str _value: T _info: Any # pyre-ignore def __init__(self, name: str, value: T, info: Any = None) -> None: # pyre-ignore self._name = name self._value = value self._info = info @property def name(self) -> str: """:type: str Namedspaced names are flat, dot-separated strings. """ return self._name @property def value(self) -> T: ":type: T" return self._value @property def info(self) -> Any: # pyre-ignore ":type: Any" return self._info
[docs]class Bindings(Generic[T]): """WDL.Env.Bindings(binding: Optional[WDL.Env.Binding[T]] = None, next: Optional[WDL.Env.Bindings[T]] = None) An environment consisting of an immutable linked-list of :class:`WDL.Env.Binding` objects. ``WDL.Env.Bindings()`` is the empty environment. ``Bindings[T]`` is iterable for the individual ``Binding[T]`` objects:: env = WDL.Env.Bindings() env = env.bind("x", 1).bind("y", 42) print(env["x"]) # 1 print(",".join(str(b.value) for b in env)) # 1,42 """ _binding: Optional[Binding[T]] _next: "Optional[Bindings[T]]" _namespaces: Optional[Set[str]] = None def __init__( self, binding: Optional[Binding[T]] = None, next: "Optional[Bindings[T]]" = None, ) -> None: assert binding or not next self._binding = binding self._next = next def __bool__(self) -> bool: return next(self.__iter__(), None) is not None def __iter__(self) -> Iterator[Binding[T]]: mask = set() pos = self while pos is not None: if isinstance(pos._binding, Binding) and pos._binding.name not in mask: mask.add(pos._binding.name) yield pos._binding pos = pos._next def __len__(self) -> int: return sum(1 for _ in self)
[docs] def bind(self, name: str, value: T, info: Any = None) -> "Bindings[T]": # pyre-ignore """ Return an environment with a new binding prepended. Any existing binding for the same name is shadowed by the new one. (This should not usually arise in view of the immutability of WDL values.) """ assert name and not (name.startswith(".") or name.endswith(".")) return Bindings(Binding(name, value, info), self)
[docs] def resolve_binding(self, name: str) -> Binding[T]: """Look up a :class:`WDL.Env.Binding` object by name :raise KeyError: no such binding """ for b in self: if b.name == name: return b raise KeyError()
[docs] def resolve(self, name: str) -> T: """ Look up a bound value by name. Equivalently, ``env[name]`` :raise KeyError: no such binding """ return self.resolve_binding(name).value
[docs] def get(self, name: str, default: Optional[T] = None) -> Optional[T]: """ Look up a bound value by name, returning the default value or ``None`` if there's no such binding. """ try: return self.resolve(name) except KeyError: return default
def __getitem__(self, name: str) -> T: return self.resolve(name)
[docs] def has_binding(self, name: str) -> bool: """ Determine existence of a binding for the name. Equivalently, ``name in env`` """ try: self.resolve(name) return True except KeyError: return False
def __contains__(self, name: str) -> bool: if isinstance(name, str): return self.has_binding(name) return False
[docs] def map(self, f: Callable[[Binding[T]], Optional[Binding[S]]]) -> "Bindings[S]": """ Copy the environment with each binding transformed by the given function. If the function returns ``None`` then the binding is excluded. """ ans = Bindings() for b in self: fb = f(b) if isinstance(fb, Binding): ans = Bindings(fb, ans) return _rev(ans)
[docs] def filter(self, pred: Callable[[Binding[T]], bool]) -> "Bindings[T]": "Copy the environment with only those bindings for which ``pred`` returns True" return self.map(lambda b: b if pred(b) else None)
[docs] def subtract(self, rhs: "Bindings[S]") -> "Bindings[T]": "Copy the environment excluding any binding for which ``rhs`` has a binding with the same name" def flt(b: Binding[T]) -> bool: try: rhs.resolve(b.name) return False except KeyError: return True return self.filter(flt)
@property def namespaces(self) -> Set[str]: """:type: Set[str] Return the environment's namespaces, all the distinct dot-separated prefixes of the binding names. Each element ends with a dot. """ if self._namespaces is None: self._namespaces = self._next.namespaces if self._next is not None else set() if isinstance(self._binding, Binding): names = self._binding.name.split(".") if len(names) > 1: for i in range(len(names) - 1): ns = ".".join(names[: i + 1]) + "." self._namespaces.add(ns) return self._namespaces.copy()
[docs] def has_namespace(self, namespace: str) -> bool: "Determine existence of a namespace in the environment" assert namespace if not namespace.endswith("."): namespace += "." return namespace in (self._namespaces if self._namespaces else self.namespaces)
[docs] def enter_namespace(self, namespace: str) -> "Bindings[T]": """ Generate an environment with only those bindings in the given namespace, with the namespace prefix removed from each binding's name. """ assert namespace if not namespace.endswith("."): namespace += "." def enter(b: Binding[T]) -> Optional[Binding[T]]: if b.name.startswith(namespace): return Binding(b.name[len(namespace) :], b.value, b.info) return None return self.map(enter)
[docs] def wrap_namespace(self, namespace: str) -> "Bindings[T]": "Copy the environment with the given namespace prefixed to each binding name" assert namespace if not namespace.endswith("."): namespace += "." ans = Bindings() pos = self while pos is not None: if isinstance(pos._binding, Binding): ans = Bindings( Binding(namespace + pos._binding.name, pos._binding.value, pos._binding.info), ans, ) pos = pos._next return _rev(ans)
def _rev(env: Bindings[T]) -> Bindings[T]: ans = Bindings() pos = env while pos is not None: if pos._binding: ans = Bindings(pos._binding, ans) pos = pos._next return ans
[docs]def merge(*args: Bindings[T]) -> Bindings[T]: """ Merge several ``Bindings[T]`` environments into one. For efficiency, the largest environment should be supplied as the last argument. """ ans = [args[-1] if args else Bindings()] def visit(b: Binding[T]) -> None: ans[0] = Bindings(b, ans[0]) for env in reversed(args[:-1]): assert isinstance(env, Bindings) for b in _rev(env): visit(b) return ans[0]