import threading
from contextlib import contextmanager
from logging import LogRecord, Logger
from queue import Queue
from typing import Optional, Generator


@contextmanager
def log_combiner_thread(
    *,
    source: Queue[Optional[LogRecord]],
    destination: Logger,
) -> Generator[threading.Thread, None, None]:
    """
    Run a background thread that forwards queued log records to a logger.

    The thread runs `consume_logging_queue` for the lifetime of the ``with``
    block. On exit a ``None`` sentinel is put on ``source`` and the thread is
    joined, so every record queued before the sentinel has been passed to
    ``destination`` by the time the context manager returns.

    See https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

    Args:
        source: The queue to consume from. ``None`` is reserved as the stop
            sentinel and must not be put on the queue by producers.
        destination: The logger to send messages to.

    Yields:
        The started consumer thread.
    """
    thread = threading.Thread(
        target=consume_logging_queue,
        kwargs=dict(
            source=source,
            destination=destination,
        ),
    )
    # Start outside the `try`: if the thread cannot be started, the cleanup
    # below must not run. Joining a never-started thread raises RuntimeError,
    # which would mask the real failure, and the sentinel would be left
    # behind in the caller's queue.
    thread.start()
    try:
        yield thread
    finally:
        destination.debug("Waiting for multiprocess logging thread to stop...")
        source.put(None)  # This tells the logging thread to stop.
        thread.join()  # Wait for the logging thread to stop.
        destination.debug("Multiprocess logging thread stopped.")


def consume_logging_queue(
    *,
    source: Queue[Optional[LogRecord]],
    destination: Logger,
) -> None:
    """
    Forward log records from a queue to a logger until a sentinel arrives.

    Blocks on ``source`` and passes each record to ``destination.handle``.
    Returns as soon as ``None`` is read; anything queued after the sentinel
    is left on the queue.

    See https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

    Args:
        source: The queue to consume from.
        destination: The logger to send messages to.
    """
    while True:
        # This blocks until a record is available.
        record = source.get()
        # We send `None` as a sentinel to tell this function to stop.
        if record is None:
            break
        destination.handle(record)