dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

GCS IO manager connection error "AttributeError: 'NoneType' object has no attribute 'get_client'" #22109

Open nvjrepo opened 5 months ago

nvjrepo commented 5 months ago

Dagster version

1.7.7

What's the issue?

Python version: 3.11.4. I got the error below when setting up GCS as the I/O manager:

dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "asset1":

  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_plan.py", line 282, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 532, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event):
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 587, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output):
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 810, in _store_output
    for elt in iterate_with_context(
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 466, in iterate_with_context
    with context_fn():
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
AttributeError: 'NoneType' object has no attribute 'get_client'

  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 468, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 800, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster_gcp/gcs/io_manager.py", line 169, in handle_output
    self._internal_io_manager.handle_output(context, obj)
    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/pydantic/main.py", line 789, in __getattr__
    return super().__getattribute__(item)  # Raises AttributeError if appropriate
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster/_utils/cached_method.py", line 104, in _cached_method_wrapper
    result = method(self, *args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/namxuan97/.pyenv/versions/3.11.4/lib/python3.11/site-packages/dagster_gcp/gcs/io_manager.py", line 162, in _internal_io_manager
    bucket=self.gcs_bucket, client=self.gcs.get_client(), prefix=self.gcs_prefix
                                   ^^^^^^^^^^^^^^^^^^^

Below is my script:

import os
from dagster import asset, Definitions
from dagster_gcp_pandas import BigQueryPandasIOManager
import pandas as pd
from dagster_gcp.gcs.io_manager import GCSPickleIOManager, GCSResource

@asset
def asset1(context):
    # Create a sample DataFrame
    df = pd.DataFrame({"a": range(10), "b": range(10, 20)})
    context.log.info("Asset1 created dataframe")
    return df

@asset
def asset2(context, asset1):
    # Process the DataFrame from asset1
    context.log.info("Asset2 received dataframe")
    return asset1[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": GCSPickleIOManager(
            gcs_bucket="pacc-dagster-prod",
            gcs_prefix="test"
        ),
        "gcs": GCSResource(project="abc")
    }
)

Please help if you know where the issue is. Thank you!

What did you expect to happen?

No response

How to reproduce?

No response

Deployment type

None

Deployment details

No response

Additional information

No response


JasperHG90 commented 5 months ago

@nvjrepo I think you have not set the GOOGLE_APPLICATION_CREDENTIALS environment variable, which the google-cloud-storage library looks for to authenticate with GCS.

nvjrepo commented 5 months ago

@JasperHG90 Thank you for replying. I did set GOOGLE_APPLICATION_CREDENTIALS to a service account (SA) key path, but it still returns the same error. Everything works with BigQueryPandasIOManager; it only fails with GCS.

JasperHG90 commented 5 months ago

Ugh, my bad. That error has nothing to do with your credentials.

slopp commented 5 months ago

Just an update: looking at the source, I noticed the gcs= parameter and passed the GCS resource there, gcs=GCSResource(project=EnvVar(DAGSTER_GCP_PROJECT_ID_KEY)), and this worked. This does not match the documentation: https://docs.dagster.io/_apidocs/libraries/dagster-gcp#gcs-i-o-manager
(I assume because the I/O manager is supposed to depend on the gcs resource automatically?)

@jamiedemaria could you take a look at this and see if perhaps the resource dependency is not working as expected?

jamiedemaria commented 5 months ago

I think the docs might just be wrong here. For the newer resources, you need to pass resource dependencies as parameters to the top-level resource, like this:

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": GCSPickleIOManager(
            gcs_bucket="pacc-dagster-prod",
            gcs_prefix="test",
            gcs=GCSResource(project="abc")
        ),
    }
)
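Why the original wiring fails can be modeled in plain Python. This is a simplified sketch, not dagster_gcp's actual implementation: the Sketch* classes are hypothetical stand-ins, and the model assumes, as the last traceback frame (self.gcs.get_client()) suggests, that the I/O manager reads its client from its own gcs field. That field stays None when the GCS resource is only supplied as a sibling "gcs" key in the resources dict.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchGCSResource:
    """Hypothetical stand-in for GCSResource; returns a placeholder, not a real client."""
    project: str

    def get_client(self) -> str:
        return f"storage-client({self.project})"


@dataclass
class SketchPickleIOManager:
    """Hypothetical stand-in for an I/O manager whose GCS dependency is a constructor field."""
    gcs_bucket: str
    gcs_prefix: str = "dagster"
    gcs: Optional[SketchGCSResource] = None  # stays None unless passed explicitly

    def handle_output(self, key: str, obj: Any) -> str:
        # Same call as the failing frame; raises AttributeError when self.gcs is None.
        # A real manager would pickle and upload obj; this sketch ignores it.
        client = self.gcs.get_client()
        return f"{client} -> gs://{self.gcs_bucket}/{self.gcs_prefix}/{key}"


# Wiring from the original report: "gcs" is a sibling key the manager never sees.
broken = {
    "io_manager": SketchPickleIOManager(gcs_bucket="pacc-dagster-prod", gcs_prefix="test"),
    "gcs": SketchGCSResource(project="abc"),
}

# Wiring from the fix above: the resource is passed into the manager itself.
fixed = {
    "io_manager": SketchPickleIOManager(
        gcs_bucket="pacc-dagster-prod",
        gcs_prefix="test",
        gcs=SketchGCSResource(project="abc"),
    ),
}

try:
    broken["io_manager"].handle_output("asset1", [1, 2, 3])
except AttributeError as exc:
    print(f"broken: {exc}")

print("fixed:", fixed["io_manager"].handle_output("asset1", [1, 2, 3]))
```

The design point the sketch illustrates is that the newer resources resolve dependencies through constructor fields rather than by looking up a matching key in the Definitions resources dict, so a top-level "gcs" entry alone does not reach the I/O manager.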

jamiedemaria commented 5 months ago

I can fix the docs.

Edit: PR to do so: https://github.com/dagster-io/dagster/pull/22470