# Source code for WDL.Value

"""
WDL values instantiated at runtime

Each value is represented by an instance of a Python class inheriting from
``WDL.Value.Base``.

.. inheritance-diagram:: WDL.Value
   :top-classes: WDL.Value.Base
"""
import json
import copy
import base64
import hashlib
from abc import ABC
from typing import Any, List, Optional, Tuple, Dict, Iterable, Union, Callable, Set
from contextlib import suppress
from . import Error, Type, Env


class Base(ABC):
    """The abstract base class for WDL values"""

    type: Type.Base
    ":type: WDL.Type.Base"

    value: Any
    """The "raw" Python value"""

    _expr: "Optional[Expr.Base]"

    def __init__(self, type: Type.Base, value: Any, expr: "Optional[Expr.Base]" = None) -> None:
        """
        :param type: WDL type of the value; an optional quantifier is stripped unless this is a
                     ``Null`` instance
        :param value: the "raw" Python value
        :param expr: the expression that generated this value, if any
        """
        assert isinstance(type, Type.Base)
        self.type = type
        if self.type.optional and not isinstance(self, Null):
            self.type = self.type.copy(optional=False)  # normalize runtime type
        self.value = value
        self._expr = None
        if expr:
            # go through the property setter so that the expr propagates to children
            self.expr = expr

    def __eq__(self, other) -> bool:
        # Comparing against something that isn't a WDL value previously raised AttributeError
        # (no .value/.type); defer to Python's default handling instead.
        if not isinstance(other, Base):
            return NotImplemented
        if self.value is None:
            return other.value is None
        return self.type == other.type and self.value == other.value

    def __str__(self) -> str:
        return json.dumps(self.json)

    @property
    def expr(self) -> "Optional[Expr.Base]":
        """
        Reference to the WDL expression that generated this value, if it originated from
        ``WDL.Expr.eval``
        """
        return self._expr

    @expr.setter
    def expr(self, rhs: "Expr.Base"):
        old_expr = self._expr  # possibly None
        if rhs is not old_expr:
            self._expr = rhs
            # recursively replace old_expr in children; descendants carrying some other expr
            # keep it, but the traversal still continues through them
            stack = list(self.children)
            while stack:
                desc = stack.pop()
                if desc.expr is old_expr:
                    desc._expr = rhs
                stack.extend(desc.children)

    def coerce(self, desired_type: Optional[Type.Base] = None) -> "Base":
        """
        Coerce the value to the desired type and return it. Types should be checked statically on
        ``WDL.Expr.Base`` prior to evaluation.

        :param desired_type: target type; ``None`` returns the value unchanged
        :raises WDL.Error.InputError: if the value's type doesn't coerce to ``desired_type``
        """
        if isinstance(desired_type, Type.String):
            return String(str(self), self.expr)
        if isinstance(desired_type, Type.Array) and self.type.coerces(
            desired_type.item_type, check_quant=False
        ):
            # coercion of T to Array[T] (x to [x])
            # if self is an Array, then Array.coerce precludes this path
            # nb: the Array constructor takes the ITEM type and wraps it in Type.Array itself, so
            # passing desired_type here would produce a value typed Array[Array[T]]
            return Array(
                desired_type.item_type, [self.coerce(desired_type.item_type)], self.expr
            )
        if desired_type and not self.type.coerces(desired_type):
            # owing to static type-checking, this path should arise only rarely e.g. read_json()
            raise Error.InputError(f"cannot coerce {str(self.type)} to {str(desired_type)}")
        return self

    def expect(self, desired_type: Optional[Type.Base] = None) -> "Base":
        """Alias for coerce"""
        return self.coerce(desired_type)

    @property
    def json(self) -> Any:
        """
        Return a value representation which can be serialized to JSON using ``json.dumps``
        (str, int, float, list, dict, or null)
        """
        return self.value

    @property
    def children(self) -> "Iterable[Base]":
        """Immediate child values nested inside this one (none for non-compound values)"""
        return []
class Boolean(Base):
    """``value`` has Python type ``bool``"""

    def __init__(self, value: bool, expr: "Optional[Expr.Base]" = None) -> None:
        # the runtime type is always a fresh Type.Boolean
        boolean_type = Type.Boolean()
        super().__init__(type=boolean_type, value=value, expr=expr)
class Float(Base):
    """``value`` has Python type ``float``"""

    def __init__(self, value: float, expr: "Optional[Expr.Base]" = None) -> None:
        float_type = Type.Float()
        super().__init__(type=float_type, value=value, expr=expr)

    def __str__(self) -> str:
        # fixed-point rendering with six digits after the decimal point
        return f"{self.value:.6f}"
class Int(Base):
    """``value`` has Python type ``int``"""

    def __init__(self, value: int, expr: "Optional[Expr.Base]" = None) -> None:
        int_type = Type.Int()
        super().__init__(type=int_type, value=value, expr=expr)

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """"""
        # Int-to-Float is the only coercion handled here; anything else goes to the base class
        if not isinstance(desired_type, Type.Float):
            return super().coerce(desired_type)
        as_float = float(self.value)
        return Float(as_float, self.expr)
class String(Base):
    """``value`` has Python type ``str``"""

    def __init__(
        self, value: str, expr: "Optional[Expr.Base]" = None, subtype: Optional[Type.Base] = None
    ) -> None:
        # subclasses (File, Directory) supply their own runtime type through ``subtype``
        if not subtype:
            subtype = Type.String()
        super().__init__(subtype, value, expr)

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """"""
        raw = self.value
        if isinstance(desired_type, Type.String):
            return String(raw, self.expr)
        # String -> File/Directory, unless this value already is one
        if isinstance(desired_type, Type.File) and not isinstance(self, File):
            return File(raw, self.expr)
        if isinstance(desired_type, Type.Directory) and not isinstance(self, Directory):
            return Directory(raw, self.expr)
        # String -> Int/Float by parsing the text; a parse failure is reported as an
        # EvalError when the originating expression is known, otherwise a RuntimeError
        numeric_targets = ((Type.Int, int, Int), (Type.Float, float, Float))
        for wdl_type, parse, wrap in numeric_targets:
            if isinstance(desired_type, wdl_type):
                try:
                    return wrap(parse(self.value), self.expr)
                except ValueError as exn:
                    msg = f"coercing String to {desired_type}: {exn}"
                    if self.expr:
                        raise Error.EvalError(self.expr, msg)
                    raise Error.RuntimeError(msg)
        return super().coerce(desired_type)
class File(String):
    """``value`` has Python type ``str``"""

    def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None:
        file_type = Type.File()
        super().__init__(value, expr=expr, subtype=file_type)
        # a File path may not carry a trailing slash
        if value.endswith("/"):
            raise Error.InputError("WDL.Value.File invalid path: " + value)
class Directory(String):
    """``value`` has Python type ``str``"""

    def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None:
        directory_type = Type.Directory()
        super().__init__(value, expr=expr, subtype=directory_type)
class Array(Base):
    """``value`` is a Python ``list`` of other ``WDL.Value.Base`` instances"""

    value: List[Base]
    type: Type.Array

    def __init__(
        self, item_type: Type.Base, value: List[Base], expr: "Optional[Expr.Base]" = None
    ) -> None:
        """
        :param item_type: type of the array ITEMS (not the array type itself); the runtime type
                          becomes ``Type.Array(item_type)``, flagged nonempty iff ``value`` is
        :param value: the item values
        :param expr: the expression that generated this value, if any
        """
        self.value = []
        self.type = Type.Array(item_type, nonempty=(len(value) > 0))
        super().__init__(self.type, value, expr)

    @property
    def json(self) -> Any:
        """"""
        return [item.json for item in self.value]

    def __str__(self) -> str:
        # nb: this is NOT json.dumps(self.json) because it applies item __str__ overrides
        return "[" + ", ".join(str(item) for item in self.value) + "]"

    @property
    def children(self) -> Iterable[Base]:
        return self.value

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """"""
        if isinstance(desired_type, Type.Array):
            if desired_type.nonempty and not self.value:
                if self.expr:
                    raise Error.EmptyArray(self.expr)
                else:
                    raise ValueError("Empty array for Array+ input/declaration")
            if desired_type.item_type == self.type.item_type or (
                isinstance(desired_type.item_type, Type.Any)
            ):
                return self
            # Pass the desired ITEM type: the constructor wraps it in Type.Array itself, so
            # passing desired_type would yield a value typed Array[Array[T]].
            return Array(
                desired_type.item_type,
                [v.coerce(desired_type.item_type) for v in self.value],
                self.expr,
            )
        return super().coerce(desired_type)
class Map(Base):
    """``value`` is a Python ``list`` of (key, value) tuples of ``WDL.Value.Base`` instances"""

    value: List[Tuple[Base, Base]]
    type: Type.Map

    def __init__(
        self,
        item_type: Tuple[Type.Base, Type.Base],
        value: List[Tuple[Base, Base]],
        expr: "Optional[Expr.Base]" = None,
    ) -> None:
        self.value = []
        map_type = Type.Map(item_type)
        self.type = map_type
        super().__init__(map_type, value, expr)

    @property
    def json(self) -> Any:
        """"""
        # JSON object keys are strings, so the key type has to be coercible to String
        key_type = self.type.item_type[0]
        if not key_type.coerces(Type.String()):
            msg = f"cannot write {str(self.type)} to JSON"
            if self.expr:
                raise Error.EvalError(self.expr, msg)
            raise Error.RuntimeError(msg)
        result = {}
        for key, val in self.value:
            key_str = key.coerce(Type.String()).value
            # the first occurrence of a repeated key wins
            if key_str in result:
                continue
            result[key_str] = val.json
        return result

    def __str__(self) -> Any:
        # a later entry with the same key text overrides an earlier one
        rendered = {str(key): str(val) for key, val in self.value}
        body = ", ".join(f"{key}: {val}" for key, val in rendered.items())
        return "{" + body + "}"

    @property
    def children(self) -> Iterable[Base]:
        for key, val in self.value:
            yield key
            yield val

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """"""
        if isinstance(desired_type, Type.Map) and desired_type != self.type:
            target_key_type, target_value_type = desired_type.item_type[0], desired_type.item_type[1]
            coerced_items = [
                (key.coerce(target_key_type), val.coerce(target_value_type))
                for (key, val) in self.value
            ]
            return Map(desired_type.item_type, coerced_items, self.expr)

        if not isinstance(desired_type, Type.StructInstance):
            return super().coerce(desired_type)

        # Runtime typecheck for initializing struct from read_{object,objects,map}
        # This couldn't have been checked statically because the map keys weren't known.
        assert self.type.item_type[0].coerces(Type.String())
        try:
            literal_keys = {entry[0].coerce(Type.String()).value for entry in self.value}
            Type.Map(self.type.item_type, self.type.optional, literal_keys).check(desired_type)
        except TypeError as exn:
            msg = "unusable runtime struct initializer"
            if exn.args:
                msg += ", " + exn.args[0]
            if self.expr:
                raise Error.EvalError(self.expr, msg)
            raise Error.RuntimeError(msg)
        assert desired_type.members

        # coerce to desired member types
        members = {}
        for key, val in self.value:
            name = key.coerce(Type.String()).value
            try:
                members[name] = val.coerce(desired_type.members[name])
            except Error.RuntimeError as exc:
                # some coercions that typecheck could still fail, e.g. String to Int
                detail = (": " + exc.args[0]) if exc.args else ""
                msg = (
                    "runtime type mismatch initializing "
                    f"{desired_type.members[name]} {name} member of struct {desired_type.type_name}"
                ) + detail
                if self.expr:
                    raise Error.EvalError(self.expr, msg)
                raise Error.RuntimeError(msg)
        return Struct(desired_type, members, self.expr)
class Pair(Base):
    """``value`` is a Python ``tuple`` of two ``WDL.Value.Base`` instances (left, right)"""

    value: Tuple[Base, Base]
    type: Type.Pair

    def __init__(
        self,
        left_type: Type.Base,
        right_type: Type.Base,
        value: Tuple[Base, Base],
        expr: "Optional[Expr.Base]" = None,
    ) -> None:
        pair_type = Type.Pair(left_type, right_type)
        self.value = value
        self.type = pair_type
        super().__init__(pair_type, value, expr)

    def __str__(self) -> str:
        assert isinstance(self.value, tuple)
        left = self.value[0]
        right = self.value[1]
        return f"({left!s},{right!s})"

    @property
    def json(self) -> Any:
        """"""
        left_json = self.value[0].json
        right_json = self.value[1].json
        return {"left": left_json, "right": right_json}

    @property
    def children(self) -> Iterable[Base]:
        yield self.value[0]
        yield self.value[1]

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """"""
        # only a different Pair type needs member-wise coercion; all else goes to the base class
        if not isinstance(desired_type, Type.Pair) or desired_type == self.type:
            return super().coerce(desired_type)
        coerced_left = self.value[0].coerce(desired_type.left_type)
        coerced_right = self.value[1].coerce(desired_type.right_type)
        return Pair(
            desired_type.left_type,
            desired_type.right_type,
            (coerced_left, coerced_right),
            self.expr,
        )
class Null(Base):
    """Represents the missing value which optional inputs may take. ``value`` is None and
    ``type`` is ``Type.Any(null=True)``."""

    def __init__(self, expr: "Optional[Expr.Base]" = None) -> None:
        super().__init__(Type.Any(null=True), None, expr)

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """"""
        # Null is returned as-is for no/optional/Any desired type. For a non-optional type:
        #   File/Directory -> FileNotFoundError
        #   String         -> empty String
        #   Array[T?]      -> one-item array holding this Null
        #   anything else  -> Error.NullValue (if expr is known) or Error.InputError
        if desired_type and not desired_type.optional and not isinstance(desired_type, Type.Any):
            if isinstance(desired_type, (Type.File, Type.Directory)):
                # This case arises processing task outputs; we convert nonexistent paths to Null
                # before coercing to the declared output type (+ checking whether it's optional).
                raise FileNotFoundError()
            # normally the typechecker should prevent the following cases, but it might have had
            # check_quant=False
            if isinstance(desired_type, Type.String):
                return String("", self.expr)
            if isinstance(desired_type, Type.Array) and desired_type.item_type.optional:
                # Pass the ITEM type: the Array constructor wraps it in Type.Array itself, so
                # passing desired_type would yield a value typed Array[Array[T?]].
                return Array(
                    desired_type.item_type, [self.coerce(desired_type.item_type)], self.expr
                )
            if self.expr:
                raise Error.NullValue(self.expr)
            raise Error.InputError("'None' for non-optional input/declaration")
        return self

    def __str__(self) -> str:
        return "None"

    @property
    def json(self) -> Any:
        """"""
        return None
class Struct(Base):
    """``value`` is a Python ``dict`` mapping member names to ``WDL.Value.Base`` instances"""

    value: Dict[str, Base]

    # records the names of any extraneous keys that were present in the JSON/Map/Object from which
    # this struct was initialized
    extra: Set[str]

    def __init__(
        self,
        type: Union[Type.Object, Type.StructInstance],
        value: Dict[str, Base],
        expr: "Optional[Expr.Base]" = None,
        extra: Optional[Set[str]] = None,
    ) -> None:
        """
        :param type: struct type; may be Object for transient values (see comment below)
        :param value: member values by name (copied; the caller's dict isn't modified)
        :param expr: the expression that generated this value, if any
        :param extra: names of extraneous keys in the initializer, if any
        """
        # type may be Object for the transient evaluation of an object literal or read_json(); we
        # expect it to be coerced to a StructInstance in short order.
        value = dict(value)
        if isinstance(type, Type.StructInstance):
            # fill in null for any omitted optional members
            assert type.members
            for k in type.members:
                if k not in value:
                    assert type.members[k].optional
                    value[k] = Null()
        self.value = value
        self.extra = extra or set()
        super().__init__(type, value, expr)

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """"""
        if isinstance(desired_type, Type.StructInstance):
            return self._coerce_to_struct(desired_type)
        if isinstance(desired_type, Type.Map):
            return self._coerce_to_map(desired_type)
        # desired_type=None (the default, also used by expect()) requests no coercion, as in
        # Base.coerce; previously it fell into the error below ("cannot coerce struct to None").
        if desired_type is not None and not isinstance(desired_type, (Type.Any, Type.Object)):
            self._eval_error(f"cannot coerce struct to {desired_type}")
        # Object coercion is a no-op because we expect a further coercion to StructInstance to
        # follow in short order, providing the expected member types.
        return self

    def _coerce_to_struct(self, desired_type: Type.StructInstance) -> Base:
        """Coerce to the given struct type, coercing each member and noting extraneous keys"""
        assert desired_type.members
        if isinstance(self.type, Type.StructInstance) and self.type.type_id == desired_type.type_id:
            return self
        try:
            # Runtime typecheck for initializing StructInstance from read_json(), where the
            # Object type isn't known until runtime
            self.type.check(desired_type)
        except TypeError as exn:
            msg = "unusable runtime struct initializer"
            if exn.args:
                msg += ", " + exn.args[0]
            self._eval_error(msg)
        # coerce to desired member types
        members = {}
        extra = set()
        for k in self.value:
            if k not in desired_type.members:
                extra.add(k)
            else:
                try:
                    members[k] = self.value[k].coerce(desired_type.members[k])
                except Error.RuntimeError as exc:
                    # some coercions that typecheck could still fail, e.g. String to Int; note the
                    # offending member, taking care not to obscure it if the struct is nested
                    msg = ""
                    if exc.args:
                        if "member of struct" in exc.args[0]:
                            raise
                        msg = ": " + exc.args[0]
                    msg = (
                        "runtime type mismatch initializing "
                        f"{desired_type.members[k]} {k} member of struct {desired_type.type_name}"
                    ) + msg
                    self._eval_error(msg)
        return Struct(desired_type, members, expr=self.expr, extra=extra)

    def _coerce_to_map(self, desired_type: Type.Map) -> Map:
        """Coerce to the given Map type, using member names as keys"""
        # runtime coercion e.g. Map[String,String] foo = read_json("foo.txt")
        assert isinstance(self.type, Type.Object)
        key_type = desired_type.item_type[0]
        if not Type.String().coerces(key_type):
            self._eval_error(f"cannot coerce struct member names to {desired_type} keys")
        value_type = desired_type.item_type[1]
        entries = []
        for k, v in self.value.items():
            # null members are dropped unless the map's value type is optional
            if not isinstance(v, Null) or value_type.optional:
                map_key = None
                map_value = None
                try:
                    map_key = String(k).coerce(key_type)
                except Error.RuntimeError:
                    self._eval_error(f"cannot coerce struct member name {k} to {desired_type} key")
                if self.type.members[k].coerces(value_type):
                    # a failed coercion leaves map_value None, reported just below
                    with suppress(Error.RuntimeError):
                        map_value = v.coerce(value_type)
                if map_value is None:
                    self._eval_error(
                        "cannot coerce struct member"
                        f" {self.type.members[k]} {k} to {value_type} map value"
                    )
                entries.append((map_key, map_value))
        # NOTE(review): unlike the other coercions here, the Map isn't given self.expr -- confirm
        # whether that is intended
        return Map(desired_type.item_type, entries)

    def _eval_error(self, msg: str) -> None:
        """Always raises: EvalError if the originating expr is known, otherwise RuntimeError"""
        raise (
            Error.EvalError(
                self.expr,
                msg,
            )
            if self.expr
            else Error.RuntimeError(msg)
        ) from None

    def __str__(self) -> str:
        return "{" + ", ".join(f"{k}: {str(v)}" for k, v in self.value.items()) + "}"

    @property
    def json(self) -> Any:
        """"""
        ans = {}
        for k, v in self.value.items():
            ans[k] = v.json
        return ans

    @property
    def children(self) -> Iterable[Base]:
        return self.value.values()
def from_json(type: Type.Base, value: Any) -> Base:
    """
    Instantiate a WDL value of the specified type from a parsed JSON value (str, int, float, list,
    dict, or null).

    If type is :class:`WDL.Type.Any()`, attempts to infer a WDL type & value from the JSON's
    intrinsic types. This isn't ideal; for example, Files can't be distinguished from Strings, and
    JSON lists and dicts with heterogeneous item types may give undefined results.

    JSON booleans are accepted only for Boolean, and JSON numbers only for Int/Float (Python's
    ``bool`` is a subclass of ``int`` and ``1 == True``, so these are told apart explicitly).

    :param type: the desired WDL type
    :param value: the parsed JSON value
    :raise WDL.Error.InputError: if the given value isn't coercible to the specified type
    """
    if isinstance(type, Type.Any):
        return _infer_from_json(value)
    is_bool = isinstance(value, bool)
    if isinstance(type, Type.Boolean) and is_bool:
        return Boolean(value)
    if isinstance(type, Type.Int) and isinstance(value, int) and not is_bool:
        return Int(value)
    if isinstance(type, Type.Float) and isinstance(value, (float, int)) and not is_bool:
        return Float(float(value))
    if isinstance(type, Type.File) and isinstance(value, str):
        return File(value)
    if isinstance(type, Type.Directory) and isinstance(value, str):
        return Directory(value)
    if isinstance(type, Type.String) and isinstance(value, str):
        return String(value)
    if isinstance(type, Type.Array) and isinstance(value, list):
        return Array(type.item_type, [from_json(type.item_type, item) for item in value])
    if isinstance(type, Type.Pair) and isinstance(value, dict) and set(value) == {"left", "right"}:
        return Pair(
            type.left_type,
            type.right_type,
            (from_json(type.left_type, value["left"]), from_json(type.right_type, value["right"])),
        )
    if (
        isinstance(type, Type.Map)
        and Type.String().coerces(type.item_type[0])
        and isinstance(value, dict)
    ):
        items = []
        for k, v in value.items():
            assert isinstance(k, str)
            items.append((String(k).coerce(type.item_type[0]), from_json(type.item_type[1], v)))
        return Map(type.item_type, items)
    if isinstance(type, Type.StructInstance) and isinstance(value, dict) and type.members:
        for k, ty in type.members.items():
            if k not in value and not ty.optional:
                raise Error.InputError(
                    f"initializer for struct {str(type)} omits required field(s)"
                )
        items = {}
        extra = set()  # keys in the JSON that aren't members of the struct type
        for k, v in value.items():
            assert isinstance(k, str)
            if k not in type.members:
                extra.add(k)
            else:
                try:
                    items[k] = from_json(type.members[k], v)
                except Error.InputError:
                    raise Error.InputError(
                        f"couldn't initialize struct {str(type)} {type.members[k]} {k} from {json.dumps(v)}"
                    ) from None
        # nb: the result is typed Object (with the struct's member types), not the StructInstance
        return Struct(Type.Object(type.members), items, extra=extra)
    if type.optional and value is None:
        return Null()
    raise Error.InputError(f"couldn't construct {str(type)} from {json.dumps(value)}")
def _infer_from_json(j: Any) -> Base:
    """
    Build a WDL value from a parsed JSON value using only the JSON's own Python types.

    :raises WDL.Error.InputError: if ``j`` is none of str, bool, int, float, None, list, dict
    """
    if j is None:
        return Null()
    # scalars; bool has to be tested ahead of int because bool is a subclass of int
    scalar_wrappers = ((str, String), (bool, Boolean), (int, Int), (float, Float))
    for python_type, wrap in scalar_wrappers:
        if isinstance(j, python_type):
            return wrap(j)
    # compound: don't yet try to infer unified types for nested values, since we expect a coercion
    # to a StructInstance type to follow in short order, providing the expected item/member types
    if isinstance(j, list):
        inferred_items = [_infer_from_json(item) for item in j]
        return Array(Type.Any(), inferred_items)
    if isinstance(j, dict):
        members = {}
        for key, item in j.items():
            assert isinstance(key, str)
            members[key] = _infer_from_json(item)
        member_types = {key: member.type for key, member in members.items()}
        return Struct(Type.Object(member_types), members)
    raise Error.InputError(f"couldn't construct value from: {json.dumps(j)}")
def rewrite_paths(v: Base, f: Callable[[Union[File, Directory]], Optional[str]]) -> Base:
    """
    Produce a copy of the given Value in which every File & Directory path (including those
    nested inside compound Values) has been rewritten by the given function. Each visited Value
    is shallow-copied before its ``value`` is replaced, so the input is left unmodified.

    The function may return None to replace the File/Directory value with None/Null.
    """

    def is_value_pair(candidate: Any) -> bool:
        # a 2-tuple whose members are both WDL values
        return len(candidate) == 2 and all(isinstance(x, Base) for x in candidate)

    def map_list_item(item: Any) -> Any:
        # list items are either (key, value) tuples or plain values
        if isinstance(item, tuple):
            assert is_value_pair(item)
            return (map_paths(item[0]), map_paths(item[1]))
        assert isinstance(item, Base)
        return map_paths(item)

    def map_paths(w: Base) -> Base:
        w = copy.copy(w)
        if isinstance(w, (File, Directory)):
            new_path = f(w)
            if new_path is None:
                return Null(expr=w.expr)
            w.value = new_path
            return w
        # recursive descent into compound Values
        raw = w.value
        if isinstance(raw, list):
            w.value = [map_list_item(item) for item in raw]
        elif isinstance(raw, tuple):
            assert is_value_pair(raw)
            w.value = (map_paths(raw[0]), map_paths(raw[1]))
        elif isinstance(raw, dict):
            rewritten = {}
            for name in raw:
                assert isinstance(name, str) and isinstance(raw[name], Base)
                rewritten[name] = map_paths(raw[name])
            w.value = rewritten
        else:
            assert raw is None or isinstance(raw, (int, float, bool, str))
        return w

    return map_paths(v)
def rewrite_env_paths(
    env: Env.Bindings[Base], f: Callable[[Union[File, Directory]], Optional[str]]
) -> Env.Bindings[Base]:
    """
    Apply ``rewrite_paths`` with the given function to the value of every binding in the Value
    Env, returning the resulting Env (same binding names).
    """

    def rewrite_binding(binding):
        rewritten_value = rewrite_paths(binding.value, f)
        return Env.Binding(binding.name, rewritten_value)

    return env.map(rewrite_binding)
def rewrite_files(v: Base, f: Callable[[str], Optional[str]]) -> Base:
    """
    Like ``rewrite_paths``, but the given function receives the path string of each File
    (including Files nested inside compound Values); Directory paths are kept unchanged.

    (deprecated: use ``rewrite_paths`` to handle Directory values as well)
    """

    def rewrite_one(fd: Union[File, Directory]) -> Optional[str]:
        if isinstance(fd, File):
            return f(fd.value)
        # Directory: keep the existing path
        return fd.value

    return rewrite_paths(v, rewrite_one)
def rewrite_env_files(
    env: Env.Bindings[Base], f: Callable[[str], Optional[str]]
) -> Env.Bindings[Base]:
    """
    Apply ``rewrite_files`` with the given function to the value of every binding in the Value
    Env, returning the resulting Env (same binding names).

    (deprecated: use ``rewrite_env_paths`` to handle Directory values as well)
    """

    def rewrite_binding(binding):
        rewritten_value = rewrite_files(binding.value, f)
        return Env.Binding(binding.name, rewritten_value)

    return env.map(rewrite_binding)
def digest_env(env: Env.Bindings[Base]) -> str:
    """
    Digest the Value Env, for use e.g. as a cache key. The digest is an opaque string: the first
    20 bytes of the SHA-256 of the env's compact, key-sorted JSON, base32-encoded in lowercase
    (32 characters).
    """
    from . import values_to_json

    canonical_json = json.dumps(
        values_to_json(env), sort_keys=True, separators=(",", ":")  # pyre-ignore
    )
    hasher = hashlib.sha256()
    hasher.update(canonical_json.encode("utf-8"))
    truncated = hasher.digest()[:20]
    encoded = base64.b32encode(truncated)
    return encoded.decode().lower()