from collections import defaultdict, OrderedDict

import luigi
from luigi.task import flatten, getpaths


def topological_sorting(struct, outnodes_funct, transform_funct):
    """Walk a graph depth-first and return its nodes in topological order.

    :param struct: starting node(s). If it is a dict its keys are used;
        anything else is passed through ``flatten``.
    :param outnodes_funct: callable ``node -> successor node(s)``; its result
        is flattened before being followed.
    :param transform_funct: callable invoked exactly once per reachable node,
        after all of that node's successors have been processed. Its return
        value becomes the node's value in the result.
    :return: ``OrderedDict`` mapping ``node -> transform_funct(node)``,
        ordered so that every node appears before its successors.

    There is no cycle detection: a cyclic graph makes the walk recurse
    without bound.
    """
    if isinstance(struct, dict):
        struct = flatten(struct.keys())
    else:
        struct = flatten(struct)

    visited = OrderedDict()

    def dvisit(root):
        if root in visited:
            return
        for o in flatten(outnodes_funct(root)):
            dvisit(o)
        # Recorded only after the successors, i.e. in post-order.
        visited[root] = transform_funct(root)

    for root in struct:
        dvisit(root)

    # Reversing the post-order puts each node ahead of its successors.
    return OrderedDict(reversed(visited.items()))


def to_dag(struct, outnodes_funct):
    """Build forward and inverse adjacency mappings for a graph.

    :param struct: starting node(s), as accepted by ``topological_sorting``.
    :param outnodes_funct: callable ``node -> successor node(s)``.
    :return: tuple ``(dag, inv_dag)``. ``dag`` is an ``OrderedDict`` in
        topological order mapping each node to the flattened list of its
        successors. ``inv_dag`` is a ``defaultdict(list)`` mapping each node
        to the nodes that list it as a successor; a node nobody points to has
        no entry until it is looked up (the lookup then yields ``[]``).
    """
    inv_dag = defaultdict(list)

    def inv_visit_function(root):
        outnodes = flatten(outnodes_funct(root))
        for o in outnodes:
            inv_dag[o].append(root)
        return outnodes

    dag = topological_sorting(struct, outnodes_funct, inv_visit_function)
    return dag, inv_dag


def clear_task_output(task):
    """Remove every existing output of ``task``."""
    for output in flatten(task.output()):
        # This works for LocalTargetOutput
        # Add here your per class notion of 'clear'
        if output.exists():
            output.remove()


def clear_task_dag_output(struct, dag):
    """Clear the outputs of ``struct`` and of everything reachable from it.

    :param struct: task(s) to start from; flattened before use.
    :param dag: mapping ``task -> adjacent tasks`` that defines which tasks
        are reachable (for example one of the mappings built by ``to_dag``).
    """
    def outnodes_funct(root):
        return dag[root]

    # One traversal for all roots: the visited set is shared, so a task
    # reachable from several roots is walked and cleared only once.
    topological_sorting(flatten(struct), outnodes_funct, clear_task_output)


def task_outnodes_funct(task):
    """Return the flattened list of tasks that ``task`` requires."""
    return flatten(task.requires())


class DAG(object):
    """Dependency graph of luigi tasks with helpers to clear task outputs."""

    def __init__(self, lasttask):
        # lasttask(s) should be the last task to be executed
        # (no task depends on it)
        self.struct = lasttask
        self._build()

    def _build(self):
        """Compute ``self.dag`` (task -> required tasks) and ``self.inv_dag``
        (task -> tasks requiring it) starting from ``self.struct``."""
        self.dag, self.inv_dag = to_dag(self.struct, task_outnodes_funct)

    def clean_backward(self, tasks):
        """Clean (recursively) ``tasks`` and all their dependencies."""
        return self._clean(tasks, direction='backward')

    def clean_forward(self, tasks):
        """Clean (recursively) ``tasks`` and all tasks that depend on them."""
        return self._clean(tasks, direction='forward')

    def clean_all(self, tasks):
        """Clean ``tasks`` in both directions: dependencies and dependents."""
        return self._clean(tasks, direction='all')

    def _clean(self, tasks, direction=None):
        """Clear outputs starting at ``tasks`` in the requested direction.

        ``direction`` is ``'backward'``, ``'forward'`` or ``'all'``; any other
        value (including the default ``None``) clears nothing.
        """
        if direction in ['all', 'backward']:
            clear_task_dag_output(tasks, self.dag)
        if direction in ['all', 'forward']:
            clear_task_dag_output(tasks, self.inv_dag)