from functools import partial from queue import SimpleQueue def imap(exc, func, src, size=None, return_exc=False): if size is None: size = exc._max_workers if size < 1: raise ValueError("imap hang if size is less than one.") results = SimpleQueue() futures = {} def complete(slot, fut): try: result = slot, fut.result(), None except BaseException as err: result = slot, None, err results.put(result) def results_get(): slot, out, err = results.get() if err is not None: if return_exc: out = err else: raise err return slot, out try: src_iter = iter(src) free_slot = 0 while True: if free_slot < size: slot = free_slot free_slot += 1 else: slot, out = results_get() yield out try: data = next(src_iter) except StopIteration: futures.pop(slot, None) break else: futures[slot] = fut = exc.submit(func, data) fut.add_done_callback(partial(complete, slot)) while futures: slot, out = results_get() futures.pop(slot, None) yield out finally: for fut in futures.values(): fut.cancel() # Testing code from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import os from time import sleep from threading import Lock def printl(*args, _lock=Lock()): with _lock: print(*args) def add_one(x): printl(f'run: {x} pid={os.getpid()}') if x % 2 == 0: sleep(1) return x + 1 def yields(n=25): for x in range(n): printl(f'get: {x} pid={os.getpid()}') yield x outs = set() with ThreadPoolExecutor(7) as tpe: for o in imap(tpe, add_one, yields(25), return_exc=True): printl(f'got: {o} pid={os.getpid()}') outs.add(o) printl(f"FIN: {sorted(outs) == list(range(1, 26))}")