Created
          August 31, 2011 12:11 
        
      - 
      
- 
        Save ask/1183399 to your computer and use it in GitHub Desktop. 
    traverse celery results
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | from collections import deque | |
| from celery.result import BaseAsyncResult, TaskSetResult | |
| from celery.task import chord, task, TaskSet | |
| def force_list(l): | |
| if not isinstance(l, (list, tuple)): | |
| return [l] | |
| return l | |
| def traverse(start): | |
| stack = deque([start]) | |
| while stack: | |
| for subres in force_list(stack.popleft()): | |
| if isinstance(subres, TaskSetResult): | |
| stack.append(subres.join()) | |
| elif isinstance(subres, BaseAsyncResult): | |
| stack.append(subres.get()) | |
| else: | |
| yield subres | |
| @task | |
| def tA(): | |
| return tB.apply_async() | |
| @task | |
| def tB(): | |
| return TaskSet(tC.subtask((i, )) for i in xrange(30)).apply_async() | |
| @task | |
| def tC(i): | |
| return chord(tD.subtask((i, )) for i in xrange(i))(tS.subtask()) | |
| @task | |
| def tD(i): | |
| return i ** i | |
| @task | |
| def tS(numbers): | |
| return sum(numbers) | |
| def test(): | |
| for res in traverse(tA.apply_async()): | |
| print(res) | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment