Skip to content

Instantly share code, notes, and snippets.

@tetron
Created July 23, 2017 12:28
Show Gist options
  • Select an option

  • Save tetron/accd49db26ba8e93dafd36ec631ae4e8 to your computer and use it in GitHub Desktop.

Select an option

Save tetron/accd49db26ba8e93dafd36ec631ae4e8 to your computer and use it in GitHub Desktop.

Revisions

  1. tetron created this gist Jul 23, 2017.
    811 changes: 811 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,811 @@
    # Implement support for Common Workflow Language (CWL) for Toil.
    #
    # Copyright (C) 2015 Curoverse, Inc
    # Copyright (C) 2016 UCSC Computational Genomics Lab
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    from toil.job import Job
    from toil.common import Toil
    from toil.version import baseVersion
    from toil.lib.bioio import setLoggingFromOptions

    from argparse import ArgumentParser
    import cwltool.errors
    import cwltool.load_tool
    import cwltool.main
    import cwltool.workflow
    import cwltool.expression
    import cwltool.builder
    import cwltool.resolver
    import cwltool.stdfsaccess
    from cwltool.pathmapper import adjustFiles
    from cwltool.process import shortname, adjustFilesWithSecondary, fillInDefaults, compute_checksums
    from cwltool.utils import aslist
    import schema_salad.validate as validate
    import schema_salad.ref_resolver
    import os
    import tempfile
    import json
    import sys
    import logging
    import copy
    import functools

    # Python 3 compatibility imports
    from six.moves import xrange
    from six import iteritems, string_types
    import six.moves.urllib.parse as urlparse

# Use cwltool's own logger so runner and tool messages share one stream.
cwllogger = logging.getLogger("cwltool")

# The job object passed into CWLJob and CWLWorkflow
# is a dict mapping to tuple of (key, dict)
# the final dict is derived by evaluating each
# tuple looking up the key in the supplied dict.
#
# This is necessary because Toil jobs return a single value (a dict)
# but CWL permits steps to have multiple output parameters that may
# feed into multiple other steps. This transformation maps the key in the
# output object to the correct key of the input object.

    class IndirectDict(dict):
    pass

    class MergeInputs(object):
    def __init__(self, sources):
    self.sources = sources
    def resolve(self):
    raise NotImplementedError()

    class MergeInputsNested(MergeInputs):
    def resolve(self):
    return [v[1][v[0]] for v in self.sources]

    class MergeInputsFlattened(MergeInputs):
    def resolve(self):
    r = []
    for v in self.sources:
    v = v[1][v[0]]
    if isinstance(v, list):
    r.extend(v)
    else:
    r.append(v)
    return r

    class StepValueFrom(object):
    def __init__(self, expr, inner, req):
    self.expr = expr
    self.inner = inner
    self.req = req

    def do_eval(self, inputs, ctx):
    return cwltool.expression.do_eval(self.expr, inputs, self.req,
    None, None, {}, context=ctx)

    def resolve_indirect_inner(d):
    if isinstance(d, IndirectDict):
    r = {}
    for k, v in d.items():
    if isinstance(v, MergeInputs):
    r[k] = v.resolve()
    else:
    r[k] = v[1][v[0]]
    return r
    else:
    return d

    def resolve_indirect(d):
    inner = IndirectDict() if isinstance(d, IndirectDict) else {}
    needEval = False
    for k, v in iteritems(d):
    if isinstance(v, StepValueFrom):
    inner[k] = v.inner
    needEval = True
    else:
    inner[k] = v
    res = resolve_indirect_inner(inner)
    if needEval:
    ev = {}
    for k, v in iteritems(d):
    if isinstance(v, StepValueFrom):
    ev[k] = v.do_eval(res, res[k])
    else:
    ev[k] = res[k]
    return ev
    else:
    return res

    def getFile(fileStore, dir, fileTuple, index=None, export=False, primary=None, rename_collision=False,
    existing={}):
    """Extract input file from Toil jobstore.

    Uses standard filestore to retrieve file, then provides a symlink to it
    for running. If export is True (for final outputs), it gets copied to
    the final location.

    Keeps track of files being used locally with 'existing'
    """
    # File literal outputs with no path, from writeFile
    if fileTuple is None:
    raise cwltool.process.UnsupportedRequirement("CWL expression file inputs not yet supported in Toil")
    fileStoreID, fileName = fileTuple

    if rename_collision is False:
    if primary:
    dir = os.path.dirname(primary)
    else:
    dir = tempfile.mkdtemp(dir=dir)

    dstPath = os.path.join(dir, fileName)
    if rename_collision:
    n = 1
    while os.path.exists(dstPath):
    n += 1
    stem, ext = os.path.splitext(dstPath)
    stem = "%s_%s" % (stem, n)
    dstPath = stem + ext

    if export:
    fileStore.exportFile(fileStoreID, "file://" + dstPath)
    else:
    srcPath = fileStore.readGlobalFile(fileStoreID)
    if srcPath != dstPath:
    if os.path.exists(dstPath):
    if index.get(dstPath, None) != fileStoreID:
    raise Exception("Conflicting filesStoreID %s and %s both trying to link to %s" % (index.get(dstPath, None), fileStoreID, dstPath))
    else:
    os.symlink(srcPath, dstPath)
    existing[srcPath] = dstPath
    index[dstPath] = fileStoreID
    return dstPath

    def writeFile(writeFunc, index, existing, x):
    """Write output files back into Toil jobstore.

    'existing' is a set of files retrieved as inputs from getFile. This ensures
    they are mapped back as the same name if passed through.
    """
    # Toil fileStore references are tuples of pickle and internal file
    if isinstance(x, tuple):
    return x
    # File literal outputs with no path, we don't write these and will fail
    # with unsupportedRequirement when retrieving later with getFile
    elif x.startswith("_:"):
    return None
    else:
    if x not in index:
    x = existing.get(x, x)
    if not urlparse.urlparse(x).scheme:
    rp = os.path.realpath(x)
    else:
    rp = x
    try:
    index[x] = (writeFunc(rp), os.path.basename(x))
    except Exception as e:
    cwllogger.error("Got exception '%s' while copying '%s'", e, x)
    raise
    return index[x]

    def computeFileChecksums(fs_access, f):
    # File literal inputs with no path, no checksum
    if isinstance(f, dict) and f.get("location", "").startswith("_:"):
    return f
    else:
    return compute_checksums(fs_access, f)

    def addFilePartRefs(p):
    """Provides new v1.0 functionality for referencing file parts.
    """
    if p.get("class") == "File" and p.get("path"):
    dirname, basename = os.path.split(p["path"])
    nameroot, nameext = os.path.splitext(basename)
    for k, v in [("dirname", dirname,), ("basename", basename),
    ("nameroot", nameroot), ("nameext", nameext)]:
    p[k] = v
    return p

    def locToPath(p):
    """Back compatibility -- handle converting locations into paths.
    """
    if "path" not in p and "location" in p:
    p["path"] = p["location"].replace("file:", "")
    return p

    def pathToLoc(p):
    """Associate path with location.

    v1.0 should be specifying location but older YAML uses path
    -- this provides back compatibility.
    """
    if "path" in p:
    p["location"] = p["path"]
    return p

    class ResolveIndirect(Job):
    def __init__(self, cwljob):
    super(ResolveIndirect, self).__init__()
    self.cwljob = cwljob

    def run(self, fileStore):
    return resolve_indirect(self.cwljob)

    class CWLJobWrapper(Job):
    def __init__(self, tool, cwljob, **kwargs):
    super(CWLJob, self).__init__(cores=1,
    memory=1,
    disk=1,
    unitName=unitName)
    self.tool = tool
    self.cwljob = cwljob
    self.kwargs = kwargs

    def run(self):
    if 'builder' in self.kwargs:
    builder = self.kwargs["builder"]
    else:
    builder = cwltool.builder.Builder()
    builder.job = {}
    builder.requirements = []
    builder.outdir = None
    builder.tmpdir = None
    builder.timeout = 0
    builder.resources = {}
    req = tool.evalResources(builder, {})
    realjob = CWLJob(cores=req["cores"],
    memory=(req["ram"]*1024*1024),
    disk=((req["tmpdirSize"]*1024*1024) + (req["outdirSize"]*1024*1024)),
    unitName=unitName)
    self.addChild(realjob)
    return realjob.rv()


class CWLJob(Job):
    """Execute a CWL tool wrapper as a Toil job.

    Resource requirements are evaluated at construction time via cwltool's
    evalResources and handed to the Toil Job superclass.
    """

    def __init__(self, tool, cwljob, **kwargs):
        # Reuse a caller-provided builder when present; otherwise make a
        # minimal one just for resource evaluation.
        if 'builder' in kwargs:
            builder = kwargs["builder"]
        else:
            builder = cwltool.builder.Builder()
            builder.job = {}
            builder.requirements = []
            builder.outdir = None
            builder.tmpdir = None
            builder.timeout = 0
            builder.resources = {}
        req = tool.evalResources(builder, {})
        # doc_loader does not pickle; strip it before Toil serializes this job.
        self.cwltool = remove_pickle_problems(tool)
        # pass the default of None if basecommand is empty
        unitName = self.cwltool.tool.get("baseCommand", None)
        if isinstance(unitName, (list, tuple)):
            unitName = ' '.join(unitName)
        # CWL ram/tmpdir/outdir sizes are in MiB; Toil wants bytes.
        super(CWLJob, self).__init__(cores=req["cores"],
                                     memory=(req["ram"]*1024*1024),
                                     disk=((req["tmpdirSize"]*1024*1024) + (req["outdirSize"]*1024*1024)),
                                     unitName=unitName)
        self.cwljob = cwljob
        try:
            self.jobName = str(self.cwltool.tool['id'])
        except KeyError:
            # fall back to the Toil defined class name if the tool doesn't have an identifier
            pass
        self.executor_options = kwargs

    def run(self, fileStore):
        """Stage inputs, run the tool with cwltool's single-job executor,
        and store the outputs back into the Toil file store."""
        cwljob = resolve_indirect(self.cwljob)
        fillInDefaults(self.cwltool.tool["inputs"], cwljob)

        inpdir = os.path.join(fileStore.getLocalTempDir(), "inp")
        outdir = os.path.join(fileStore.getLocalTempDir(), "out")
        tmpdir = os.path.join(fileStore.getLocalTempDir(), "tmp")
        os.mkdir(inpdir)
        os.mkdir(outdir)
        os.mkdir(tmpdir)

        # Copy input files out of the global file store, ensure path/location synchronized
        index = {}
        existing = {}
        adjustFilesWithSecondary(cwljob, functools.partial(getFile, fileStore, inpdir, index=index,
                                                           existing=existing))
        cwltool.pathmapper.adjustFileObjs(cwljob, pathToLoc)
        cwltool.pathmapper.adjustFileObjs(cwljob, addFilePartRefs)

        # Run the tool
        opts = copy.deepcopy(self.executor_options)
        # Exports temporary directory for batch systems that reset TMPDIR
        os.environ["TMPDIR"] = os.path.realpath(opts.pop("tmpdir", None) or tmpdir)
        (output, status) = cwltool.main.single_job_executor(self.cwltool, cwljob,
                                                            basedir=os.getcwd(),
                                                            outdir=outdir,
                                                            tmpdir=tmpdir,
                                                            tmpdir_prefix="tmp",
                                                            make_fs_access=cwltool.stdfsaccess.StdFsAccess,
                                                            **opts)
        if status != "success":
            raise cwltool.errors.WorkflowException(status)
        # Normalize location->path on outputs, then add checksums.
        cwltool.pathmapper.adjustDirObjs(output, locToPath)
        cwltool.pathmapper.adjustFileObjs(output, locToPath)
        cwltool.pathmapper.adjustFileObjs(output, functools.partial(computeFileChecksums,
                                                                    cwltool.stdfsaccess.StdFsAccess(outdir)))
        # Copy output files into the global file store.
        adjustFiles(output, functools.partial(writeFile, fileStore.writeGlobalFile, {}, existing))
        return output


    def makeJob(tool, jobobj, **kwargs):
    if tool.tool["class"] == "Workflow":
    wfjob = CWLWorkflow(tool, jobobj, **kwargs)
    followOn = ResolveIndirect(wfjob.rv())
    wfjob.addFollowOn(followOn)
    return (wfjob, followOn)
    else:
    # check if tool has ResourceRequirement and if the resource requirement
    # involves an expression.
    if there_is_an_expression:
    job = CWLJobWrapper(tool, jobobj, **kwargs)
    else:
    job = CWLJob(tool, jobobj, **kwargs)
    return (job, job)


class CWLScatter(Job):
    """Implement a CWL scatter step by fanning out one child job per shard.

    run() returns a list (nested lists for nested_crossproduct) of the
    children's rv() promises; CWLGather merges them back together.
    """

    def __init__(self, step, cwljob, **kwargs):
        super(CWLScatter, self).__init__()
        self.step = step
        self.cwljob = cwljob
        self.executor_options = kwargs

    def flat_crossproduct_scatter(self, joborder, scatter_keys, outputs, postScatterEval):
        # Recurse over scatter_keys; at the innermost level schedule one
        # child per key combination, appending every promise to the single
        # flat `outputs` list.
        scatter_key = shortname(scatter_keys[0])
        l = len(joborder[scatter_key])
        for n in xrange(0, l):
            jo = copy.copy(joborder)
            jo[scatter_key] = joborder[scatter_key][n]
            if len(scatter_keys) == 1:
                jo = postScatterEval(jo)
                (subjob, followOn) = makeJob(self.step.embedded_tool, jo, **self.executor_options)
                self.addChild(subjob)
                outputs.append(followOn.rv())
            else:
                self.flat_crossproduct_scatter(jo, scatter_keys[1:], outputs, postScatterEval)

    def nested_crossproduct_scatter(self, joborder, scatter_keys, postScatterEval):
        # Like flat_crossproduct_scatter, but each recursion level builds
        # and returns its own nested output list.
        scatter_key = shortname(scatter_keys[0])
        l = len(joborder[scatter_key])
        outputs = []
        for n in xrange(0, l):
            jo = copy.copy(joborder)
            jo[scatter_key] = joborder[scatter_key][n]
            if len(scatter_keys) == 1:
                jo = postScatterEval(jo)
                (subjob, followOn) = makeJob(self.step.embedded_tool, jo, **self.executor_options)
                self.addChild(subjob)
                outputs.append(followOn.rv())
            else:
                outputs.append(self.nested_crossproduct_scatter(jo, scatter_keys[1:], postScatterEval))
        return outputs

    def run(self, fileStore):
        cwljob = resolve_indirect(self.cwljob)

        # "scatter" may be a single parameter id or a list of them.
        if isinstance(self.step.tool["scatter"], string_types):
            scatter = [self.step.tool["scatter"]]
        else:
            scatter = self.step.tool["scatter"]

        scatterMethod = self.step.tool.get("scatterMethod", None)
        if len(scatter) == 1:
            # With one scatter input every method degenerates to dotproduct.
            scatterMethod = "dotproduct"
        outputs = []

        # Step inputs with valueFrom are evaluated per shard, after scattering.
        valueFrom = {shortname(i["id"]): i["valueFrom"] for i in self.step.tool["inputs"] if "valueFrom" in i}
        def postScatterEval(io):
            shortio = {shortname(k): v for k, v in iteritems(io)}
            def valueFromFunc(k, v):
                if k in valueFrom:
                    return cwltool.expression.do_eval(
                        valueFrom[k], shortio, self.step.requirements,
                        None, None, {}, context=v)
                else:
                    return v
            return {k: valueFromFunc(k, v) for k, v in io.items()}

        if scatterMethod == "dotproduct":
            # One shard per index; all scatter inputs advance in lockstep.
            for i in xrange(0, len(cwljob[shortname(scatter[0])])):
                copyjob = copy.copy(cwljob)
                for sc in [shortname(x) for x in scatter]:
                    copyjob[sc] = cwljob[sc][i]
                copyjob = postScatterEval(copyjob)
                (subjob, followOn) = makeJob(self.step.embedded_tool, copyjob, **self.executor_options)
                self.addChild(subjob)
                outputs.append(followOn.rv())
        elif scatterMethod == "nested_crossproduct":
            outputs = self.nested_crossproduct_scatter(cwljob, scatter, postScatterEval)
        elif scatterMethod == "flat_crossproduct":
            self.flat_crossproduct_scatter(cwljob, scatter, outputs, postScatterEval)
        else:
            if scatterMethod:
                raise validate.ValidationException(
                    "Unsupported complex scatter type '%s'" % scatterMethod)
            else:
                raise validate.ValidationException(
                    "Must provide scatterMethod to scatter over multiple inputs")

        return outputs


    class CWLGather(Job):
    def __init__(self, step, outputs):
    super(CWLGather, self).__init__()
    self.step = step
    self.outputs = outputs

    def allkeys(self, obj, keys):
    if isinstance(obj, dict):
    for k in obj.keys():
    keys.add(k)
    elif isinstance(obj, list):
    for l in obj:
    self.allkeys(l, keys)

    def extract(self, obj, k):
    if isinstance(obj, dict):
    return obj.get(k)
    elif isinstance(obj, list):
    cp = []
    for l in obj:
    cp.append(self.extract(l, k))
    return cp

    def run(self, fileStore):
    outobj = {}
    keys = set()
    self.allkeys(self.outputs, keys)

    for k in keys:
    outobj[k] = self.extract(self.outputs, k)

    return outobj


    class SelfJob(object):
    """Fake job object to facilitate implementation of CWLWorkflow.run()"""

    def __init__(self, j, v):
    self.j = j
    self.v = v

    def rv(self):
    return self.v

    def addChild(self, c):
    return self.j.addChild(c)

    def hasChild(self, c):
    return self.j.hasChild(c)

    def remove_pickle_problems(obj):
    """doc_loader does not pickle correctly, causing Toil errors, remove from objects.
    """
    if hasattr(obj, "doc_loader"):
    obj.doc_loader = None
    if hasattr(obj, "embedded_tool"):
    obj.embedded_tool = remove_pickle_problems(obj.embedded_tool)
    if hasattr(obj, "steps"):
    obj.steps = [remove_pickle_problems(s) for s in obj.steps]
    return obj

class CWLWorkflow(Job):
    """Traverse a CWL workflow graph and schedule a Toil job graph."""

    def __init__(self, cwlwf, cwljob, **kwargs):
        super(CWLWorkflow, self).__init__()
        self.cwlwf = cwlwf
        self.cwljob = cwljob
        self.executor_options = kwargs
        # Strip unpicklable attributes before Toil serializes this job.
        self.cwlwf = remove_pickle_problems(self.cwlwf)

    def run(self, fileStore):
        cwljob = resolve_indirect(self.cwljob)

        # `promises` dict
        # from: each parameter (workflow input or step output)
        #   that may be used as a "source" for a step input workflow output
        #   parameter
        # to: the job that will produce that value.
        promises = {}

        # `jobs` dict from step id to job that implements that step.
        jobs = {}

        for inp in self.cwlwf.tool["inputs"]:
            promises[inp["id"]] = SelfJob(self, cwljob)

        alloutputs_fufilled = False
        while not alloutputs_fufilled:
            # Iteratively go over the workflow steps, scheduling jobs as their
            # dependencies can be fufilled by upstream workflow inputs or
            # step outputs. Loop exits when the workflow outputs
            # are satisfied.

            alloutputs_fufilled = True

            for step in self.cwlwf.steps:
                if step.tool["id"] not in jobs:
                    # A step is schedulable once every source it consumes
                    # has a producing job in `promises`.
                    stepinputs_fufilled = True
                    for inp in step.tool["inputs"]:
                        if "source" in inp:
                            for s in aslist(inp["source"]):
                                if s not in promises:
                                    stepinputs_fufilled = False
                    if stepinputs_fufilled:
                        # Build the indirect job object: each entry is a
                        # promise resolved later by resolve_indirect.
                        jobobj = {}

                        for inp in step.tool["inputs"]:
                            key = shortname(inp["id"])
                            if "source" in inp:
                                if inp.get("linkMerge") or len(aslist(inp["source"])) > 1:
                                    linkMerge = inp.get("linkMerge", "merge_nested")
                                    if linkMerge == "merge_nested":
                                        jobobj[key] = (
                                            MergeInputsNested([(shortname(s), promises[s].rv())
                                                               for s in aslist(inp["source"])]))
                                    elif linkMerge == "merge_flattened":
                                        jobobj[key] = (
                                            MergeInputsFlattened([(shortname(s), promises[s].rv())
                                                                  for s in aslist(inp["source"])]))
                                    else:
                                        # NOTE(review): '%s' is never interpolated here
                                        # (comma, not %) -- the message will render as a tuple.
                                        raise validate.ValidationException(
                                            "Unsupported linkMerge '%s'", linkMerge)
                                else:
                                    jobobj[key] = (
                                        shortname(inp["source"]), promises[inp["source"]].rv())
                            elif "default" in inp:
                                d = copy.copy(inp["default"])
                                jobobj[key] = ("default", {"default": d})

                            # valueFrom on a scatter step is evaluated per shard
                            # by CWLScatter instead, so skip it here.
                            if "valueFrom" in inp and "scatter" not in step.tool:
                                if key in jobobj:
                                    jobobj[key] = StepValueFrom(inp["valueFrom"],
                                                                jobobj[key],
                                                                self.cwlwf.requirements)
                                else:
                                    jobobj[key] = StepValueFrom(inp["valueFrom"],
                                                                ("None", {"None": None}),
                                                                self.cwlwf.requirements)

                        if "scatter" in step.tool:
                            wfjob = CWLScatter(step, IndirectDict(jobobj), **self.executor_options)
                            followOn = CWLGather(step, wfjob.rv())
                            wfjob.addFollowOn(followOn)
                        else:
                            (wfjob, followOn) = makeJob(step.embedded_tool, IndirectDict(jobobj),
                                                        **self.executor_options)

                        jobs[step.tool["id"]] = followOn

                        # Wire this job as a child of every producer of its
                        # sources so Toil orders it after them.
                        connected = False
                        for inp in step.tool["inputs"]:
                            for s in aslist(inp.get("source", [])):
                                if not promises[s].hasChild(wfjob):
                                    promises[s].addChild(wfjob)
                                    connected = True
                        if not connected:
                            # workflow step has default inputs only, isn't connected to other jobs,
                            # so add it as child of workflow.
                            self.addChild(wfjob)

                        for out in step.tool["outputs"]:
                            promises[out["id"]] = followOn

                    # Any still-missing source forces another scheduling pass.
                    for inp in step.tool["inputs"]:
                        for s in aslist(inp.get("source", [])):
                            if s not in promises:
                                alloutputs_fufilled = False

            # may need a test
            for out in self.cwlwf.tool["outputs"]:
                if "source" in out:
                    if out["source"] not in promises:
                        alloutputs_fufilled = False

        # Hand back promises keyed by output name; the ResolveIndirect
        # follow-on (see makeJob) resolves them into concrete values.
        outobj = {}
        for out in self.cwlwf.tool["outputs"]:
            outobj[shortname(out["id"])] = (shortname(out["outputSource"]), promises[out["outputSource"]].rv())

        return IndirectDict(outobj)


# Overwrite cwltool's notion of which process requirements are supported
# with the subset this Toil runner implements; documents requesting
# anything outside this tuple are rejected at load time.
cwltool.process.supportedProcessRequirements = ("DockerRequirement",
                                                "ExpressionEngineRequirement",
                                                "InlineJavascriptRequirement",
                                                "InitialWorkDirRequirement",
                                                "SchemaDefRequirement",
                                                "EnvVarRequirement",
                                                "CreateFileRequirement",
                                                "SubworkflowFeatureRequirement",
                                                "ScatterFeatureRequirement",
                                                "ShellCommandRequirement",
                                                "MultipleInputFeatureRequirement",
                                                "StepInputExpressionRequirement",
                                                "ResourceRequirement")

    def unsupportedInputCheck(p):
    """Check for file inputs we don't current support in Toil:

    - Directories
    - File literals
    """
    if p.get("class") == "Directory":
    raise cwltool.process.UnsupportedRequirement("CWL Directory inputs not yet supported in Toil")
    if p.get("contents") and (not p.get("path") and not p.get("location")):
    raise cwltool.process.UnsupportedRequirement("CWL File literals not yet supported in Toil")

    def unsupportedRequirementsCheck(requirements):
    """Check for specific requirement cases we don't support.
    """
    for r in requirements:
    if r["class"] == "InitialWorkDirRequirement":
    for l in r.get("listing", []):
    if isinstance(l, dict) and l.get("writable"):
    raise cwltool.process.UnsupportedRequirement("CWL writable InitialWorkDirRequirement not yet supported in Toil")

    def unsupportedDefaultCheck(tool):
    """Check for file-based defaults, which don't get staged correctly in Toil.
    """
    for inp in tool["in"]:
    if isinstance(inp, dict) and "default" in inp:
    if isinstance(inp["default"], dict) and inp["default"].get("class") == "File":
    raise cwltool.process.UnsupportedRequirement("CWL default file inputs not yet supported in Toil")

def main(args=None, stdout=sys.stdout):
    """Command-line entry point: load a CWL document and run it with Toil.

    :param args: argument list (defaults to sys.argv[1:]).
    :param stdout: stream the final output object is written to as JSON.
    :return: process exit code; 33 signals an unsupported CWL feature.
    """
    parser = ArgumentParser()
    Job.Runner.addToilOptions(parser)
    parser.add_argument("cwltool", type=str)
    parser.add_argument("cwljob", type=str, nargs="?", default=None)

    # Will override the "jobStore" positional argument, enables
    # user to select jobStore or get a default from logic one below.
    parser.add_argument("--jobStore", type=str)
    parser.add_argument("--conformance-test", action="store_true")
    parser.add_argument("--not-strict", action="store_true")
    parser.add_argument("--no-container", action="store_true")
    parser.add_argument("--quiet", dest="logLevel", action="store_const", const="ERROR")
    parser.add_argument("--basedir", type=str)
    parser.add_argument("--outdir", type=str, default=os.getcwd())
    parser.add_argument("--version", action='version', version=baseVersion)
    parser.add_argument("--preserve-environment", type=str, nargs='+',
                        help="Preserve specified environment variables when running CommandLineTools",
                        metavar=("VAR1 VAR2"),
                        default=("PATH",),
                        dest="preserve_environment")

    # mkdtemp actually creates the directory, but
    # toil requires that the directory not exist,
    # so make it and delete it and allow
    # toil to create it again (!)
    workdir = tempfile.mkdtemp()
    os.rmdir(workdir)

    if args is None:
        args = sys.argv[1:]

    # workdir is prepended to satisfy Toil's jobStore positional argument.
    options = parser.parse_args([workdir] + args)

    use_container = not options.no_container

    setLoggingFromOptions(options)
    if options.logLevel:
        cwllogger.setLevel(options.logLevel)

    useStrict = not options.not_strict
    try:
        t = cwltool.load_tool.load_tool(options.cwltool, cwltool.workflow.defaultMakeTool,
                                        resolver=cwltool.resolver.tool_resolver, strict=useStrict)
        unsupportedRequirementsCheck(t.requirements)
    except cwltool.process.UnsupportedRequirement as e:
        logging.error(e)
        # NOTE(review): 33 appears to be the "unsupported feature" exit code
        # expected by the CWL conformance harness -- confirm.
        return 33

    # Build the loader used to resolve the job-order document.
    if options.conformance_test:
        loader = schema_salad.ref_resolver.Loader({})
    else:
        jobloaderctx = {"path": {"@type": "@id"}, "format": {"@type": "@id"}}
        jobloaderctx.update(t.metadata.get("$namespaces", {}))
        loader = schema_salad.ref_resolver.Loader(jobloaderctx)

    if options.cwljob:
        uri = (options.cwljob if urlparse.urlparse(options.cwljob).scheme
               else "file://" + os.path.abspath(options.cwljob))
        job, _ = loader.resolve_ref(uri, checklinks=False)
    else:
        job = {}

    try:
        cwltool.pathmapper.adjustDirObjs(job, unsupportedInputCheck)
        cwltool.pathmapper.adjustFileObjs(job, unsupportedInputCheck)
    except cwltool.process.UnsupportedRequirement as e:
        logging.error(e)
        return 33

    cwltool.pathmapper.adjustDirObjs(job, pathToLoc)
    cwltool.pathmapper.adjustFileObjs(job, pathToLoc)

    if type(t) == int:
        # load_tool may return an error code instead of a tool object.
        return t

    fillInDefaults(t.tool["inputs"], job)

    if options.conformance_test:
        # Conformance mode: report what would run, via the local executor.
        adjustFiles(job, lambda x: x.replace("file://", ""))
        stdout.write(json.dumps(
            cwltool.main.single_job_executor(t, job, basedir=options.basedir,
                                             tmpdir_prefix="tmp",
                                             conformance_test=True, use_container=use_container,
                                             preserve_environment=options.preserve_environment), indent=4))
        return 0

    if not options.basedir:
        options.basedir = os.path.dirname(os.path.abspath(options.cwljob or options.cwltool))

    outdir = options.outdir

    with Toil(options) as toil:
        def importDefault(tool):
            # Import file-valued defaults in the tool document into the jobstore.
            cwltool.pathmapper.adjustDirObjs(tool, locToPath)
            cwltool.pathmapper.adjustFileObjs(tool, locToPath)
            adjustFiles(tool, lambda x: "file://%s" % x if not urlparse.urlparse(x).scheme else x)
            adjustFiles(tool, functools.partial(writeFile, toil.importFile, {}, {}))
        t.visit(importDefault)

        if options.restart:
            outobj = toil.restart()
        else:
            basedir = os.path.dirname(os.path.abspath(options.cwljob or options.cwltool))
            builder = t._init_job(job, basedir=basedir, use_container=use_container)
            (wf1, wf2) = makeJob(t, {}, use_container=use_container,
                                 preserve_environment=options.preserve_environment,
                                 tmpdir=os.path.realpath(outdir), builder=builder)
            try:
                if isinstance(wf1, CWLWorkflow):
                    [unsupportedDefaultCheck(s.tool) for s in wf1.cwlwf.steps]
            except cwltool.process.UnsupportedRequirement as e:
                logging.error(e)
                return 33

            # Normalize the input object and import its files into the
            # jobstore before handing it to the root workflow job.
            cwltool.pathmapper.adjustDirObjs(builder.job, locToPath)
            cwltool.pathmapper.adjustFileObjs(builder.job, locToPath)
            adjustFiles(builder.job, lambda x: "file://%s" % os.path.abspath(os.path.join(basedir, x))
                        if not urlparse.urlparse(x).scheme else x)
            cwltool.pathmapper.adjustDirObjs(builder.job, pathToLoc)
            cwltool.pathmapper.adjustFileObjs(builder.job, pathToLoc)
            cwltool.pathmapper.adjustFileObjs(builder.job, addFilePartRefs)
            adjustFiles(builder.job, functools.partial(writeFile, toil.importFile, {}, {}))
            wf1.cwljob = builder.job
            outobj = toil.start(wf1)

        outobj = resolve_indirect(outobj)

        try:
            # Export final outputs from the jobstore into outdir.
            adjustFilesWithSecondary(outobj, functools.partial(getFile, toil, outdir, index={}, existing={},
                                                               export=True, rename_collision=True))
            cwltool.pathmapper.adjustFileObjs(outobj, pathToLoc)
        except cwltool.process.UnsupportedRequirement as e:
            logging.error(e)
            return 33

        stdout.write(json.dumps(outobj, indent=4))

    return 0