@minrk
Created October 22, 2011 01:36
Revisions

  1. minrk revised this gist Oct 22, 2011. 1 changed file with 5 additions and 3 deletions.
    8 changes: 5 additions & 3 deletions customresults.py
    @@ -22,7 +22,7 @@ def sleep_here(count, t):

     v = rc.load_balanced_view()

    -amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ])
    +amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ], chunksize=2)

     pending = set(amr.msg_ids)
     while pending:
    @@ -37,9 +37,11 @@ def sleep_here(count, t):
         pending = pending.difference(finished)
         for msg_id in finished:
             # we know these are done, so don't worry about blocking
    -        ar = parallel.AsyncResult(rc, msg_id)
    -        print "job id %s finished on engine %i, with stdout:" % (msg_id, ar.engine_id)
    +        ar = rc.get_result(msg_id)
    +        print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
    +        print "with stdout:"
             print '    ' + ar.stdout.replace('\n', '\n    ').rstrip()
    +        print "and results:"

             # note that each job in a map always returns a list of length chunksize
             # even if chunksize == 1
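
    (A short note after the revisions below sketches what this chunksize=2 change means in practice.)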
  2. minrk created this gist Oct 22, 2011.
    47 changes: 47 additions & 0 deletions customresults.py
    @@ -0,0 +1,47 @@
    import time
    import random

    from IPython import parallel

    # create client & view
    rc = parallel.Client()
    dv = rc[:]

    # scatter 'id', so id=0,1,2 on engines 0,1,2
    dv.scatter('id', rc.ids, flatten=True)
    print dv['id']


    def sleep_here(count, t):
        """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
        import time,sys
        print "hi from engine %i" % id
        sys.stdout.flush()
        time.sleep(t)
        return count,t

    v = rc.load_balanced_view()

    amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ])

    pending = set(amr.msg_ids)
    while pending:
        try:
            rc.wait(pending, 1e-3)
        except parallel.TimeoutError:
            # ignore TimeoutErrors, since they only mean that at least one task isn't done
            pass
        # finished is the set of msg_ids that are complete
        finished = pending.difference(rc.outstanding)
        # update pending to exclude those that just finished
        pending = pending.difference(finished)
        for msg_id in finished:
            # we know these are done, so don't worry about blocking
            ar = parallel.AsyncResult(rc, msg_id)
            print "job id %s finished on engine %i, with stdout:" % (msg_id, ar.engine_id)
            print '    ' + ar.stdout.replace('\n', '\n    ').rstrip()

            # note that each job in a map always returns a list of length chunksize
            # even if chunksize == 1
            for (count,t) in ar.result:
                print "  item %i: slept for %.2fs" % (count, t)