from dagster import IOManager, io_manager


class MyIOManager(IOManager):
    """IO manager that keeps step outputs in a dict held on the instance.

    Each stored object is keyed by the ``(step_key, name)`` pair of the
    output that produced it. Nothing is written anywhere else, so stored
    values are only reachable through this same instance.
    """

    def __init__(self):
        # Maps (step_key, output_name) -> the object handed to handle_output.
        self.storage_dict = {}

    def handle_output(self, context, obj):
        """Store ``obj`` under the key of the output described by ``context``.

        Args:
            context: Output context; its ``step_key`` and ``name``
                attributes form the storage key.
            obj: The value to remember. It is stored by reference, not copied.
        """
        self.storage_dict[(context.step_key, context.name)] = obj

    def load_input(self, context):
        """Return the object previously stored for the upstream output.

        Args:
            context: Input context; ``context.upstream_output`` supplies the
                ``step_key`` and ``name`` used to look the value up.

        Returns:
            The object that was passed to ``handle_output`` for that
            upstream output.

        Raises:
            KeyError: If nothing has been stored for the upstream output.
        """
        upstream = context.upstream_output
        key = (upstream.step_key, upstream.name)
        try:
            return self.storage_dict[key]
        except KeyError:
            # Same exception type as a plain dict lookup, but with a message
            # that says which output is missing instead of a bare tuple.
            raise KeyError(
                f"No stored output for step {key[0]!r}, output {key[1]!r}"
            ) from None


@io_manager
def my_io_manager(_):
    """Resource factory returning a fresh ``MyIOManager``.

    The initialization context argument is ignored.
    """
    return MyIOManager()