Skip to content

Instantly share code, notes, and snippets.

@saillinux
Created December 20, 2013 11:57
Show Gist options
  • Select an option

  • Save saillinux/8053779 to your computer and use it in GitHub Desktop.

Select an option

Save saillinux/8053779 to your computer and use it in GitHub Desktop.

Revisions

  1. saillinux created this gist Dec 20, 2013.
    198 changes: 198 additions & 0 deletions dag_scheduler.pl
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,198 @@
    use strict;
    use warnings;
    use Graph;
    use Data::Dumper;
    use JSON;
    use Capture::Tiny ':all';

    # Task lifecycle states kept in each node's "state" field.
    use constant WAITING => 0;    # initial state of every task
    use constant RUNNING => 1;    # set just before the task's action is invoked
    use constant DONE    => 2;    # action finished and left a false "exit" value
    use constant FAIL    => 3;    # action finished and left a true "exit" value

    # Task table. Each row is [ name, action, params... ]: the action is
    # either the name of an external command or a reference to a sub in this
    # file, and the remaining elements become the task's "params" list.
    # Every task starts with zeroed timestamps and the WAITING state.
    my %nodes = map {
        my ($name, $action, @params) = @{$_};
        ( $name => {
            'action'     => $action,
            'params'     => [ @params ],
            'start_time' => 0,
            'end_time'   => 0,
            'state'      => WAITING,
        } );
    } (
        [ 'Task1', 'curl', 'http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quote%20where%20symbol%20in%20(%22TWTR%22%2C%22FB%22%2C%22TSLA%22%2C%22XOM%22)&format=json&diagnostics=true&env=store%3A%2F%2Fdatatables.org%2Falltableswithkeys&callback=' ],
        [ 'Task2', \&retrieve_stock, 'TWTR', 'Change' ],
        [ 'Task3', \&retrieve_stock, 'FB', 'Change' ],
        [ 'Task4', \&aggregator ],
    );

    # Dependency map: each task points at the tasks that run after it.
    my %edges = (
        'Task1' => [ qw(Task2 Task3) ],
        'Task2' => [ 'Task4' ],
        'Task3' => [ 'Task4' ],
        'Task4' => [],
    );


    # Build the dependency graph from %nodes and %edges, validate it, run
    # it, and report the overall outcome through the process exit status.
    my $g0 = Graph->new; # there is a song called zero g love in Macross

    # add each task to the graph as node
    $g0->add_vertex($_) for keys %nodes;

    # connect each task
    foreach my $task (keys %edges) {
        foreach my $dep (@{$edges{$task}}) {
            # add_edge implicitly adds vertices it has not seen yet, so an
            # edge naming a task that has no entry in %nodes would put an
            # unrunnable vertex into the graph. Reject it up front.
            foreach my $endpoint ($task, $dep) {
                next if exists $nodes{$endpoint};
                print "FATAL: The edge [$task] -> [$dep] refers to the unknown task [$endpoint] so exiting...\n";
                exit(1);
            }
            $g0->add_edge($task, $dep);
        }
    }

    print "INFO: The graph is $g0\n";

    validate($g0);

    scheduler($g0);

    # print Dumper(\%nodes);

    # The scheduler only aborts when a failed task has a successor, so a
    # failure in a task without successors has to be detected here.
    my @unfinished = grep { $nodes{$_}->{'state'} != DONE } sort keys %nodes;
    if (@unfinished) {
        print "FATAL: These tasks did not finish successfully: @unfinished\n";
        exit(1);
    }

    exit(0);


    # Check that a graph can be scheduled.
    #
    # Parameters:
    #   $DAG - graph object; is_dag, vertices and in_degree are called on it.
    #
    # Returns nothing when the graph is a directed acyclic graph with at most
    # one head (a vertex with in-degree zero). Otherwise prints a FATAL line
    # to STDOUT and terminates the process with exit status 1.
    sub validate {
        my $DAG = shift;

        # is_dag is only true for a directed graph without cycles, so a
        # separate cycle check after it could never trigger.
        unless ($DAG->is_dag) {
            print "FATAL: This graph is not a directed and acyclic graph so exiting...\n";
            exit(1);
        }

        # Heads are the tasks that no other task points to.
        my @heads = grep { !$DAG->in_degree($_) } $DAG->vertices;

        if (@heads > 1) {
            print "FATAL: There is more than one execution start points\n";
            exit(1);
        }

        return;
    }

    # Run every task of the graph in topological order.
    #
    # Parameters:
    #   $DAG - graph object; topological_sort and predecessors are called on
    #          it. Every vertex name must be a key of %nodes.
    #
    # For each task the predecessors are checked first: if any of them is in
    # the FAIL state, or is in any state other than DONE, a FATAL line is
    # printed and the process exits with status 1. Otherwise the task's
    # action is run and start_time, end_time, exit and state are recorded in
    # its %nodes entry. A CODE action is called as
    # $action->($task, { preds => [...], params => [...] }) and is expected
    # to record its own stdout/exit; any other action is executed with
    # system() and its stdout, stderr and the value returned by system() are
    # stored as stdout, stderr and exit.
    #
    # Returns nothing.
    sub scheduler {
        my $DAG = shift;

        my @ts = $DAG->topological_sort;

        foreach my $task ( @ts ) {
            my @predecessors = $DAG->predecessors($task);

            if ( @predecessors ) {
                print "INFO: check whether predecessors of [$task] were executed successfully\n";
                foreach my $predecessor ( @predecessors ) {
                    my $state = $nodes{$predecessor}->{'state'};
                    if ( $state == FAIL ) {
                        print "FATAL: The predecessor [$predecessor] of $task was failed so exiting...\n";
                        exit(1);
                    } elsif ( $state == DONE ) {
                        print "INFO: The predecessor [$predecessor] of $task ran successful so continuing...\n";
                    } else {
                        # WAITING or RUNNING here means the ordering was violated.
                        print "FATAL: something went wrong exiting...\n";
                        exit(1);
                    }
                }
            } else {
                print "INFO: $task is the head, starting this task now\n";
            }

            my $node = $nodes{$task};
            print "INFO: running task [$task]\n";

            $node->{'state'} = RUNNING;
            $node->{'start_time'} = time();

            my $action = $node->{'action'};
            my @params = @{$node->{'params'}};

            if ( ref $action eq 'CODE' ) {
                # A callback that dies must not take the whole run down with
                # its node stuck in RUNNING; record the failure instead.
                my $completed = eval {
                    $action->($task, {
                        "preds" => \@predecessors,
                        "params" => \@params,
                    });
                    1;
                };
                unless ($completed) {
                    my $error = "$@" || 'unknown error';
                    $error =~ s/\s+\z//;
                    print "ERROR: The task [$task] died: $error\n";
                    $node->{'stderr'} = $error;
                    $node->{'exit'} = 1;
                }
            } else {
                @$node{'stdout', 'stderr', 'exit'} = capture {
                    system $action, @params;
                };
            }

            $node->{'end_time'} = time();

            # A false (or absent) exit value counts as success.
            $node->{'state'} = $node->{'exit'} ? FAIL : DONE;
        }

        return;
    }

    # Task action: pick one field of one stock quote out of the JSON that the
    # first predecessor task left in its stdout.
    #
    # Parameters:
    #   $self - name of the task being run (key into %nodes)
    #   $args - hash ref with
    #             preds  => array ref of predecessor task names
    #             params => array ref [ stock symbol, field name ]
    #
    # On success the field value is stored in $nodes{$self}{stdout} and
    # $nodes{$self}{exit} is set to 0. When the predecessor output is missing
    # or is not JSON of the expected shape, or no quote with a defined value
    # for the field matches the symbol, stdout is removed and exit is set to 1.
    #
    # Returns nothing.
    sub retrieve_stock {
        my ($self, $args) = @_;

        my $task = $args->{"preds"}[0];
        my ($stock, $field) = @{ $args->{"params"} || [] };

        my $payload;
        if (defined $task && exists $nodes{$task}) {
            $payload = $nodes{$task}->{'stdout'};
        }

        # decode_json dies on malformed input and the lookup dies when the
        # document has an unexpected shape; both just mean "no quotes".
        my $quotes;
        if (defined $payload) {
            $quotes = eval { decode_json($payload)->{'query'}{'results'}{'quote'} };
        }

        # Accept a single quote object as well as a list of them.
        my @quotes = ref $quotes eq 'ARRAY' ? @{$quotes}
                   : ref $quotes eq 'HASH'  ? ($quotes)
                   :                          ();

        my $value;
        if (defined $stock && defined $field) {
            foreach my $entry ( @quotes ) {
                next unless ref $entry eq 'HASH' && defined $entry->{'symbol'};
                # No "last": as before, the final matching entry wins.
                $value = $entry->{$field} if $entry->{'symbol'} eq $stock;
            }
        }

        if (defined $value) {
            $nodes{$self}->{'stdout'} = $value;
            $nodes{$self}->{'exit'} = 0;
        } else {
            # Drop any earlier stdout so a failure cannot look like a result.
            delete $nodes{$self}->{'stdout'};
            $nodes{$self}->{'exit'} = 1;
        }

        return;
    }

    # Task action: compare the values produced by the predecessor tasks and
    # announce the stock with the numerically largest one.
    #
    # Parameters:
    #   $self - name of the task being run (key into %nodes)
    #   $args - hash ref; preds => array ref of predecessor task names. For
    #           each predecessor the first element of its params is used as
    #           the stock name and its stdout as the value to compare.
    #
    # On success the winning stock name is stored in $nodes{$self}{stdout},
    # $nodes{$self}{exit} is set to 0 and an OUTPUT line is printed. When no
    # predecessor supplies both a stock name and a value, an ERROR line is
    # printed and $nodes{$self}{exit} is set to 1.
    #
    # Returns nothing.
    sub aggregator {
        my ($self, $args) = @_;

        my %changes = ();
        foreach my $task (@{ $args->{"preds"} || [] }) {
            my $stock = $nodes{$task}->{'params'}[0];
            my $change = $nodes{$task}->{'stdout'};
            # Skip predecessors that produced nothing to compare.
            next unless defined $stock && defined $change;
            $changes{$stock} = $change;
        }

        unless (%changes) {
            print "ERROR: There is no change value to aggregate for [$self]\n";
            $nodes{$self}->{'exit'} = 1;
            return;
        }

        # Largest change first; ties are broken by name so the result does
        # not depend on hash ordering.
        my @sorted = sort { $changes{$b} <=> $changes{$a} or $a cmp $b } keys %changes;

        my $winner = $sorted[0];
        $nodes{$self}->{'stdout'} = $winner;
        $nodes{$self}->{'exit'} = 0;

        print "OUTPUT: The winner is $winner by change $changes{$winner}\n";

        return;
    }