import asyncio import random import time from concurrent.futures import ProcessPoolExecutor from multiprocessing.managers import BaseManager, NamespaceProxy class CustomManager(BaseManager): pass class ObjectProxy(NamespaceProxy): _exposed_ = ('__getattribute__', '__setattr__') class SomeClass: a = 5 b = 10 def fill_parameters(self, a=None, b=None): self.a = a or SomeClass.a self.b = b or SomeClass.b async def some_class_modificator(shared_some_class): while True: shared_some_class.a = random.randint(1, 100) shared_some_class.b = random.randint(1, 100) print(f'some_class_modificator: {shared_some_class.a=}, {shared_some_class.b=}') await asyncio.sleep(1) def async_context(shared_some_class): asyncio.run(some_class_modificator(shared_some_class)) def sync_context(shared_some_class): while True: print(f'sync_context: {shared_some_class.a=}, {shared_some_class.b=}') time.sleep(5) PROCESS_FUNCTIONS = ( async_context, sync_context ) def main(): CustomManager.register('SomeClass', SomeClass, ObjectProxy) with CustomManager() as manager: shared_some_class = manager.SomeClass() # noqa shared_some_class.fill_parameters(a=3, b=22) with ProcessPoolExecutor(max_workers=2) as executor: for func in PROCESS_FUNCTIONS: executor.submit(func, shared_some_class) if __name__ == '__main__': main()