Created
August 31, 2011 12:11
-
-
Save ask/1183399 to your computer and use it in GitHub Desktop.
Revisions
-
ask revised this gist
Aug 31, 2011 . 1 changed file with 1 addition and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,7 +1,6 @@ from collections import deque from celery.result import BaseAsyncResult, TaskSetResult from celery.task import chord, task, TaskSet def force_list(l): -
ask created this gist
Aug 31, 2011 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,50 @@ from collections import deque from celery.result import BaseAsyncResult, TaskSetResult from celery.task import TaskSet from celery.task import chord def force_list(l): if not isinstance(l, (list, tuple)): return [l] return l def traverse(start): stack = deque([start]) while stack: for subres in force_list(stack.popleft()): if isinstance(subres, TaskSetResult): stack.append(subres.join()) elif isinstance(subres, BaseAsyncResult): stack.append(subres.get()) else: yield subres @task def tA(): return tB.apply_async() @task def tB(): return TaskSet(tC.subtask((i, )) for i in xrange(30)).apply_async() @task def tC(i): return chord(tD.subtask((i, )) for i in xrange(i))(tS.subtask()) @task def tD(i): return i ** i @task def tS(numbers): return sum(numbers) def test(): for res in traverse(tA.apply_async()): print(res)