Source code for WDL.Zip

"""
Routines for packaging a WDL source file, with all imported source files, into a ZIP file.

*New in v1.5.0*
"""
import io
import os
import json
import pathlib
import shutil
import logging
import tarfile
import tempfile
import contextlib
import zipfile
from typing import List, Dict, Optional, Any, Iterator, NamedTuple, Tuple

from . import Tree, Error
from ._util import path_really_within


def build(
    top_doc: Tree.Document,
    archive: str,
    logger: logging.Logger,
    inputs: Optional[Dict[str, Any]] = None,
    meta: Optional[Dict[str, Any]] = None,
    archive_format: str = "zip",
    additional_files: Optional[List[str]] = None,
):
    """
    Generate zip archive of the WDL document, all its imports, optional default inputs, and a
    generated manifest JSON. If imports are drawn from outside the main WDL's directory (or by
    URI), they'll be stored in a special subdirectory and import statements will be rewritten to
    match.
    """
    with contextlib.ExitStack() as cleanup:
        # write WDL source code to temp directory
        dir_to_zip = build_source_dir(cleanup, top_doc, logger)

        # add MANIFEST.json; schema roughly following Amazon Genomics CLI's:
        # https://aws.github.io/amazon-genomics-cli/docs/concepts/workflows/#multi-file-workflows
        manifest = {"mainWorkflowURL": os.path.basename(top_doc.pos.abspath)}
        if meta:
            manifest["meta"] = meta
        if inputs:
            manifest["inputFileURLs"] = ["default_input.json"]
            with open(os.path.join(dir_to_zip, "default_input.json"), "w") as inputs_file:
                json.dump(inputs, inputs_file, indent=2)
        with open(os.path.join(dir_to_zip, "MANIFEST.json"), "w") as manifest_file:
            json.dump(manifest, manifest_file, indent=2)
        logger.debug("manifest = " + json.dumps(manifest))

        if additional_files:
            logger.debug(f"Additional files: {additional_files}")
            for file in additional_files:
                dest_path = os.path.join(dir_to_zip, os.path.basename(file))
                if os.path.exists(dest_path):
                    raise FileExistsError(f"Additional file overwrites existing path: {dest_path}")
                shutil.copy(file, dest_path)

        # zip the temp directory (into another temp directory)
        spool_dir = cleanup.enter_context(tempfile.TemporaryDirectory(prefix="miniwdl_zip_"))
        spool_zip = os.path.join(spool_dir, os.path.basename(archive))
        logger.info(f"Prepare archive {spool_zip} from directory {dir_to_zip}")
        create_reproducible_archive(dir_to_zip, spool_zip, archive_format)

        # move into final location (hopefully atomic)
        dirname = os.path.dirname(archive)
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        logger.info(f"Move archive to destination {archive}")
        shutil.move(spool_zip, archive)
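
As a sketch of what `build` produces, assuming a hypothetical `hello.wdl` that imports `lib/util.wdl`: the main WDL lands at the archive's top level next to the generated `MANIFEST.json`, and `default_input.json` appears when `inputs` is given::

    import json
    import logging
    import zipfile

    import WDL
    import WDL.Zip

    logger = logging.getLogger("zip_example")
    doc = WDL.load("hello.wdl")
    WDL.Zip.build(
        doc,
        "hello.zip",
        logger,
        inputs={"hello.who": "world"},  # hypothetical default workflow input
        meta={"author": "example"},
    )
    with zipfile.ZipFile("hello.zip") as z:
        # e.g. ['MANIFEST.json', 'default_input.json', 'hello.wdl', 'lib/util.wdl']
        print(z.namelist())
        manifest = json.loads(z.read("MANIFEST.json"))
        assert manifest["mainWorkflowURL"] == "hello.wdl"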
def build_source_dir(
    cleanup: contextlib.ExitStack, top_doc: Tree.Document, logger: logging.Logger
) -> str:
    # directory of main WDL file (possibly URI)
    main_dir = os.path.dirname(top_doc.pos.abspath).rstrip("/") + "/"

    # collect all WDL docs keyed by abspath
    wdls = {}
    queue = [top_doc]
    while queue:
        a_doc = queue.pop()
        for imported_doc in a_doc.imports:
            queue.append(imported_doc.doc)
        wdls[a_doc.pos.abspath] = a_doc

    # derive archive paths
    zip_paths = build_zip_paths(main_dir, wdls, logger)
    assert sorted(zip_paths.keys()) == sorted(wdls.keys())
    assert zip_paths[top_doc.pos.abspath] == os.path.basename(top_doc.pos.abspath)

    # write source files into temp directory (rewriting imports as needed)
    zip_dir = cleanup.enter_context(tempfile.TemporaryDirectory(prefix="miniwdl_zip_"))
    for abspath, a_doc in wdls.items():
        source_lines = rewrite_imports(a_doc, zip_paths, logger)
        fn = os.path.join(zip_dir, zip_paths[abspath])
        os.makedirs(os.path.dirname(fn), exist_ok=True)
        with open(fn, "w") as outfile:
            for line in source_lines:
                print(line, file=outfile)

    return zip_dir


def build_zip_paths(
    main_dir: str, wdls: Dict[str, Tree.Document], logger: logging.Logger
) -> Dict[str, str]:
    # compute the path inside the archive at which to store each document
    ans = {}
    outside_warn = False
    for abspath in wdls.keys():
        if abspath.startswith(main_dir):
            ans[abspath] = os.path.relpath(abspath, main_dir)
        else:
            # place outside import under __outside_wdl/, vaguely reproducing directory structure
            abspath2 = abspath.replace("://", "_")
            prefix = os.path.commonprefix([abspath2, main_dir.replace("://", "_")])
            if prefix and not prefix.endswith("/"):
                prefix = os.path.dirname(prefix) + "/"
            ans[abspath] = "__outside_wdl/" + abspath2[len(prefix):]
            outside_warn = True
        logger.info(f"{ans[abspath]} <= {abspath}")
    if outside_warn:
        logger.warning(
            "One or more source files are imported from outside the top-level WDL's directory."
            " The source archive will store them under __outside_wdl/"
            " and WDL import statements will be rewritten to match."
        )
    return ans


def rewrite_imports(
    doc: Tree.Document, zip_paths: Dict[str, str], logger: logging.Logger
) -> List[str]:
    # rewrite doc source lines, changing import statements to refer to the relative path in zip
    source_lines = doc.source_lines.copy()

    for imp in doc.imports:
        lo = imp.pos.line - 1
        hi = imp.pos.end_line
        found = False
        for lineno in range(lo, hi):
            line = source_lines[lineno]
            old_uri = imp.uri
            new_uri = os.path.relpath(
                zip_paths[imp.doc.pos.abspath], os.path.dirname(zip_paths[doc.pos.abspath])
            )
            for quot in ('"', "'"):
                old_uri_pattern = f"{quot}{old_uri}{quot}"
                if old_uri_pattern in line:
                    assert quot not in new_uri
                    found = True
                    line2 = line.replace(old_uri_pattern, f"{quot}{new_uri}{quot}")
                    if line != line2:
                        logger.debug(doc.pos.abspath)
                        logger.debug("  " + line)
                        logger.debug("  => " + line2)
                        source_lines[lineno] = line2
        assert found

    return source_lines


def create_reproducible_archive(zip_dir: str, output_path: str, format: str) -> str:
    # write zip/tar archive with internal filenames lexicographically ordered and all timestamps
    # set to an arbitrary constant
    src_dest_list = [
        (path, path.relative_to(zip_dir))
        for path in pathlib.Path(zip_dir).glob("**/*")  # find all files recursively
        if path.is_file() or path.is_symlink()  # symlinks are stored in the archive as normal files
    ]
    # sort entries by destination path
    src_dest_list.sort(key=lambda x: x[1])
    if format == "zip":
        _write_no_mtime_zip(output_path, src_dest_list)
    elif format == "tar":
        _write_no_mtime_tar(output_path, src_dest_list)
    else:
        raise ValueError(f"Unknown format: {format}")
    return output_path


def _write_no_mtime_zip(zip_archive: str, src_dest_list: List[Tuple[pathlib.Path, pathlib.Path]]):
    with zipfile.ZipFile(zip_archive, "w") as archive:
        for src, dest in src_dest_list:
            # ZipInfo defaults the mod time to the constant 1980-01-01
            dest_info = zipfile.ZipInfo(str(dest))
            with archive.open(dest_info, "w") as archive_file:
                with open(src, "rb") as in_file:
                    while True:
                        block = in_file.read(io.DEFAULT_BUFFER_SIZE)
                        if not block:
                            break
                        archive_file.write(block)


def _write_no_mtime_tar(tar_archive: str, src_dest_list: List[Tuple[pathlib.Path, pathlib.Path]]):
    with tarfile.TarFile(tar_archive, "w") as archive:
        for src, dest in src_dest_list:
            dest_info = tarfile.TarInfo(str(dest))  # mtime defaults to the constant 0
            dest_info.size = os.stat(src).st_size
            with open(src, "rb") as in_file:
                archive.addfile(dest_info, in_file)


UnpackedZip = NamedTuple(
    "UnpackedZip", [("dir", str), ("main_wdl", str), ("input_file", Optional[str])]
)
"""
Contextual value of `WDL.Zip.unpack()`: absolute paths of the source directory, main WDL, and
default input JSON file (if any). The source directory prefixes the latter paths.
"""
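
Because `create_reproducible_archive` sorts internal filenames and pins all timestamps, two archives staged from identical content should be byte-identical. A quick check using the (internal) helper directly, with a hypothetical staged directory `staged_dir/`::

    import hashlib

    from WDL.Zip import create_reproducible_archive

    def digest(path: str) -> str:
        with open(path, "rb") as f:
            return hashlib.sha256(f.read()).hexdigest()

    create_reproducible_archive("staged_dir", "a.zip", "zip")
    create_reproducible_archive("staged_dir", "b.zip", "zip")
    assert digest("a.zip") == digest("b.zip")  # same bytes regardless of build time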
@contextlib.contextmanager
def unpack(archive_fn: str) -> Iterator[UnpackedZip]:
    """
    Open a context with the WDL source archive unpacked into a temp directory, yielding
    `UnpackedZip`. The temp directory will be deleted on context exit.

    A path to the MANIFEST.json of an already-unpacked source archive may also be used, or a
    directory containing one. In this case, it is NOT deleted on context exit.

    ::

        with WDL.Zip.unpack("/path/to/source.zip") as unpacked:
            doc = WDL.load(unpacked.main_wdl)
            ...
    """
    with contextlib.ExitStack() as cleanup:
        # extract zip if needed (also allowing use of an already-extracted manifest/directory)
        if os.path.isdir(archive_fn):
            archive_fn = os.path.join(archive_fn, "MANIFEST.json")
        if os.path.basename(archive_fn) == "MANIFEST.json":
            manifest_fn = archive_fn
        else:
            dn = cleanup.enter_context(tempfile.TemporaryDirectory(prefix="miniwdl_run_zip_"))
            try:
                shutil.unpack_archive(archive_fn, dn)
            except Exception:
                raise Error.InputError("Unreadable source archive " + archive_fn)
            manifest_fn = os.path.join(dn, "MANIFEST.json")

        try:
            with open(manifest_fn) as infile:
                manifest = json.load(infile)
            assert isinstance(manifest, dict) and isinstance(
                manifest.get("mainWorkflowURL", None), str
            )
        except Exception:
            raise Error.InputError("Missing or invalid MANIFEST.json in " + archive_fn)

        dn = os.path.abspath(os.path.dirname(manifest_fn))
        main_wdl = manifest["mainWorkflowURL"]
        input_file = None
        if (
            isinstance(manifest.get("inputFileURLs", None), list)
            and manifest["inputFileURLs"]
            and isinstance(manifest["inputFileURLs"][0], str)
        ):
            input_file = manifest["inputFileURLs"][0]

        # sanity check: manifest paths must exist and stay within the unpacked directory
        main_wdl_abs = os.path.join(dn, main_wdl)
        input_file_abs = os.path.join(dn, input_file) if input_file else None
        if not (os.path.isfile(main_wdl_abs) and path_really_within(main_wdl_abs, dn)) or (
            input_file_abs
            and not (os.path.isfile(input_file_abs) and path_really_within(input_file_abs, dn))
        ):
            raise Error.InputError(
                "MANIFEST.json refers to missing or invalid files in " + archive_fn
            )

        yield UnpackedZip(dn, main_wdl_abs, input_file_abs)
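
Building on the docstring's example, a consumer can also pick up the archived default inputs, if any (archive name hypothetical)::

    import json

    import WDL
    import WDL.Zip

    with WDL.Zip.unpack("hello.zip") as unpacked:
        doc = WDL.load(unpacked.main_wdl)
        inputs = {}
        if unpacked.input_file:  # set when MANIFEST.json lists inputFileURLs
            with open(unpacked.input_file) as f:
                inputs = json.load(f)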