Skip to content

Instantly share code, notes, and snippets.

@seanlindo
Created August 4, 2022 23:13
Show Gist options
  • Select an option

  • Save seanlindo/a096af16dc34df4f4e6f31be3c2c5bae to your computer and use it in GitHub Desktop.

Select an option

Save seanlindo/a096af16dc34df4f4e6f31be3c2c5bae to your computer and use it in GitHub Desktop.

Revisions

  1. seanlindo created this gist Aug 4, 2022.
    17 changes: 17 additions & 0 deletions example_io_manager.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,17 @@
    from dagster import IOManager, io_manager


    class MyIOManager(IOManager):
        """In-memory IO manager that keeps outputs in a plain dictionary.

        Objects are keyed by ``(step_key, output_name)`` and live only in this
        instance's ``storage_dict``; nothing is persisted anywhere else.
        """

        def __init__(self):
            # Maps (step_key, output_name) -> the object handed to handle_output.
            self.storage_dict = {}

        def handle_output(self, context, obj):
            """Store ``obj`` under the step key and name carried by ``context``."""
            self.storage_dict[(context.step_key, context.name)] = obj

        def load_input(self, context):
            """Return the object stored for the upstream output of this input.

            The lookup key is built from ``context.upstream_output`` so that it
            matches the key written by ``handle_output``.

            Raises:
                KeyError: if nothing has been stored for the upstream output
                    (for example, an input whose value was never written
                    through this manager instance).
            """
            upstream = context.upstream_output
            storage_key = (upstream.step_key, upstream.name)
            try:
                return self.storage_dict[storage_key]
            except KeyError:
                # Re-raise as the same exception type, but say what was missing
                # instead of surfacing a bare tuple.
                raise KeyError(
                    f"MyIOManager has no stored object for step {storage_key[0]!r}, "
                    f"output {storage_key[1]!r}"
                ) from None


    @io_manager
    def my_io_manager(_):
        """Build a fresh ``MyIOManager``; the init context argument is unused."""
        return MyIOManager()
    25 changes: 25 additions & 0 deletions example_job.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,25 @@
    from dagster import asset, with_resources, repository, define_asset_job, SourceAsset
    from example_io_manager import my_io_manager

    # Source asset "raw_users", configured to use the "test_io_manager" IO manager key.
    raw_users = SourceAsset(key="raw_users", io_manager_key="test_io_manager")

    @asset(io_manager_key="test_io_manager")
    def upstream_asset(raw_users):
        """Derive the asset from ``raw_users``; currently a pass-through.

        Args:
            raw_users: The value supplied for the ``raw_users`` input.

        Returns:
            The same object that was passed in, until transformations are added.
        """
        # TODO: do some transformations
        transformed_users = raw_users

        return transformed_users

    # TODO: What if there are many transformation steps? Does each one have to be its own asset, or can they be separate ops that all work on the same asset?

    @repository
    def repo():
        """Return the repository's definitions.

        The list contains ``raw_users`` and ``upstream_asset`` with the
        ``"test_io_manager"`` resource key bound to ``my_io_manager``, followed
        by an asset job named ``"process_users"``.
        """
        return [
            *with_resources(
                [raw_users, upstream_asset],
                resource_defs={
                    "test_io_manager": my_io_manager,
                },
            ),
            # NOTE(review): no selection is passed, so this presumably targets
            # every asset in the repository - confirm against the Dagster docs.
            define_asset_job("process_users"),
        ]