Created
July 23, 2017 12:28
-
-
Save tetron/accd49db26ba8e93dafd36ec631ae4e8 to your computer and use it in GitHub Desktop.
Revisions
-
tetron created this gist
Jul 23, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,811 @@ # Implement support for Common Workflow Language (CWL) for Toil. # # Copyright (C) 2015 Curoverse, Inc # Copyright (C) 2016 UCSC Computational Genomics Lab # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from toil.job import Job from toil.common import Toil from toil.version import baseVersion from toil.lib.bioio import setLoggingFromOptions from argparse import ArgumentParser import cwltool.errors import cwltool.load_tool import cwltool.main import cwltool.workflow import cwltool.expression import cwltool.builder import cwltool.resolver import cwltool.stdfsaccess from cwltool.pathmapper import adjustFiles from cwltool.process import shortname, adjustFilesWithSecondary, fillInDefaults, compute_checksums from cwltool.utils import aslist import schema_salad.validate as validate import schema_salad.ref_resolver import os import tempfile import json import sys import logging import copy import functools # Python 3 compatibility imports from six.moves import xrange from six import iteritems, string_types import six.moves.urllib.parse as urlparse cwllogger = logging.getLogger("cwltool") # The job object passed into CWLJob and CWLWorkflow # is a dict mapping to tuple of (key, dict) # the final dict is derived by evaluating each # tuple looking up the key in the supplied dict. # # This is necessary because Toil jobs return a single value (a dict) # but CWL permits steps to have multiple output parameters that may # feed into multiple other steps. This transformation maps the key in the # output object to the correct key of the input object. class IndirectDict(dict): pass class MergeInputs(object): def __init__(self, sources): self.sources = sources def resolve(self): raise NotImplementedError() class MergeInputsNested(MergeInputs): def resolve(self): return [v[1][v[0]] for v in self.sources] class MergeInputsFlattened(MergeInputs): def resolve(self): r = [] for v in self.sources: v = v[1][v[0]] if isinstance(v, list): r.extend(v) else: r.append(v) return r class StepValueFrom(object): def __init__(self, expr, inner, req): self.expr = expr self.inner = inner self.req = req def do_eval(self, inputs, ctx): return cwltool.expression.do_eval(self.expr, inputs, self.req, None, None, {}, context=ctx) def resolve_indirect_inner(d): if isinstance(d, IndirectDict): r = {} for k, v in d.items(): if isinstance(v, MergeInputs): r[k] = v.resolve() else: r[k] = v[1][v[0]] return r else: return d def resolve_indirect(d): inner = IndirectDict() if isinstance(d, IndirectDict) else {} needEval = False for k, v in iteritems(d): if isinstance(v, StepValueFrom): inner[k] = v.inner needEval = True else: inner[k] = v res = resolve_indirect_inner(inner) if needEval: ev = {} for k, v in iteritems(d): if isinstance(v, StepValueFrom): ev[k] = v.do_eval(res, res[k]) else: ev[k] = res[k] return ev else: return res def getFile(fileStore, dir, fileTuple, index=None, export=False, primary=None, rename_collision=False, existing={}): """Extract input file from Toil jobstore. Uses standard filestore to retrieve file, then provides a symlink to it for running. If export is True (for final outputs), it gets copied to the final location. Keeps track of files being used locally with 'existing' """ # File literal outputs with no path, from writeFile if fileTuple is None: raise cwltool.process.UnsupportedRequirement("CWL expression file inputs not yet supported in Toil") fileStoreID, fileName = fileTuple if rename_collision is False: if primary: dir = os.path.dirname(primary) else: dir = tempfile.mkdtemp(dir=dir) dstPath = os.path.join(dir, fileName) if rename_collision: n = 1 while os.path.exists(dstPath): n += 1 stem, ext = os.path.splitext(dstPath) stem = "%s_%s" % (stem, n) dstPath = stem + ext if export: fileStore.exportFile(fileStoreID, "file://" + dstPath) else: srcPath = fileStore.readGlobalFile(fileStoreID) if srcPath != dstPath: if os.path.exists(dstPath): if index.get(dstPath, None) != fileStoreID: raise Exception("Conflicting filesStoreID %s and %s both trying to link to %s" % (index.get(dstPath, None), fileStoreID, dstPath)) else: os.symlink(srcPath, dstPath) existing[srcPath] = dstPath index[dstPath] = fileStoreID return dstPath def writeFile(writeFunc, index, existing, x): """Write output files back into Toil jobstore. 'existing' is a set of files retrieved as inputs from getFile. This ensures they are mapped back as the same name if passed through. """ # Toil fileStore references are tuples of pickle and internal file if isinstance(x, tuple): return x # File literal outputs with no path, we don't write these and will fail # with unsupportedRequirement when retrieving later with getFile elif x.startswith("_:"): return None else: if x not in index: x = existing.get(x, x) if not urlparse.urlparse(x).scheme: rp = os.path.realpath(x) else: rp = x try: index[x] = (writeFunc(rp), os.path.basename(x)) except Exception as e: cwllogger.error("Got exception '%s' while copying '%s'", e, x) raise return index[x] def computeFileChecksums(fs_access, f): # File literal inputs with no path, no checksum if isinstance(f, dict) and f.get("location", "").startswith("_:"): return f else: return compute_checksums(fs_access, f) def addFilePartRefs(p): """Provides new v1.0 functionality for referencing file parts. """ if p.get("class") == "File" and p.get("path"): dirname, basename = os.path.split(p["path"]) nameroot, nameext = os.path.splitext(basename) for k, v in [("dirname", dirname,), ("basename", basename), ("nameroot", nameroot), ("nameext", nameext)]: p[k] = v return p def locToPath(p): """Back compatibility -- handle converting locations into paths. """ if "path" not in p and "location" in p: p["path"] = p["location"].replace("file:", "") return p def pathToLoc(p): """Associate path with location. v1.0 should be specifying location but older YAML uses path -- this provides back compatibility. """ if "path" in p: p["location"] = p["path"] return p class ResolveIndirect(Job): def __init__(self, cwljob): super(ResolveIndirect, self).__init__() self.cwljob = cwljob def run(self, fileStore): return resolve_indirect(self.cwljob) class CWLJobWrapper(Job): def __init__(self, tool, cwljob, **kwargs): super(CWLJob, self).__init__(cores=1, memory=1, disk=1, unitName=unitName) self.tool = tool self.cwljob = cwljob self.kwargs = kwargs def run(self): if 'builder' in self.kwargs: builder = self.kwargs["builder"] else: builder = cwltool.builder.Builder() builder.job = {} builder.requirements = [] builder.outdir = None builder.tmpdir = None builder.timeout = 0 builder.resources = {} req = tool.evalResources(builder, {}) realjob = CWLJob(cores=req["cores"], memory=(req["ram"]*1024*1024), disk=((req["tmpdirSize"]*1024*1024) + (req["outdirSize"]*1024*1024)), unitName=unitName) self.addChild(realjob) return realjob.rv() class CWLJob(Job): """Execute a CWL tool wrapper.""" def __init__(self, tool, cwljob, **kwargs): if 'builder' in kwargs: builder = kwargs["builder"] else: builder = cwltool.builder.Builder() builder.job = {} builder.requirements = [] builder.outdir = None builder.tmpdir = None builder.timeout = 0 builder.resources = {} req = tool.evalResources(builder, {}) self.cwltool = remove_pickle_problems(tool) # pass the default of None if basecommand is empty unitName = self.cwltool.tool.get("baseCommand", None) if isinstance(unitName, (list, tuple)): unitName = ' '.join(unitName) super(CWLJob, self).__init__(cores=req["cores"], memory=(req["ram"]*1024*1024), disk=((req["tmpdirSize"]*1024*1024) + (req["outdirSize"]*1024*1024)), unitName=unitName) #super(CWLJob, self).__init__() self.cwljob = cwljob try: self.jobName = str(self.cwltool.tool['id']) except KeyError: # fall back to the Toil defined class name if the tool doesn't have an identifier pass self.executor_options = kwargs def run(self, fileStore): cwljob = resolve_indirect(self.cwljob) fillInDefaults(self.cwltool.tool["inputs"], cwljob) inpdir = os.path.join(fileStore.getLocalTempDir(), "inp") outdir = os.path.join(fileStore.getLocalTempDir(), "out") tmpdir = os.path.join(fileStore.getLocalTempDir(), "tmp") os.mkdir(inpdir) os.mkdir(outdir) os.mkdir(tmpdir) # Copy input files out of the global file store, ensure path/location synchronized index = {} existing = {} adjustFilesWithSecondary(cwljob, functools.partial(getFile, fileStore, inpdir, index=index, existing=existing)) cwltool.pathmapper.adjustFileObjs(cwljob, pathToLoc) cwltool.pathmapper.adjustFileObjs(cwljob, addFilePartRefs) # Run the tool opts = copy.deepcopy(self.executor_options) # Exports temporary directory for batch systems that reset TMPDIR os.environ["TMPDIR"] = os.path.realpath(opts.pop("tmpdir", None) or tmpdir) (output, status) = cwltool.main.single_job_executor(self.cwltool, cwljob, basedir=os.getcwd(), outdir=outdir, tmpdir=tmpdir, tmpdir_prefix="tmp", make_fs_access=cwltool.stdfsaccess.StdFsAccess, **opts) if status != "success": raise cwltool.errors.WorkflowException(status) cwltool.pathmapper.adjustDirObjs(output, locToPath) cwltool.pathmapper.adjustFileObjs(output, locToPath) cwltool.pathmapper.adjustFileObjs(output, functools.partial(computeFileChecksums, cwltool.stdfsaccess.StdFsAccess(outdir))) # Copy output files into the global file store. adjustFiles(output, functools.partial(writeFile, fileStore.writeGlobalFile, {}, existing)) return output def makeJob(tool, jobobj, **kwargs): if tool.tool["class"] == "Workflow": wfjob = CWLWorkflow(tool, jobobj, **kwargs) followOn = ResolveIndirect(wfjob.rv()) wfjob.addFollowOn(followOn) return (wfjob, followOn) else: # check if tool has ResourceRequirement and if the resource requirement # involves an expression. if there_is_an_expression: job = CWLJobWrapper(tool, jobobj, **kwargs) else: job = CWLJob(tool, jobobj, **kwargs) return (job, job) class CWLScatter(Job): def __init__(self, step, cwljob, **kwargs): super(CWLScatter, self).__init__() self.step = step self.cwljob = cwljob self.executor_options = kwargs def flat_crossproduct_scatter(self, joborder, scatter_keys, outputs, postScatterEval): scatter_key = shortname(scatter_keys[0]) l = len(joborder[scatter_key]) for n in xrange(0, l): jo = copy.copy(joborder) jo[scatter_key] = joborder[scatter_key][n] if len(scatter_keys) == 1: jo = postScatterEval(jo) (subjob, followOn) = makeJob(self.step.embedded_tool, jo, **self.executor_options) self.addChild(subjob) outputs.append(followOn.rv()) else: self.flat_crossproduct_scatter(jo, scatter_keys[1:], outputs, postScatterEval) def nested_crossproduct_scatter(self, joborder, scatter_keys, postScatterEval): scatter_key = shortname(scatter_keys[0]) l = len(joborder[scatter_key]) outputs = [] for n in xrange(0, l): jo = copy.copy(joborder) jo[scatter_key] = joborder[scatter_key][n] if len(scatter_keys) == 1: jo = postScatterEval(jo) (subjob, followOn) = makeJob(self.step.embedded_tool, jo, **self.executor_options) self.addChild(subjob) outputs.append(followOn.rv()) else: outputs.append(self.nested_crossproduct_scatter(jo, scatter_keys[1:], postScatterEval)) return outputs def run(self, fileStore): cwljob = resolve_indirect(self.cwljob) if isinstance(self.step.tool["scatter"], string_types): scatter = [self.step.tool["scatter"]] else: scatter = self.step.tool["scatter"] scatterMethod = self.step.tool.get("scatterMethod", None) if len(scatter) == 1: scatterMethod = "dotproduct" outputs = [] valueFrom = {shortname(i["id"]): i["valueFrom"] for i in self.step.tool["inputs"] if "valueFrom" in i} def postScatterEval(io): shortio = {shortname(k): v for k, v in iteritems(io)} def valueFromFunc(k, v): if k in valueFrom: return cwltool.expression.do_eval( valueFrom[k], shortio, self.step.requirements, None, None, {}, context=v) else: return v return {k: valueFromFunc(k, v) for k,v in io.items()} if scatterMethod == "dotproduct": for i in xrange(0, len(cwljob[shortname(scatter[0])])): copyjob = copy.copy(cwljob) for sc in [shortname(x) for x in scatter]: copyjob[sc] = cwljob[sc][i] copyjob = postScatterEval(copyjob) (subjob, followOn) = makeJob(self.step.embedded_tool, copyjob, **self.executor_options) self.addChild(subjob) outputs.append(followOn.rv()) elif scatterMethod == "nested_crossproduct": outputs = self.nested_crossproduct_scatter(cwljob, scatter, postScatterEval) elif scatterMethod == "flat_crossproduct": self.flat_crossproduct_scatter(cwljob, scatter, outputs, postScatterEval) else: if scatterMethod: raise validate.ValidationException( "Unsupported complex scatter type '%s'" % scatterMethod) else: raise validate.ValidationException( "Must provide scatterMethod to scatter over multiple inputs") return outputs class CWLGather(Job): def __init__(self, step, outputs): super(CWLGather, self).__init__() self.step = step self.outputs = outputs def allkeys(self, obj, keys): if isinstance(obj, dict): for k in obj.keys(): keys.add(k) elif isinstance(obj, list): for l in obj: self.allkeys(l, keys) def extract(self, obj, k): if isinstance(obj, dict): return obj.get(k) elif isinstance(obj, list): cp = [] for l in obj: cp.append(self.extract(l, k)) return cp def run(self, fileStore): outobj = {} keys = set() self.allkeys(self.outputs, keys) for k in keys: outobj[k] = self.extract(self.outputs, k) return outobj class SelfJob(object): """Fake job object to facilitate implementation of CWLWorkflow.run()""" def __init__(self, j, v): self.j = j self.v = v def rv(self): return self.v def addChild(self, c): return self.j.addChild(c) def hasChild(self, c): return self.j.hasChild(c) def remove_pickle_problems(obj): """doc_loader does not pickle correctly, causing Toil errors, remove from objects. """ if hasattr(obj, "doc_loader"): obj.doc_loader = None if hasattr(obj, "embedded_tool"): obj.embedded_tool = remove_pickle_problems(obj.embedded_tool) if hasattr(obj, "steps"): obj.steps = [remove_pickle_problems(s) for s in obj.steps] return obj class CWLWorkflow(Job): """Traverse a CWL workflow graph and schedule a Toil job graph.""" def __init__(self, cwlwf, cwljob, **kwargs): super(CWLWorkflow, self).__init__() self.cwlwf = cwlwf self.cwljob = cwljob self.executor_options = kwargs self.cwlwf = remove_pickle_problems(self.cwlwf) def run(self, fileStore): cwljob = resolve_indirect(self.cwljob) # `promises` dict # from: each parameter (workflow input or step output) # that may be used as a "source" for a step input workflow output # parameter # to: the job that will produce that value. promises = {} # `jobs` dict from step id to job that implements that step. jobs = {} for inp in self.cwlwf.tool["inputs"]: promises[inp["id"]] = SelfJob(self, cwljob) alloutputs_fufilled = False while not alloutputs_fufilled: # Iteratively go over the workflow steps, scheduling jobs as their # dependencies can be fufilled by upstream workflow inputs or # step outputs. Loop exits when the workflow outputs # are satisfied. alloutputs_fufilled = True for step in self.cwlwf.steps: if step.tool["id"] not in jobs: stepinputs_fufilled = True for inp in step.tool["inputs"]: if "source" in inp: for s in aslist(inp["source"]): if s not in promises: stepinputs_fufilled = False if stepinputs_fufilled: jobobj = {} for inp in step.tool["inputs"]: key = shortname(inp["id"]) if "source" in inp: if inp.get("linkMerge") or len(aslist(inp["source"])) > 1: linkMerge = inp.get("linkMerge", "merge_nested") if linkMerge == "merge_nested": jobobj[key] = ( MergeInputsNested([(shortname(s), promises[s].rv()) for s in aslist(inp["source"])])) elif linkMerge == "merge_flattened": jobobj[key] = ( MergeInputsFlattened([(shortname(s), promises[s].rv()) for s in aslist(inp["source"])])) else: raise validate.ValidationException( "Unsupported linkMerge '%s'", linkMerge) else: jobobj[key] = ( shortname(inp["source"]), promises[inp["source"]].rv()) elif "default" in inp: d = copy.copy(inp["default"]) jobobj[key] = ("default", {"default": d}) if "valueFrom" in inp and "scatter" not in step.tool: if key in jobobj: jobobj[key] = StepValueFrom(inp["valueFrom"], jobobj[key], self.cwlwf.requirements) else: jobobj[key] = StepValueFrom(inp["valueFrom"], ("None", {"None": None}), self.cwlwf.requirements) if "scatter" in step.tool: wfjob = CWLScatter(step, IndirectDict(jobobj), **self.executor_options) followOn = CWLGather(step, wfjob.rv()) wfjob.addFollowOn(followOn) else: (wfjob, followOn) = makeJob(step.embedded_tool, IndirectDict(jobobj), **self.executor_options) jobs[step.tool["id"]] = followOn connected = False for inp in step.tool["inputs"]: for s in aslist(inp.get("source", [])): if not promises[s].hasChild(wfjob): promises[s].addChild(wfjob) connected = True if not connected: # workflow step has default inputs only, isn't connected to other jobs, # so add it as child of workflow. self.addChild(wfjob) for out in step.tool["outputs"]: promises[out["id"]] = followOn for inp in step.tool["inputs"]: for s in aslist(inp.get("source", [])): if s not in promises: alloutputs_fufilled = False # may need a test for out in self.cwlwf.tool["outputs"]: if "source" in out: if out["source"] not in promises: alloutputs_fufilled = False outobj = {} for out in self.cwlwf.tool["outputs"]: outobj[shortname(out["id"])] = (shortname(out["outputSource"]), promises[out["outputSource"]].rv()) return IndirectDict(outobj) cwltool.process.supportedProcessRequirements = ("DockerRequirement", "ExpressionEngineRequirement", "InlineJavascriptRequirement", "InitialWorkDirRequirement", "SchemaDefRequirement", "EnvVarRequirement", "CreateFileRequirement", "SubworkflowFeatureRequirement", "ScatterFeatureRequirement", "ShellCommandRequirement", "MultipleInputFeatureRequirement", "StepInputExpressionRequirement", "ResourceRequirement") def unsupportedInputCheck(p): """Check for file inputs we don't current support in Toil: - Directories - File literals """ if p.get("class") == "Directory": raise cwltool.process.UnsupportedRequirement("CWL Directory inputs not yet supported in Toil") if p.get("contents") and (not p.get("path") and not p.get("location")): raise cwltool.process.UnsupportedRequirement("CWL File literals not yet supported in Toil") def unsupportedRequirementsCheck(requirements): """Check for specific requirement cases we don't support. """ for r in requirements: if r["class"] == "InitialWorkDirRequirement": for l in r.get("listing", []): if isinstance(l, dict) and l.get("writable"): raise cwltool.process.UnsupportedRequirement("CWL writable InitialWorkDirRequirement not yet supported in Toil") def unsupportedDefaultCheck(tool): """Check for file-based defaults, which don't get staged correctly in Toil. """ for inp in tool["in"]: if isinstance(inp, dict) and "default" in inp: if isinstance(inp["default"], dict) and inp["default"].get("class") == "File": raise cwltool.process.UnsupportedRequirement("CWL default file inputs not yet supported in Toil") def main(args=None, stdout=sys.stdout): parser = ArgumentParser() Job.Runner.addToilOptions(parser) parser.add_argument("cwltool", type=str) parser.add_argument("cwljob", type=str, nargs="?", default=None) # Will override the "jobStore" positional argument, enables # user to select jobStore or get a default from logic one below. parser.add_argument("--jobStore", type=str) parser.add_argument("--conformance-test", action="store_true") parser.add_argument("--not-strict", action="store_true") parser.add_argument("--no-container", action="store_true") parser.add_argument("--quiet", dest="logLevel", action="store_const", const="ERROR") parser.add_argument("--basedir", type=str) parser.add_argument("--outdir", type=str, default=os.getcwd()) parser.add_argument("--version", action='version', version=baseVersion) parser.add_argument("--preserve-environment", type=str, nargs='+', help="Preserve specified environment variables when running CommandLineTools", metavar=("VAR1 VAR2"), default=("PATH",), dest="preserve_environment") # mkdtemp actually creates the directory, but # toil requires that the directory not exist, # so make it and delete it and allow # toil to create it again (!) workdir = tempfile.mkdtemp() os.rmdir(workdir) if args is None: args = sys.argv[1:] options = parser.parse_args([workdir] + args) use_container = not options.no_container setLoggingFromOptions(options) if options.logLevel: cwllogger.setLevel(options.logLevel) useStrict = not options.not_strict try: t = cwltool.load_tool.load_tool(options.cwltool, cwltool.workflow.defaultMakeTool, resolver=cwltool.resolver.tool_resolver, strict=useStrict) unsupportedRequirementsCheck(t.requirements) except cwltool.process.UnsupportedRequirement as e: logging.error(e) return 33 if options.conformance_test: loader = schema_salad.ref_resolver.Loader({}) else: jobloaderctx = {"path": {"@type": "@id"}, "format": {"@type": "@id"}} jobloaderctx.update(t.metadata.get("$namespaces", {})) loader = schema_salad.ref_resolver.Loader(jobloaderctx) if options.cwljob: uri = (options.cwljob if urlparse.urlparse(options.cwljob).scheme else "file://" + os.path.abspath(options.cwljob)) job, _ = loader.resolve_ref(uri, checklinks=False) else: job = {} try: cwltool.pathmapper.adjustDirObjs(job, unsupportedInputCheck) cwltool.pathmapper.adjustFileObjs(job, unsupportedInputCheck) except cwltool.process.UnsupportedRequirement as e: logging.error(e) return 33 cwltool.pathmapper.adjustDirObjs(job, pathToLoc) cwltool.pathmapper.adjustFileObjs(job, pathToLoc) if type(t) == int: return t fillInDefaults(t.tool["inputs"], job) if options.conformance_test: adjustFiles(job, lambda x: x.replace("file://", "")) stdout.write(json.dumps( cwltool.main.single_job_executor(t, job, basedir=options.basedir, tmpdir_prefix="tmp", conformance_test=True, use_container=use_container, preserve_environment=options.preserve_environment), indent=4)) return 0 if not options.basedir: options.basedir = os.path.dirname(os.path.abspath(options.cwljob or options.cwltool)) outdir = options.outdir with Toil(options) as toil: def importDefault(tool): cwltool.pathmapper.adjustDirObjs(tool, locToPath) cwltool.pathmapper.adjustFileObjs(tool, locToPath) adjustFiles(tool, lambda x: "file://%s" % x if not urlparse.urlparse(x).scheme else x) adjustFiles(tool, functools.partial(writeFile, toil.importFile, {}, {})) t.visit(importDefault) if options.restart: outobj = toil.restart() else: basedir = os.path.dirname(os.path.abspath(options.cwljob or options.cwltool)) builder = t._init_job(job, basedir=basedir, use_container=use_container) (wf1, wf2) = makeJob(t, {}, use_container=use_container, preserve_environment=options.preserve_environment, tmpdir=os.path.realpath(outdir), builder=builder) try: if isinstance(wf1, CWLWorkflow): [unsupportedDefaultCheck(s.tool) for s in wf1.cwlwf.steps] except cwltool.process.UnsupportedRequirement as e: logging.error(e) return 33 cwltool.pathmapper.adjustDirObjs(builder.job, locToPath) cwltool.pathmapper.adjustFileObjs(builder.job, locToPath) adjustFiles(builder.job, lambda x: "file://%s" % os.path.abspath(os.path.join(basedir, x)) if not urlparse.urlparse(x).scheme else x) cwltool.pathmapper.adjustDirObjs(builder.job, pathToLoc) cwltool.pathmapper.adjustFileObjs(builder.job, pathToLoc) cwltool.pathmapper.adjustFileObjs(builder.job, addFilePartRefs) adjustFiles(builder.job, functools.partial(writeFile, toil.importFile, {}, {})) wf1.cwljob = builder.job outobj = toil.start(wf1) outobj = resolve_indirect(outobj) try: adjustFilesWithSecondary(outobj, functools.partial(getFile, toil, outdir, index={}, existing={}, export=True, rename_collision=True)) cwltool.pathmapper.adjustFileObjs(outobj, pathToLoc) except cwltool.process.UnsupportedRequirement as e: logging.error(e) return 33 stdout.write(json.dumps(outobj, indent=4)) return 0