"""
WDL values instantiated at runtime
Each value is represented by an instance of a Python class inheriting from
``WDL.Value.Base``.
.. inheritance-diagram:: WDL.Value
:top-classes: WDL.Value.Base
"""
import json
import copy
import base64
import hashlib
from abc import ABC
from typing import Any, List, Optional, Tuple, Dict, Iterable, Union, Callable, Set, TYPE_CHECKING
from contextlib import suppress
from . import Error, Type, Env
if TYPE_CHECKING:
from . import Expr
class Base(ABC):
    """
    The abstract base class for WDL values

    Each instance pairs a runtime ``type`` with the "raw" Python ``value`` and optionally remembers
    the WDL expression whose evaluation produced it.
    """

    type: Type.Base
    ":type: WDL.Type.Base"

    value: Any
    """The "raw" Python value"""

    _expr: "Optional[Expr.Base]"

    def __init__(self, type: Type.Base, value: Any, expr: "Optional[Expr.Base]" = None) -> None:
        """
        :param type: runtime type; an optional quantifier is stripped unless this is a ``Null``
        :param value: the raw Python value
        :param expr: the expression that generated this value, if any
        """
        assert isinstance(type, Type.Base)
        self.type = type
        if self.type.optional and not isinstance(self, Null):
            self.type = self.type.copy(optional=False)  # normalize runtime type
        self.value = value
        self._expr = None
        if expr:
            # assign through the property so the expression also propagates to children
            self.expr = expr

    def __eq__(self, other) -> bool:
        # nb: assumes static typechecking has ensured it's sensible to test these for equality
        assert isinstance(other, Base) and self.type.equatable(other.type), (
            f"cannot equate {self.type} {self} with {other.type} {other}"
        )
        return self.value == other.value

    def __str__(self) -> str:
        return json.dumps(self.json)

    @property
    def expr(self) -> "Optional[Expr.Base]":
        """
        Reference to the WDL expression that generated this value, if it originated
        from ``WDL.Expr.eval``
        """
        return self._expr

    @expr.setter
    def expr(self, rhs: "Expr.Base"):
        old_expr = self._expr  # possibly None
        if rhs is not old_expr:
            self._expr = rhs
            # recursively replace old_expr in children; descend only through values that carried
            # old_expr themselves
            stack = list(self.children)
            while stack:
                desc = stack.pop()
                if desc.expr is old_expr:
                    desc._expr = rhs
                    stack.extend(desc.children)

    def coerce(self, desired_type: Optional[Type.Base] = None) -> "Base":
        """
        Coerce the value to the desired type and return it. Types should be
        checked statically on ``WDL.Expr.Base`` prior to evaluation.

        :param desired_type: the target type; ``None`` returns the value unchanged
        :raises WDL.Error.InputError: if this value's type doesn't coerce to ``desired_type``
        """
        if isinstance(desired_type, Type.String):
            return String(str(self), self.expr)
        if isinstance(desired_type, Type.Array) and self.type.coerces(
            desired_type.item_type, check_quant=False
        ):
            # coercion of T to Array[T] (x to [x])
            # if self is an Array, then Array.coerce precludes this path
            # nb: Array's first constructor argument is the ITEM type, not the array type
            return Array(
                desired_type.item_type, [self.coerce(desired_type.item_type)], self.expr
            )
        if desired_type and not self.type.coerces(desired_type):
            # owing to static type-checking, this path should arise only rarely e.g. read_json()
            raise Error.InputError(f"cannot coerce {str(self.type)} to {str(desired_type)}")
        return self

    def expect(self, desired_type: Optional[Type.Base] = None) -> "Base":
        """Alias for coerce"""
        return self.coerce(desired_type)

    @property
    def json(self) -> Any:
        """
        Return a value representation which can be serialized to JSON using ``json.dumps``
        (str, int, float, list, dict, or null)
        """
        return self.value

    @property
    def children(self) -> "Iterable[Base]":
        """The values directly nested inside this one (none for non-compound values)"""
        return []
class Boolean(Base):
    """A WDL Boolean; ``value`` has Python type ``bool``"""

    def __init__(self, value: bool, expr: "Optional[Expr.Base]" = None) -> None:
        """Wrap ``value``, optionally noting the expression ``expr`` that produced it."""
        super().__init__(type=Type.Boolean(), value=value, expr=expr)
class Float(Base):
    """A WDL Float; ``value`` has Python type ``float``"""

    def __init__(self, value: float, expr: "Optional[Expr.Base]" = None) -> None:
        """Wrap ``value``, optionally noting the expression ``expr`` that produced it."""
        super().__init__(type=Type.Float(), value=value, expr=expr)

    def __str__(self) -> str:
        # fixed-point rendering with exactly six digits after the decimal point
        return f"{self.value:.6f}"
class Int(Base):
    """A WDL Int; ``value`` has Python type ``int``"""

    def __init__(self, value: int, expr: "Optional[Expr.Base]" = None) -> None:
        """Wrap ``value``, optionally noting the expression ``expr`` that produced it."""
        super().__init__(type=Type.Int(), value=value, expr=expr)

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """Promote to ``Float`` when a Float is desired; otherwise defer to the base coercion."""
        if not isinstance(desired_type, Type.Float):
            return super().coerce(desired_type)
        promoted = float(self.value)
        return Float(promoted, self.expr)
class String(Base):
    """A WDL String; ``value`` has Python type ``str``"""

    def __init__(
        self, value: str, expr: "Optional[Expr.Base]" = None, subtype: Optional[Type.Base] = None
    ) -> None:
        """
        :param value: the string
        :param expr: the expression that produced this value, if any
        :param subtype: runtime type to record instead of the default ``Type.String()``
        """
        runtime_type = subtype if subtype else Type.String()
        super().__init__(runtime_type, value, expr)

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """
        Coerce to String, File, Directory, Int or Float as requested; anything else is delegated
        to the base coercion. A string that doesn't parse as the requested number raises
        ``Error.EvalError`` (when the originating expression is known) or ``Error.RuntimeError``.
        """
        if isinstance(desired_type, Type.String):
            return String(self.value, self.expr)

        # String -> File/Directory, unless this value already is one
        for path_type, path_class in ((Type.File, File), (Type.Directory, Directory)):
            if isinstance(desired_type, path_type) and not isinstance(self, path_class):
                return path_class(self.value, self.expr)

        # String -> Int/Float by parsing the text
        for number_type, number_class, parse in ((Type.Int, Int, int), (Type.Float, Float, float)):
            if isinstance(desired_type, number_type):
                try:
                    return number_class(parse(self.value), self.expr)
                except ValueError as exn:
                    msg = f"coercing String to {desired_type}: {exn}"
                    if self.expr:
                        raise Error.EvalError(self.expr, msg)
                    raise Error.RuntimeError(msg)

        return super().coerce(desired_type)
class File(String):
    """A WDL File; ``value`` has Python type ``str`` (the path)"""

    def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None:
        """
        :raises WDL.Error.InputError: if the path ends with a slash
        """
        super().__init__(value, expr=expr, subtype=Type.File())
        # a file path may not carry a trailing slash
        if value.endswith("/"):
            raise Error.InputError("WDL.Value.File invalid path: " + value)
class Directory(String):
    """A WDL Directory; ``value`` has Python type ``str`` (the path)"""

    def __init__(self, value: str, expr: "Optional[Expr.Base]" = None) -> None:
        """Wrap the path ``value``, recording ``Type.Directory()`` as the runtime type."""
        directory_type = Type.Directory()
        super().__init__(value, expr=expr, subtype=directory_type)
class Array(Base):
    """A WDL Array; ``value`` is a Python ``list`` of other ``WDL.Value.Base`` instances"""

    value: List[Base]
    type: Type.Array

    def __init__(
        self, item_type: Type.Base, value: List[Base], expr: "Optional[Expr.Base]" = None
    ) -> None:
        """
        :param item_type: the type of the array's ITEMS (not the array type itself)
        :param value: the items
        :param expr: the expression that produced this value, if any
        """
        has_items = len(value) > 0
        self.value = []
        # the runtime type is flagged nonempty exactly when items were supplied
        self.type = Type.Array(item_type, nonempty=has_items)
        super().__init__(self.type, value, expr)

    @property
    def json(self) -> Any:
        """A list holding the JSON representation of each item"""
        return [element.json for element in self.value]

    def __str__(self) -> str:
        # nb: this is NOT json.dumps(self.json) because it applies item __str__ overrides
        rendered = ", ".join(map(str, self.value))
        return "[" + rendered + "]"

    @property
    def children(self) -> Iterable[Base]:
        """The array items"""
        return self.value

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """
        Coerce to another Array type by coercing every item; non-Array targets are delegated to
        the base coercion. An empty array can't satisfy a nonempty (``Array+``) target.
        """
        if not isinstance(desired_type, Type.Array):
            return super().coerce(desired_type)

        if desired_type.nonempty and not self.value:
            if self.expr:
                raise Error.EmptyArray(self.expr)
            raise ValueError("Empty array for Array+ input/declaration")

        wanted_item_type = desired_type.item_type
        if wanted_item_type == self.type.item_type or isinstance(wanted_item_type, Type.Any):
            # items already have the desired type (or anything goes)
            return self

        coerced_items = [element.coerce(desired_type.item_type) for element in self.value]
        return Array(desired_type.item_type, coerced_items, self.expr)
class Map(Base):
    """A WDL Map; ``value`` is a Python ``list`` of (key, value) pairs of ``WDL.Value.Base``"""

    value: List[Tuple[Base, Base]]
    type: Type.Map

    def __init__(
        self,
        item_type: Tuple[Type.Base, Type.Base],
        value: List[Tuple[Base, Base]],
        expr: "Optional[Expr.Base]" = None,
    ) -> None:
        """
        :param item_type: the (key type, value type) pair
        :param value: the entries, as a list of (key, value) tuples
        :param expr: the expression that produced this value, if any
        """
        self.value = []
        self.type = Type.Map(item_type)
        super().__init__(self.type, value, expr)

    @property
    def json(self) -> Any:
        """
        A dict from stringified key to the JSON representation of its value. Requires the key
        type to be coercible to String; if a key repeats, its first entry is kept.
        """
        key_type = self.type.item_type[0]
        if not key_type.coerces(Type.String()):
            msg = f"cannot write {str(self.type)} to JSON"
            if self.expr:
                raise Error.EvalError(self.expr, msg)
            raise Error.RuntimeError(msg)
        result = {}
        for key, val in self.value:
            name = key.coerce(Type.String()).value
            if name in result:
                continue  # first occurrence wins
            result[name] = val.json
        return result

    def __str__(self) -> str:
        # nb: keyed by rendered key text, so a repeated key shows its last value
        rendered = {str(key): str(val) for key, val in self.value}
        body = ", ".join(f"{k}: {v}" for k, v in rendered.items())
        return "{" + body + "}"

    @property
    def children(self) -> Iterable[Base]:
        """Each entry's key followed by its value, in entry order"""
        for key, val in self.value:
            yield key
            yield val

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """
        Coerce to a different Map type (coercing each key and value) or to a struct instance
        (treating the keys as member names); other targets are delegated to the base coercion.
        """
        if isinstance(desired_type, Type.Map) and desired_type != self.type:
            want_key, want_value = desired_type.item_type[0], desired_type.item_type[1]
            converted = [(key.coerce(want_key), val.coerce(want_value)) for key, val in self.value]
            return Map(desired_type.item_type, converted, self.expr)

        if not isinstance(desired_type, Type.StructInstance):
            return super().coerce(desired_type)

        # Runtime typecheck for initializing struct from read_{object,objects,map}
        # This couldn't have been checked statically because the map keys weren't known.
        assert self.type.item_type[0].coerces(Type.String())
        try:
            Type.Map(
                self.type.item_type,
                self.type.optional,
                {entry[0].coerce(Type.String()).value for entry in self.value},
            ).check(desired_type)
        except TypeError as exn:
            msg = "unusable runtime struct initializer"
            if exn.args:
                msg += ", " + exn.args[0]
            if self.expr:
                raise Error.EvalError(self.expr, msg)
            raise Error.RuntimeError(msg)
        assert desired_type.members

        # coerce to desired member types
        members = {}
        for key, val in self.value:
            name = key.coerce(Type.String()).value
            try:
                members[name] = val.coerce(desired_type.members[name])
            except Error.RuntimeError as exc:
                # some coercions that typecheck could still fail, e.g. String to Int
                detail = (": " + exc.args[0]) if exc.args else ""
                msg = (
                    "runtime type mismatch initializing "
                    f"{desired_type.members[name]} {name} member of struct {desired_type.type_name}"
                ) + detail
                if self.expr:
                    raise Error.EvalError(self.expr, msg)
                raise Error.RuntimeError(msg)
        return Struct(desired_type, members, self.expr)
class Pair(Base):
    """A WDL Pair; ``value`` is a Python 2-``tuple`` of ``WDL.Value.Base`` instances"""

    value: Tuple[Base, Base]
    type: Type.Pair

    def __init__(
        self,
        left_type: Type.Base,
        right_type: Type.Base,
        value: Tuple[Base, Base],
        expr: "Optional[Expr.Base]" = None,
    ) -> None:
        """
        :param left_type: type of the left element
        :param right_type: type of the right element
        :param value: the (left, right) values
        :param expr: the expression that produced this value, if any
        """
        self.value = value
        self.type = Type.Pair(left_type, right_type)
        super().__init__(self.type, value, expr)

    def __str__(self) -> str:
        assert isinstance(self.value, tuple)
        left, right = self.value[0], self.value[1]
        return f"({left!s},{right!s})"

    @property
    def json(self) -> Any:
        """A dict with the JSON representations of both elements under ``left`` and ``right``"""
        left, right = self.value[0], self.value[1]
        return {"left": left.json, "right": right.json}

    @property
    def children(self) -> Iterable[Base]:
        """The left element, then the right element"""
        yield self.value[0]
        yield self.value[1]

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """
        Coerce to a different Pair type by coercing both elements; any other target (or the same
        Pair type) is delegated to the base coercion.
        """
        if not isinstance(desired_type, Type.Pair) or desired_type == self.type:
            return super().coerce(desired_type)
        new_left = self.value[0].coerce(desired_type.left_type)
        new_right = self.value[1].coerce(desired_type.right_type)
        return Pair(
            desired_type.left_type,
            desired_type.right_type,
            (new_left, new_right),
            self.expr,
        )
class Null(Base):
    """Represents the missing value which optional inputs may take.

    ``value`` is None and ``type`` is ``WDL.Type.Any(null=True)``."""

    def __init__(self, expr: "Optional[Expr.Base]" = None) -> None:
        """
        :param expr: the expression that produced this value, if any
        """
        super().__init__(Type.Any(null=True), None, expr)

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """
        Coerce the null value to the desired type. Null is returned unchanged when no type is
        given or when the type is optional or ``Any``.

        :raises FileNotFoundError: for a non-optional File or Directory type
        :raises WDL.Error.NullValue: for any other non-optional type it can't stand in for, when
                                     the originating expression is known
        :raises WDL.Error.InputError: likewise, when there's no originating expression
        """
        if desired_type and not desired_type.optional and not isinstance(desired_type, Type.Any):
            if isinstance(desired_type, (Type.File, Type.Directory)):
                # This case arises processing task outputs; we convert nonexistent paths to Null
                # before coercing to the declared output type (+ checking whether it's optional).
                raise FileNotFoundError()
            # normally the typechecker should prevent the following cases, but it might have had
            # check_quant=False
            if isinstance(desired_type, Type.String):
                return String("", self.expr)
            if isinstance(desired_type, Type.Array) and desired_type.item_type.optional:
                # None to [None]; nb: Array's first constructor argument is the ITEM type
                return Array(
                    desired_type.item_type, [self.coerce(desired_type.item_type)], self.expr
                )
            if self.expr:
                raise Error.NullValue(self.expr)
            raise Error.InputError("'None' for non-optional input/declaration")
        return self

    def __str__(self) -> str:
        return "None"

    @property
    def json(self) -> Any:
        """Always ``None`` (JSON null)"""
        return None
class Struct(Base):
    """
    A WDL struct (or transient object); ``value`` is a Python ``dict`` from member name to
    ``WDL.Value.Base`` instance
    """

    value: Dict[str, Base]

    # records the names of any extraneous keys that were present in the JSON/Map/Object from which
    # this struct was initialized
    extra: Set[str]

    def __init__(
        self,
        type: Union[Type.Object, Type.StructInstance],
        value: Dict[str, Base],
        expr: "Optional[Expr.Base]" = None,
        extra: Optional[Set[str]] = None,
    ) -> None:
        """
        :param type: the struct type, or an Object type (see below)
        :param value: member values by name; copied, so the caller's dict isn't modified
        :param expr: the expression that produced this value, if any
        :param extra: names of extraneous keys that were present in the initializer
        """
        # type may be Object for the transient evaluation of an object literal or read_json(); we
        # expect it to be coerced to a StructInstance in short order.
        value = dict(value)
        if isinstance(type, Type.StructInstance):
            # fill in null for any omitted optional members
            assert type.members
            for k in type.members:
                if k not in value:
                    assert type.members[k].optional
                    value[k] = Null()
        self.value = value
        self.extra = extra or set()
        super().__init__(type, value, expr)

    def coerce(self, desired_type: Optional[Type.Base] = None) -> Base:
        """
        Coerce to a struct instance type or a Map type. ``None``, ``Any`` and ``Object`` targets
        return this value unchanged; any other target is an error (``Error.EvalError`` when the
        originating expression is known, else ``Error.RuntimeError``).
        """
        if desired_type is None:
            # no target type requested: a no-op, as for every other value class
            return self
        if isinstance(desired_type, Type.StructInstance):
            return self._coerce_to_struct(desired_type)
        if isinstance(desired_type, Type.Map):
            return self._coerce_to_map(desired_type)
        if not isinstance(desired_type, (Type.Any, Type.Object)):
            self._eval_error(f"cannot coerce struct to {desired_type}")
        # Object coercion is a no-op because we expect a further coercion to StructInstance to
        # follow in short order, providing the expected member types.
        return self

    def _coerce_to_struct(self, desired_type: Type.StructInstance) -> Base:
        """
        Coerce each member to the type declared by ``desired_type``, recording the names of any
        members the struct type doesn't declare in the result's ``extra``.
        """
        assert desired_type.members
        if isinstance(self.type, Type.StructInstance) and self.type.type_id == desired_type.type_id:
            return self
        try:
            # Runtime typecheck for initializing StructInstance from read_json(), where the
            # Object type isn't known until runtime
            self.type.check(desired_type)
        except TypeError as exn:
            msg = "unusable runtime struct initializer"
            if exn.args:
                msg += ", " + exn.args[0]
            self._eval_error(msg)
        # coerce to desired member types
        members = {}
        extra = set()
        for k in self.value:
            if k not in desired_type.members:
                extra.add(k)
            else:
                try:
                    members[k] = self.value[k].coerce(desired_type.members[k])
                except Error.RuntimeError as exc:
                    # some coercions that typecheck could still fail, e.g. String to Int; note the
                    # offending member, taking care not to obscure it if the struct is nested
                    msg = ""
                    if exc.args:
                        if "member of struct" in exc.args[0]:
                            raise
                        msg = ": " + exc.args[0]
                    msg = (
                        "runtime type mismatch initializing "
                        f"{desired_type.members[k]} {k} member of struct {desired_type.type_name}"
                    ) + msg
                    self._eval_error(msg)
        return Struct(desired_type, members, expr=self.expr, extra=extra)

    def _coerce_to_map(self, desired_type: Type.Map) -> Map:
        """
        Build a Map whose keys are the member names and whose values are the member values, each
        coerced to the Map's key/value types. Null members are omitted unless the Map's value
        type is optional.
        """
        # runtime coercion e.g. Map[String,String] foo = read_json("foo.txt")
        assert isinstance(self.type, Type.Object)
        key_type = desired_type.item_type[0]
        if not Type.String().coerces(key_type):
            self._eval_error(f"cannot coerce struct member names to {desired_type} keys")
        value_type = desired_type.item_type[1]
        entries = []
        for k, v in self.value.items():
            if not isinstance(v, Null) or value_type.optional:
                map_key = None
                map_value = None
                try:
                    map_key = String(k).coerce(key_type)
                except Error.RuntimeError:
                    self._eval_error(f"cannot coerce struct member name {k} to {desired_type} key")
                if self.type.members[k].coerces(value_type):
                    # a failed coercion leaves map_value None, reported just below
                    with suppress(Error.RuntimeError):
                        map_value = v.coerce(value_type)
                if map_value is None:
                    self._eval_error(
                        "cannot coerce struct member"
                        f" {self.type.members[k]} {k} to {value_type} map value"
                    )
                assert map_key and map_value
                entries.append((map_key, map_value))
        return Map(desired_type.item_type, entries)

    def _eval_error(self, msg: str) -> None:
        """
        Always raises: ``Error.EvalError`` citing the originating expression if there is one,
        otherwise ``Error.RuntimeError``. Any exception being handled is dropped from the chain.
        """
        raise (
            Error.EvalError(
                self.expr,
                msg,
            )
            if self.expr
            else Error.RuntimeError(msg)
        ) from None

    def __str__(self) -> str:
        return "{" + ", ".join(f"{k}: {str(v)}" for k, v in self.value.items()) + "}"

    @property
    def json(self) -> Any:
        """A dict from member name to the member's JSON representation"""
        ans = {}
        for k, v in self.value.items():
            ans[k] = v.json
        return ans

    @property
    def children(self) -> Iterable[Base]:
        """The member values"""
        return self.value.values()
def from_json(type: Type.Base, value: Any) -> Base:
    """
    Instantiate a WDL value of the specified type from a parsed JSON value (str, int, float, list,
    dict, or null).

    If type is :class:`WDL.Type.Any()`, attempts to infer a WDL type & value from the JSON's
    intrinsic types. This isn't ideal; for example, Files can't be distinguished from Strings, and
    JSON lists and dicts with heterogeneous item types may give undefined results.

    JSON booleans are accepted only for Boolean; they are never read as Int or Float, and numbers
    are never read as Boolean.

    :raise WDL.Error.InputError: if the given value isn't coercible to the specified type
    """
    if isinstance(type, Type.Any):
        return _infer_from_json(value)
    # nb: Python's bool is a subclass of int (and True == 1), so booleans and numbers must be
    # told apart explicitly
    is_bool = isinstance(value, bool)
    if isinstance(type, Type.Boolean) and is_bool:
        return Boolean(value)
    if isinstance(type, Type.Int) and isinstance(value, int) and not is_bool:
        return Int(value)
    if isinstance(type, Type.Float) and isinstance(value, (float, int)) and not is_bool:
        return Float(float(value))
    if isinstance(type, Type.File) and isinstance(value, str):
        return File(value)
    if isinstance(type, Type.Directory) and isinstance(value, str):
        return Directory(value)
    if isinstance(type, Type.String) and isinstance(value, str):
        return String(value)
    if isinstance(type, Type.Array) and isinstance(value, list):
        return Array(type.item_type, [from_json(type.item_type, item) for item in value])
    if isinstance(type, Type.Pair) and isinstance(value, dict) and set(value) == {"left", "right"}:
        return Pair(
            type.left_type,
            type.right_type,
            (from_json(type.left_type, value["left"]), from_json(type.right_type, value["right"])),
        )
    if (
        isinstance(type, Type.Map)
        and Type.String().coerces(type.item_type[0])
        and isinstance(value, dict)
    ):
        items = []
        for k, v in value.items():
            assert isinstance(k, str)
            items.append((String(k).coerce(type.item_type[0]), from_json(type.item_type[1], v)))
        return Map(type.item_type, items)
    if isinstance(type, Type.StructInstance) and isinstance(value, dict) and type.members:
        for k, ty in type.members.items():
            if k not in value and not ty.optional:
                raise Error.InputError(
                    f"initializer for struct {str(type)} omits required field(s)"
                )
        members = {}
        extra = set()  # keys that aren't members of the struct type
        for k, v in value.items():
            assert isinstance(k, str)
            if k not in type.members:
                extra.add(k)
            else:
                try:
                    members[k] = from_json(type.members[k], v)
                except Error.InputError:
                    # replace the nested error with one naming the offending member
                    raise Error.InputError(
                        f"couldn't initialize struct {str(type)} {type.members[k]} {k} from {json.dumps(v)}"
                    ) from None
        # Struct.__init__ will populate null for any omitted optional members
        return Struct(type, members, extra=extra)
    if type.optional and value is None:
        return Null()
    raise Error.InputError(f"couldn't construct {str(type)} from {json.dumps(value)}")
def _infer_from_json(j: Any) -> Base:
    """
    Build a WDL value from a parsed JSON value, inferring the type from the JSON's own type:
    scalars map to String/Boolean/Int/Float, null to Null, lists to ``Array[Any]`` and dicts to an
    Object-typed Struct.

    :raise WDL.Error.InputError: if ``j`` is none of those
    """
    if j is None:
        return Null()

    # nb: bool must be tested before int, since Python's bool is a subclass of int
    for python_type, wdl_class in ((str, String), (bool, Boolean), (int, Int), (float, Float)):
        if isinstance(j, python_type):
            return wdl_class(j)

    # compound: don't yet try to infer unified types for nested values, since we expect a coercion
    # to a StructInstance type to follow in short order, providing the expected item/member types
    if isinstance(j, list):
        return Array(Type.Any(), [_infer_from_json(element) for element in j])
    if isinstance(j, dict):
        members = {}
        for name in j:
            assert isinstance(name, str)
            members[name] = _infer_from_json(j[name])
        member_types = {name: member.type for name, member in members.items()}
        return Struct(Type.Object(member_types), members)

    raise Error.InputError(f"couldn't construct value from: {json.dumps(j)}")
def rewrite_paths(v: Base, f: Callable[[Union[File, Directory]], Optional[str]]) -> Base:
    """
    Produce a deep copy of the given Value with all File & Directory paths (including those nested
    inside compound Values) rewritten by the given function. The function may return None to
    replace the File/Directory value with None/Null.
    """

    def rewritten(node: Base) -> Base:
        clone = copy.copy(node)

        if isinstance(clone, (File, Directory)):
            new_path = f(clone)
            if new_path is None:
                return Null(expr=clone.expr)
            clone.value = new_path
            return clone

        # recursive descent into compound Values
        raw = clone.value
        if isinstance(raw, list):
            # list of values, or of (key, value) tuples
            new_items: List[Any] = []
            for entry in raw:
                if isinstance(entry, tuple):
                    assert len(entry) == 2 and all(isinstance(x, Base) for x in entry)
                    new_items.append((rewritten(entry[0]), rewritten(entry[1])))
                else:
                    assert isinstance(entry, Base)
                    new_items.append(rewritten(entry))
            clone.value = new_items
        elif isinstance(raw, tuple):
            assert len(raw) == 2 and all(isinstance(x, Base) for x in raw)
            clone.value = (rewritten(raw[0]), rewritten(raw[1]))
        elif isinstance(raw, dict):
            new_members = {}
            for name, member in raw.items():
                assert isinstance(name, str) and isinstance(member, Base)
                new_members[name] = rewritten(member)
            clone.value = new_members
        else:
            # nothing to descend into
            assert raw is None or isinstance(raw, (int, float, bool, str))
        return clone

    return rewritten(v)
def rewrite_env_paths(
    env: Env.Bindings[Base], f: Callable[[Union[File, Directory]], Optional[str]]
) -> Env.Bindings[Base]:
    """
    Produce a copy of the given Value Env in which each binding's value has had all of its
    File & Directory paths rewritten by the given function (see ``rewrite_paths``).
    """

    def rebind(binding):
        # same name, value with rewritten paths
        return Env.Binding(binding.name, rewrite_paths(binding.value, f))

    return env.map(rebind)
def rewrite_files(v: Base, f: Callable[[str], Optional[str]]) -> Base:
    """
    Produce a deep copy of the given Value with all File names rewritten by the given function
    (including Files nested inside compound Values). Directory paths are kept as they are.

    (deprecated: use ``rewrite_paths`` to handle Directory values as well)
    """

    def files_only(fd):
        # apply f to File paths; pass Directory paths through untouched
        if isinstance(fd, File):
            return f(fd.value)
        return fd.value

    return rewrite_paths(v, files_only)
def rewrite_env_files(
    env: Env.Bindings[Base], f: Callable[[str], Optional[str]]
) -> Env.Bindings[Base]:
    """
    Produce a copy of the given Value Env in which each binding's value has had all of its File
    names rewritten by the given function (see ``rewrite_files``).

    (deprecated: use ``rewrite_env_paths`` to handle Directory values as well)
    """

    def rebind(binding):
        # same name, value with rewritten File names
        return Env.Binding(binding.name, rewrite_files(binding.value, f))

    return env.map(rebind)
def digest_env(env: Env.Bindings[Base]) -> str:
    """
    Digest the Value Env, for use e.g. as a cache key. The digest is an opaque string of 32
    lowercase alphanumeric characters: the base32 encoding of the first 20 bytes of the SHA-256
    of the env's compact, key-sorted JSON serialization.
    """
    from . import values_to_json

    canonical_json = json.dumps(values_to_json(env), separators=(",", ":"), sort_keys=True)
    hasher = hashlib.sha256()
    hasher.update(canonical_json.encode("utf-8"))
    truncated = hasher.digest()[:20]
    return base64.b32encode(truncated).decode().lower()