Created
October 22, 2011 01:36
-
-
Save minrk/1305415 to your computer and use it in GitHub Desktop.
Revisions
-
minrk revised this gist
Oct 22, 2011 . 1 changed file with 5 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -22,7 +22,7 @@ def sleep_here(count, t): v = rc.load_balanced_view() amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ], chunksize=2) pending = set(amr.msg_ids) while pending: @@ -37,9 +37,11 @@ def sleep_here(count, t): pending = pending.difference(finished) for msg_id in finished: # we know these are done, so don't worry about blocking ar = rc.get_result(msg_id) print "job id %s finished on engine %i" % (msg_id, ar.engine_id) print "with stdout:" print ' ' + ar.stdout.replace('\n', '\n ').rstrip() print "and results:" # note that each job in a map always returns a list of length chunksize # even if chunksize == 1 -
minrk created this gist
Oct 22, 2011 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,47 @@ import time import random from IPython import parallel # create client & view rc = parallel.Client() dv = rc[:] # scatter 'id', so id=0,1,2 on engines 0,1,2 dv.scatter('id', rc.ids, flatten=True) print dv['id'] def sleep_here(count, t): """simple function that takes args, prints a short message, sleeps for a time, and returns the same args""" import time,sys print "hi from engine %i" % id sys.stdout.flush() time.sleep(t) return count,t v = rc.load_balanced_view() amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ]) pending = set(amr.msg_ids) while pending: try: rc.wait(pending, 1e-3) except parallel.TimeoutError: # ignore timeouterrors, since they only mean that at least one isn't done pass # finished is the set of msg_ids that are complete finished = pending.difference(rc.outstanding) # update pending to exclude those that just finished pending = pending.difference(finished) for msg_id in finished: # we know these are done, so don't worry about blocking ar = parallel.AsyncResult(rc, msg_id) print "job id %s finished on engine %i, with stdout:" % (msg_id, ar.engine_id) print ' ' + ar.stdout.replace('\n', '\n ').rstrip() # note that each job in a map always returns a list of length chunksize # even if chunksize == 1 for (count,t) in ar.result: print " item %i: slept for %.2fs" % (count, t)