dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Cannot start backfills when gRPC server doesn't have access to Dagster database #12440

Open peay opened 1 year ago

peay commented 1 year ago

Dagster version

1.1.18

What's the issue?

We run a user code deployment as a container on Kubernetes, deployed with the Helm chart. This container can't access the PostgreSQL database for Dagster for two reasons:

This was fine before since it didn't need any such access.

With 1.1.18, when we attempt to start a backfill, Dagster seems to invoke ExternalPartitionTags on the gRPC server of the user code deployment, which fails when trying to create a Dagster instance as it cannot connect to the database, and prevents the backfill from starting.

The user code deployment has the following exception chain when attempting to start a backfill from Dagit (here, with network policies lifted):

ERROR:grpc._server:Exception calling application: too many retries for DB connection
Traceback (most recent call last):
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
    return fn()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 327, in connect
    return _ConnectionFairy._checkout(self)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 894, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 493, in checkout
    rec = pool._do_get()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 273, in _create_connection
    return _ConnectionRecord(self)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 388, in __init__
    self.__connect()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 690, in __connect
    with util.safe_reraise():
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 686, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy_rdsiam/dbapi_psycopg2.py", line 60, in connect
    conn = _psycopg2_connect(**kwargs)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: connection to server at "x" (y), port 5432 failed: FATAL:  PAM authentication failed for user "u"

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster_postgres/utils.py", line 104, in retry_pg_connection_fn
    return fn()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster_postgres/run_storage/run_storage.py", line 76, in <lambda>
    table_names = retry_pg_connection_fn(lambda: db.inspect(self._engine).get_table_names())
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/inspection.py", line 64, in inspect
    ret = reg(subject)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/reflection.py", line 182, in _engine_insp
    return Inspector._construct(Inspector._init_engine, bind)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/reflection.py", line 117, in _construct
    init(self, bind)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/reflection.py", line 128, in _init_engine
    engine.connect().close()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3315, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
    else engine.raw_connection()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3394, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3364, in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2198, in _handle_dbapi_exception_noconnection
    util.raise_(
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
    return fn()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 327, in connect
    return _ConnectionFairy._checkout(self)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 894, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 493, in checkout
    rec = pool._do_get()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 273, in _create_connection
    return _ConnectionRecord(self)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 388, in __init__
    self.__connect()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 690, in __connect
    with util.safe_reraise():
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 686, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy_rdsiam/dbapi_psycopg2.py", line 60, in connect
    conn = _psycopg2_connect(**kwargs)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "x" (y), port 5432 failed: FATAL:  PAM authentication failed for user "u"

(Background on this error at: https://sqlalche.me/e/14/e3q8)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/grpc/_server.py", line 443, in _call_behavior
    response_or_iterator = behavior(argument, context)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_grpc/server.py", line 490, in ExternalPartitionTags
    with DagsterInstance.from_ref(instance_ref) if instance_ref else nullcontext() as instance:
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_core/instance/__init__.py", line 500, in from_ref
    unified_storage = instance_ref.storage
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_core/instance/ref.py", line 463, in storage
    return self.storage_data.rehydrate() if self.storage_data else None
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_serdes/config_class.py", line 101, in rehydrate
    return klass.from_config_value(self, check.not_none(result.value))
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_core/storage/legacy_storage.py", line 114, in from_config_value
    ).rehydrate(),
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_serdes/config_class.py", line 101, in rehydrate
    return klass.from_config_value(self, check.not_none(result.value))
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster_postgres/run_storage/run_storage.py", line 123, in from_config_value
    return PostgresRunStorage(
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster_postgres/run_storage/run_storage.py", line 76, in __init__
    table_names = retry_pg_connection_fn(lambda: db.inspect(self._engine).get_table_names())
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster_postgres/utils.py", line 116, in retry_pg_connection_fn
    raise DagsterPostgresException("too many retries for DB connection") from exc
dagster_postgres.utils.DagsterPostgresException: too many retries for DB connection

Single runs triggered from Dagit or schedules still work fine.

What did you expect to happen?

How to reproduce?

Add a NetworkPolicy covering a user code deployment that blocks egress traffic
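
As a rough illustration of that reproduction step, the sketch below builds a deny-all-egress NetworkPolicy manifest as a Python dict and prints it as JSON. The policy name, namespace, and pod labels are hypothetical placeholders, not values from this deployment, and the real policy may be less strict than this one.

```python
import json


def deny_egress_policy(name: str, namespace: str, pod_labels: dict) -> dict:
    """Build a NetworkPolicy manifest that denies all egress for matching pods."""
    return {
        "apiVersion": "networking.k8s.io/v1",
        "kind": "NetworkPolicy",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            # Selects the pods of the user code deployment (labels are hypothetical).
            "podSelector": {"matchLabels": pod_labels},
            # Listing "Egress" with no egress rules denies all outbound traffic,
            # including DNS lookups and connections to PostgreSQL.
            "policyTypes": ["Egress"],
            "egress": [],
        },
    }


policy = deny_egress_policy(
    name="user-code-deny-egress",        # hypothetical name
    namespace="dagster",                 # hypothetical namespace
    pod_labels={"app": "my-user-code"},  # hypothetical labels
)
manifest = json.dumps(policy, indent=2)
print(manifest)
```

The printed JSON can be saved to a file and applied with `kubectl apply -f`, since Kubernetes accepts JSON manifests as well as YAML.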

Deployment type

Dagster Helm chart

Deployment details

No response

Additional information

See https://dagster.slack.com/archives/C01U954MEER/p1676652897955709

cc @clairelin135

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

peay commented 1 year ago

Currently, an instance is always created, and passed to:

def get_partition_tags(
    repo_def: RepositoryDefinition,
    partition_set_name: str,
    partition_name: str,
    instance: Optional[DagsterInstance] = None,
):
    partition_set_def = repo_def.get_partition_set_def(partition_set_name)

    partition = partition_set_def.get_partition(partition_name, dynamic_partitions_store=instance)
    try:
        with user_code_error_boundary(
            PartitionExecutionError,
            lambda: f"Error occurred during the evaluation of the `tags_for_partition` function for {_get_target_for_partition_execution_error(partition_set_def)}",
        ):
            tags = partition_set_def.tags_for_partition(partition)
            return ExternalPartitionTagsData(name=partition.name, tags=tags)
    except Exception:
        return ExternalPartitionExecutionErrorData(
            serializable_error_info_from_exc_info(sys.exc_info())
        )

Maybe partition_set_def = repo_def.get_partition_set_def(partition_set_name) could be taken out of there, and checked to determine whether the partitions are dynamic? Only then would a DagsterInstance be created.
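
A minimal sketch of that idea, using stand-in classes rather than Dagster's own: `StaticPartitions`, `DynamicPartitions`, `FakeInstance`, `list_keys`, and `get_partition_tags_sketch` are all hypothetical names for illustration. The only point is that the instance factory is called only when the partitions definition is dynamic.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Union


@dataclass
class StaticPartitions:
    """Stand-in for a partitions definition whose keys are known from code."""
    keys: List[str]


@dataclass
class DynamicPartitions:
    """Stand-in for a partitions definition whose keys live in instance storage."""
    name: str


class FakeInstance:
    """Stand-in for an instance; constructing one represents opening a DB connection."""
    created = 0

    def __init__(self) -> None:
        FakeInstance.created += 1

    def list_keys(self, name: str) -> List[str]:
        # Hypothetical lookup of dynamic partition keys.
        return ["a", "b"]


def get_partition_tags_sketch(
    partitions_def: Union[StaticPartitions, DynamicPartitions],
    partition_name: str,
    instance_factory: Callable[[], FakeInstance],
) -> Dict[str, str]:
    # Inspect the definition first; only dynamic partitions need an instance.
    if isinstance(partitions_def, DynamicPartitions):
        keys = instance_factory().list_keys(partitions_def.name)
    else:
        keys = partitions_def.keys
    if partition_name not in keys:
        raise KeyError(partition_name)
    return {"partition": partition_name}


tags = get_partition_tags_sketch(StaticPartitions(["2023-02-17"]), "2023-02-17", FakeInstance)
print(tags, "instances created:", FakeInstance.created)
```

With a static definition, no instance is constructed, so a code server without database access could still answer the request.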

clairelin135 commented 1 year ago

Hi @peay. Thanks for sharing this. We'll try to resolve it. For now, I'd keep your Dagster version on 1.1.7 (or whichever version backfills last worked for you).

sryza commented 1 year ago

@clairelin135 how difficult should this be to address?

@gibsondan might be curious about this, because it relates to a discussion we were having about whether we can expect that code servers can access the instance

clairelin135 commented 1 year ago

I don't think this sounds too difficult to address. We could just avoid creating an instance unless the partition set's definition is a dynamic partitions definition. Or we could add some sort of configuration that, when set, avoids creating the instance.

I'm more curious how common deployments like this are--because if future features depend on code servers accessing the instance, perhaps we should just try to make the instance accessible.

peay commented 1 year ago

> because if future features depend on code servers accessing the instance, perhaps we should just try to make the instance accessible.

In my case, that would certainly be doable if that's a long-term requirement, although it feels a bit odd to have this nice and clean gRPC user code server abstraction and then have that server connect to the main Dagster instance directly.

peay commented 1 year ago

Thanks @clairelin135! I've upgraded, but I am now seeing a similar error for schedules. It seems that evaluating a schedule tick hits CachingDynamicPartitionsLoader as well, which creates the instance. Can that also be avoided in this code path?

dagster._core.errors.ScheduleExecutionError: Error occurred during the execution function for schedule hourly_schedule
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_grpc/impl.py", line 323, in get_external_schedule_execution
    with user_code_error_boundary(
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_core/errors.py", line 211, in user_code_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
dagster_postgres.utils.DagsterPostgresException: too many retries for DB connection
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_core/errors.py", line 204, in user_code_error_boundary
    yield
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_grpc/impl.py", line 329, in get_external_schedule_execution
    return schedule_def.evaluate_tick(schedule_context)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_core/definitions/schedule_definition.py", line 767, in evaluate_tick
    CachingDynamicPartitionsLoader(context.instance) if context.instance_ref else None
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_core/definitions/schedule_definition.py", line 252, in instance
    DagsterInstance.from_ref(self._instance_ref)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_core/instance/__init__.py", line 547, in from_ref
    unified_storage = instance_ref.storage
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_core/instance/ref.py", line 495, in storage
    return self.storage_data.rehydrate(as_type=DagsterStorage) if self.storage_data else None
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_serdes/config_class.py", line 109, in rehydrate
    return klass.from_config_value(self, check.not_none(result.value))
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_core/storage/legacy_storage.py", line 112, in from_config_value
    run_storage = ConfigurableClassData(
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster/_serdes/config_class.py", line 109, in rehydrate
    return klass.from_config_value(self, check.not_none(result.value))
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster_postgres/run_storage/run_storage.py", line 135, in from_config_value
    return PostgresRunStorage(
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster_postgres/run_storage/run_storage.py", line 90, in __init__
    table_names = retry_pg_connection_fn(lambda: db.inspect(self._engine).get_table_names())
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster_postgres/utils.py", line 129, in retry_pg_connection_fn
    raise DagsterPostgresException("too many retries for DB connection") from exc

The above exception was caused by the following exception:
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "dbhost" (10.10.10.10), port 5432 failed: FATAL:  PAM authentication failed for user "dagster"

(Background on this error at: https://sqlalche.me/e/14/e3q8)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster_postgres/utils.py", line 117, in retry_pg_connection_fn
    return fn()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/dagster_postgres/run_storage/run_storage.py", line 90, in <lambda>
    table_names = retry_pg_connection_fn(lambda: db.inspect(self._engine).get_table_names())
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/inspection.py", line 64, in inspect
    ret = reg(subject)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/reflection.py", line 182, in _engine_insp
    return Inspector._construct(Inspector._init_engine, bind)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/reflection.py", line 117, in _construct
    init(self, bind)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/reflection.py", line 128, in _init_engine
    engine.connect().close()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3315, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
    else engine.raw_connection()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3394, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3364, in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2198, in _handle_dbapi_exception_noconnection
    util.raise_(
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
    return fn()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 327, in connect
    return _ConnectionFairy._checkout(self)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 894, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 493, in checkout
    rec = pool._do_get()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 273, in _create_connection
    return _ConnectionRecord(self)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 388, in __init__
    self.__connect()
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 690, in __connect
    with util.safe_reraise():
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 686, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/create.py", line 574, in connect
    return dialect.connect(*cargs, **cparams)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/sqlalchemy_rdsiam/dbapi_psycopg2.py", line 60, in connect
    conn = _psycopg2_connect(**kwargs)
  File "/opt/pipelines/.venv/lib/python3.10/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)

The above exception was caused by the following exception:
psycopg2.OperationalError: connection to server at "dbhost" (10.10.10.10), port 5432 failed: FATAL:  PAM authentication failed for user "dagster"
...

clairelin135 commented 1 year ago

Ah, thanks for raising this @peay. I'll fix this when I get the chance.
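
For the schedule code path in the traceback above, one possible shape of such a fix is a loader that defers building the instance until partition keys are actually requested. This is only a sketch under assumptions: `LazyPartitionsLoader`, `StubInstance`, and `list_keys` are hypothetical names, not Dagster's API, and the actual fix may look quite different.

```python
from typing import Callable, List, Optional


class StubInstance:
    """Stand-in for an instance that can list dynamic partition keys."""

    def list_keys(self, name: str) -> List[str]:
        return [f"{name}-1", f"{name}-2"]


class LazyPartitionsLoader:
    """Hypothetical loader that builds the instance only on first lookup."""

    def __init__(self, instance_factory: Callable[[], StubInstance]) -> None:
        self._instance_factory = instance_factory
        self._instance: Optional[StubInstance] = None

    @property
    def instance_created(self) -> bool:
        return self._instance is not None

    def get_dynamic_partitions(self, name: str) -> List[str]:
        # The factory (and so any database connection) runs here, not in __init__.
        if self._instance is None:
            self._instance = self._instance_factory()
        return self._instance.list_keys(name)


def unreachable_instance() -> StubInstance:
    # Simulates a code server that cannot reach the Dagster database.
    raise ConnectionError("database is not reachable from the code server")


# Constructing the loader does not connect, so a schedule that never asks for
# dynamic partition keys can be evaluated without database access.
loader = LazyPartitionsLoader(unreachable_instance)
print("instance created:", loader.instance_created)
```

The trade-off is that a connection failure surfaces later, at the first dynamic partitions lookup, rather than when the schedule context is built.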