Created
August 4, 2022 23:13
-
-
Save seanlindo/a096af16dc34df4f4e6f31be3c2c5bae to your computer and use it in GitHub Desktop.
Revisions
-
seanlindo created this gist
Aug 4, 2022 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,17 @@ from dagster import IOManager, io_manager class MyIOManager(IOManager): def __init__(self): self.storage_dict = {} def handle_output(self, context, obj): self.storage_dict[(context.step_key, context.name)] = obj def load_input(self, context): return self.storage_dict[(context.upstream_output.step_key, context.upstream_output.name)] @io_manager def my_io_manager(_): return MyIOManager() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,25 @@ from dagster import asset, with_resources, repository, define_asset_job, SourceAsset from example_io_manager import my_io_manager raw_users = SourceAsset(key="raw_users", io_manager_key="test_io_manager") @asset(io_manager_key="test_io_manager") def upstream_asset(raw_users): #TODO do some transformations transformedUsers = raw_users return transformedUsers #TODO what if there are many transformation steps? does each one have to be an asset? can they just be separate ops that work on the asset? @repository def repo(): return [ *with_resources( [ raw_users, upstream_asset ], resource_defs={ "test_io_manager": my_io_manager }, ), define_asset_job("process_users"), ]