""" An example of a script that does CPU-bound work (checksum calculation) followed by IO-bound work (upload to server) in a performant manner. Inspiration: https://stackoverflow.com/questions/21159103/what-kind-of-problems-if-any-would-there-be-combining-asyncio-with-multiproces#29147750 """ import asyncio import datetime import hashlib import multiprocessing import random import time import typing from concurrent.futures import ProcessPoolExecutor # Logging colors green = "\033[92m" blue = "\033[96m" clear = "\033[0m" def log(filename, msg, color=clear): """ Mock logger (works in multiprocessing scenario) """ timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] print( f"{timestamp:<24} {multiprocessing.current_process().name:<15} {filename:<8} {color}{msg}{clear}" ) def generate_checksum(filepath: str) -> str: """ Blocking checksum generation. Should run in its own process. """ log(filepath, "Starting checksum...", color=blue) delay = random.randint(1, 50) / 10 time.sleep(delay) # Pretend this is fast + expensive calculation log(filepath, f"Completed checksum after {delay} seconds.", color=blue) return hashlib.sha256(random.randbytes(100)).hexdigest() async def upload(filepath: str, checksum: str) -> None: """ Non-blocking upload. Should run as coroutine. """ async with max_concurrent_uploads: log(filepath, "Starting upload...", color=green) delay = random.randint(1, 10) await asyncio.sleep(delay) # Pretend this is slow upload log( filepath, f"Completed upload after {delay} seconds.", color=green, ) async def process_file(filepath: str): checksum = await loop.run_in_executor(pool, generate_checksum, filepath) await upload(filepath, checksum) return filepath async def main(filenames: typing.List[str]): start = time.time() for i, task in enumerate( asyncio.as_completed([process_file(filename) for filename in filenames]) ): # get the next result filepath = await task log(filepath, f"{int(i * 100 / len(filenames))}% done") print(f"Complete in {time.time() - start:.1f} seconds") if __name__ == "__main__": random.seed(2) filecount = 100 cpu_count = multiprocessing.cpu_count() max_concurrent_uploads = asyncio.Semaphore(cpu_count * 10) pool = ProcessPoolExecutor(max_workers=cpu_count) loop = asyncio.get_event_loop() loop.run_until_complete(main([f"{filename}.txt" for filename in range(filecount)]))