dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Creating an IO Manager called `io_manager` does not overwrite the default one #19262

Open laisdeghaide opened 7 months ago

laisdeghaide commented 7 months ago

Dagster version

1.5.13

What's the issue?

Hi there, I tried to replace the default io_manager with s3_pickle_io_manager. I did exactly (or so I think) what is described in the docs.

But I'm getting this error: "dagster._core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'io_manager' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key."

What did you expect to happen?

I expected to be able to replace the default io_manager with s3_pickle_io_manager, as the documentation describes.

How to reproduce?

Create a resource called io_manager.

"io_manager": S3PickleIOManager(
        s3_resource=S3Resource(),
        s3_bucket="arado-datapond-raw",
        s3_prefix="dagster/storage",
    )

Provide this resource to your assets with:

defs = create_repository_using_definitions_args(
  name="dagarado",
  assets=all_assets,
  resources=resources_by_deployment_name[get_deployment_name()],
  jobs=all_jobs,
  executor=in_process_executor,
  schedules=all_schedules,
  sensors=all_sensors,
  asset_checks=all_checks,
)

Deployment type

Local

Deployment details

No response

Additional information

No response

jamiedemaria commented 7 months ago

hey @laisdeghaide do you have any instances in your code where you're directly setting the resource_defs or io_manager_def parameters on the @asset decorator? those might be the source of the conflicts.

(copied this from my response to the other discussion, but moving here so that further discussion is contained to one issue)

dennisgera commented 7 months ago

Hello @jamiedemaria , we have included the definition for the io_manager key in all of our assets and jobs that use the resource_defs parameter, besides defining our io_manager key in our project definitions. However, the same error still appears.

Could this be the cause of another issue?

jamiedemaria commented 7 months ago

I'm not sure I fully understand. Could you share a code snippet of one of your assets and how you're setting the io manager on it?

laisdeghaide commented 7 months ago

Besides redefining the io_manager in the project definitions, we also included the io_manager in other assets/jobs where we directly set a resource_defs, example:

def build_io_manager() -> S3PickleIOManager:
    s3_kwargs = get_s3_env()
    # No trailing commas here: a trailing comma would wrap the resource in a 1-tuple.
    s3_resource = S3Resource(**s3_kwargs) if s3_kwargs else S3Resource()

    io_manager = S3PickleIOManager(
        s3_resource=s3_resource,
        s3_bucket="arado-datapond-raw",
        s3_prefix="dagster/storage",
    )

    return io_manager

@job(
    name="scrape_bhz_ceasa",
    description="Scrape reference prices from Ceasa BHZ",
    config=partitioned_config,
    resource_defs={"dbt": dbt_resource, "io_manager": build_io_manager()},
)
def scrape_bhz_ceasa():
    load_dbt(load_raw(transform(extract())))

But we still get the error of duplicate keys.

jamiedemaria commented 7 months ago

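Ok, I think that is where the issue is. Basically, Dagster has a constraint that the resources/io_managers attached to a specific key need to be the exact same object, not just the same type of object. Each call to build_io_manager() returns a new object, so the one on the job and the one in the definitions don't match.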

I think if you did this

io_manager = build_io_manager()

@job(
    name="scrape_bhz_ceasa",
    description="Scrape reference prices from Ceasa BHZ",
    config=partitioned_config,
    resource_defs={"dbt": dbt_resource, "io_manager": io_manager},
)
def scrape_bhz_ceasa():
    load_dbt(load_raw(transform(extract())))

def resources_by_deployment_name(deployment_name):
    if deployment_name == "local":
        return {"io_manager": io_manager, ...}  # note that io_manager is the exact same object as created at the top of the snippet
    ...

defs = create_repository_using_definitions_args(
  name="dagarado",
  assets=all_assets,
  resources=resources_by_deployment_name(get_deployment_name()),
  jobs=all_jobs,
  jobs=all_jobs,
  executor=in_process_executor,
  schedules=all_schedules,
  sensors=all_sensors,
  asset_checks=all_checks,
)

it should work
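
To illustrate the "exact same object" constraint in isolation, here is a minimal plain-Python sketch. It does not use Dagster: `FakeIOManager` and `find_conflicts` are hypothetical names invented for this example, and the check only mimics the idea of comparing by reference (`is`) rather than by value (`==`). It is not Dagster's actual implementation.

```python
class FakeIOManager:
    """Hypothetical stand-in for an IO manager (not a Dagster class)."""

    def __init__(self, bucket):
        self.bucket = bucket

    def __eq__(self, other):
        # Value equality: two managers with the same bucket compare equal.
        return isinstance(other, FakeIOManager) and self.bucket == other.bucket


def build_io_manager():
    # Every call creates a brand-new object.
    return FakeIOManager("my-bucket")


def find_conflicts(*resource_maps):
    """Return keys that are bound to more than one distinct object."""
    seen = {}
    conflicts = set()
    for resources in resource_maps:
        for key, obj in resources.items():
            # Identity check (`is`), deliberately ignoring `==`.
            if key in seen and seen[key] is not obj:
                conflicts.add(key)
            seen.setdefault(key, obj)
    return sorted(conflicts)


# Two separate calls: equal by value, but different objects.
job_resources = {"io_manager": build_io_manager()}
defs_resources = {"io_manager": build_io_manager()}
print(job_resources["io_manager"] == defs_resources["io_manager"])
print(find_conflicts(job_resources, defs_resources))

# One shared instance: no conflict.
shared = build_io_manager()
print(find_conflicts({"io_manager": shared}, {"io_manager": shared}))
```

With two separate `build_io_manager()` calls the key is reported as conflicting even though the objects are configured identically. Building the object once and passing that one instance everywhere avoids this.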

laisdeghaide commented 7 months ago

Hmmm, I also tried this way and the same error persists

jamiedemaria commented 7 months ago

ok let me try to replicate

jamiedemaria commented 7 months ago

Ok I was able to replicate! Is there a reason you need to set "io_manager" specifically on the job in addition to in the Definitions?

In this simple example I put together with assets:

class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        context.log.info("STORING THE OBJECT IN A CUSTOM IO MANAGER")

    def load_input(self, context):
        return 1

io_mgr = MyIOManager()

@asset(
    resource_defs={"io_manager": io_mgr}
)
def my_asset():
    return 1

@asset
def other_asset(my_asset):
    return 1 + my_asset

defs = Definitions(
    assets=[my_asset, other_asset],
    resources={"io_manager": io_mgr}
)

I got the same error as you, but if I do this:

@asset
def my_asset():
    return 1

@asset
def other_asset(my_asset):
    return 1 + my_asset

defs = Definitions(
    assets=[my_asset, other_asset],
    resources={"io_manager": MyIOManager()}
)

I get no error and the correct IO manager is used. Would you be able to just remove "io_manager": io_mgr from the resource_defs on the job?

laisdeghaide commented 7 months ago

This was the first solution I tried, but weirdly I got the same error. It seems like my project has a hidden io_manager or something that's conflicting with my new definition of io_manager, but I can't find it.

jamiedemaria commented 7 months ago

That would also be my guess as to what's going on. Hopefully a bit more digging through your code reveals something!
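
One possible way to do that digging is to scan every definition for its own binding of the "io_manager" key. This is a rough sketch under assumptions: it assumes each asset or job object exposes a `resource_defs` mapping (I believe Dagster's asset definitions do, but please verify for your version). `find_resource_overrides` is a hypothetical helper, and the stubs below stand in for real definitions so the snippet runs without Dagster installed.

```python
from types import SimpleNamespace


def find_resource_overrides(definitions, key="io_manager"):
    """Return (label, resource) pairs for definitions that bind `key` themselves."""
    overrides = []
    for definition in definitions:
        # Assumption: a `resource_defs` mapping is available on the object.
        resource_defs = getattr(definition, "resource_defs", None) or {}
        if key in resource_defs:
            label = getattr(definition, "name", None) or repr(definition)
            overrides.append((label, resource_defs[key]))
    return overrides


# Stub objects standing in for assets/jobs (hypothetical, for illustration only).
shared = object()
hidden = object()
stubs = [
    SimpleNamespace(name="asset_a", resource_defs={"io_manager": shared}),
    SimpleNamespace(name="asset_b", resource_defs={}),
    SimpleNamespace(name="asset_c", resource_defs={"io_manager": hidden}),
]

for label, resource in find_resource_overrides(stubs):
    # id() makes it easy to see which bindings are not the same object.
    print(label, id(resource))
```

In a real project you would pass in something like `all_assets` and `all_jobs` instead of the stubs. Any entry whose printed id differs from that of the io_manager in the project definitions is a candidate for the conflict.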