"""Subprocess IPC communication test Determine the throughput and bandwidth of standard Python subprocess interprocess communication. """ import os import sys import time import pstats import logging import cProfile import argparse import subprocess logging.basicConfig(format="%(name)s: %(message)s") parser = argparse.ArgumentParser() parser.add_argument("-o", "--output") parser.add_argument("-i", "--iterations", default=100, type=int) parser.add_argument("-b", "--bytes", default=1000, type=int) parser.add_argument("-s", "--stats", action="store_true", help="Print additional stats") parser.add_argument("-p", "--plot", action="store_true") parser.add_argument("-q", "--quiet", action="store_true") opt = parser.parse_args() if not os.getenv("CHILD"): # Parent Process # # This block boots up another Python interpreter # so as to send messages back and forth. # ______________ # | | # | | # | PARENT | # | | # |______________| # log = logging.getLogger("<-- PY") log.setLevel(logging.INFO) log.info("Booting child..") popen = subprocess.Popen( # Open yourself, but run the below block [sys.executable, __file__], env=dict(os.environ, **{"CHILD": "1"}), stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=-1, universal_newlines=True, ) # Wait for child to get ready.. response = popen.stdout.readline() assert response == "Ready\n", "Was: '%s'" % response # Data the size of opt.bytes data = "0".zfill(opt.bytes) bytes_ = sys.getsizeof(data) log.info("Created data {:,} bytes (len {})".format( bytes_, len(data) )) write_per_iteration = list() rtrip_per_iteration = list() # roundtrip child_wait_times = list() times = list() if opt.stats: profile = cProfile.Profile() profile.enable() tot0 = time.clock() for i in range(opt.iterations): # Write write0 = time.clock() popen.stdin.write(data) popen.stdin.write("%d\n" % i) popen.stdin.flush() write1 = time.clock() # Read read0 = time.clock() response = popen.stdout.readline() read1 = time.clock() write_per_iteration.append((write1 - write0) * 1000) rtrip_per_iteration.append((read1 - read0) * 1000) times.append(read1) index, wait = response.rsplit("-", 1) response = int(index) wait = float(wait) child_wait_times.append(wait) assert response == i, "Was %d, expected %d" % (response, i) if not opt.quiet and (opt.iterations > 1000 and i % 100 == 0): sys.stderr.write("\r%d/%d.." % (i, opt.iterations)) if not opt.quiet: sys.stderr.write("\r%d/%d..\n" % (opt.iterations, opt.iterations)) if opt.stats: profile.disable() stats = pstats.Stats(profile).sort_stats("time") stats.print_stats(20) tot1 = time.clock() totdur = tot1 - tot0 # s # Send kill signal popen.kill() popen.wait() def plot(): import pygal fname = os.path.join(os.getcwd(), "plot.svg") log.info("Plotting to %s.." % fname) assert rtrip_per_iteration assert write_per_iteration assert child_wait_times plot = pygal.Line(width=2000, height=500) plot.title = "Time per iteration" plot.add("Roundtrip (ms)", rtrip_per_iteration, show_dots=False) plot.add("Write to child (ms)", write_per_iteration, show_dots=False) plot.add("Child wait (ms)", child_wait_times, show_dots=False) # plot.x_labels = times plot.render_to_file(fname) if opt.plot: try: plot() except ImportError: log.info("Plotting skipped, could not find pygal") iterations = opt.iterations bpi = bytes_ * 2 totb = opt.iterations * bpi # in + out bps = totb / (totdur) avgtime = (sum(rtrip_per_iteration) / len(rtrip_per_iteration)) mintime = min(rtrip_per_iteration) maxtime = max(rtrip_per_iteration) deltime = maxtime - mintime results = ( ("iterations", opt.iterations), ("bpi", bytes_ * 2), ("bps", bps), ("avgtime", avgtime), ("mintime", mintime), ("maxtime", maxtime), ("deltime", deltime), ("totb", totb), ("totdur", totdur), ) if opt.output: import json with open(opt.output, "w") as f: log.info("Writing results to '%s'" % opt.output) json.dump(dict(results), f, indent=2, sort_keys=True) print("""\ Iterations: {iterations:,} Bytes/iteration: {bpi:,} b/i Bytes/second: {bps:,.0f} b/s Avarage roundtrip time: {avgtime:.3f} ms Min roundtrip time: {mintime:.3f} ms Max roundtrip time: {maxtime:.3f} ms Delta roundtrip time: {deltime:.3f} ms Total bytes: {totb:,} b Total time: {totdur:.3f} s """.format(**locals())) else: # Child Process # # This block represents the additional Python interpreter which # is receiving messages via sys.stdin from the above parent. # _____________ # | | # | | # | CHILD | # | | # |_____________| # log = logging.getLogger("--> MO") log.setLevel(logging.INFO) log.info("Ready for action..") sys.stdout.write("Ready\n") sys.stdout.flush() t0 = None while True: line = sys.stdin.readline().rstrip() # Wait time t1 = time.clock() duration = "-%.6f\n" % ((t1 - (t0 or t1)) * 1000) sys.stdout.write(line) sys.stdout.write(duration) sys.stdout.flush() t0 = time.clock() log.info("Dying..")