Skip to content

Instantly share code, notes, and snippets.

@ask
Created August 31, 2011 12:11
Show Gist options
  • Select an option

  • Save ask/1183399 to your computer and use it in GitHub Desktop.

Select an option

Save ask/1183399 to your computer and use it in GitHub Desktop.

Revisions

  1. ask revised this gist Aug 31, 2011. 1 changed file with 1 addition and 2 deletions.
    3 changes: 1 addition & 2 deletions traverse.py
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,6 @@
    from collections import deque
    from celery.result import BaseAsyncResult, TaskSetResult
    from celery.task import TaskSet
    from celery.task import chord
    from celery.task import chord, task, TaskSet


    def force_list(l):
  2. ask created this gist Aug 31, 2011.
    50 changes: 50 additions & 0 deletions traverse.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,50 @@
    from collections import deque
    from celery.result import BaseAsyncResult, TaskSetResult
    from celery.task import TaskSet
    from celery.task import chord


    def force_list(l):
    if not isinstance(l, (list, tuple)):
    return [l]
    return l


    def traverse(start):
    stack = deque([start])
    while stack:
    for subres in force_list(stack.popleft()):
    if isinstance(subres, TaskSetResult):
    stack.append(subres.join())
    elif isinstance(subres, BaseAsyncResult):
    stack.append(subres.get())
    else:
    yield subres

    @task
    def tA():
    return tB.apply_async()

    @task
    def tB():
    return TaskSet(tC.subtask((i, )) for i in xrange(30)).apply_async()

    @task
    def tC(i):
    return chord(tD.subtask((i, )) for i in xrange(i))(tS.subtask())


    @task
    def tD(i):
    return i ** i


    @task
    def tS(numbers):
    return sum(numbers)



    def test():
    for res in traverse(tA.apply_async()):
    print(res)