dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

dagster_cloud_cli.core.errors.GraphQLStorageError: Error in GraphQL response: [{'message': 'Internal Server Error (Trace ID: 246616244862681833)', 'locations': [{'line': 13, 'column': 13}], 'path': ['runs', 'getRunRecords']}] #18947

Open xxkennyxu opened 9 months ago

xxkennyxu commented 9 months ago

Dagster version

dagster, version 1.2.2

What's the issue?

We have a sensor that checks ongoing runs; if more than X are running, it should not kick off a new materialization.

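    # from dagster import DagsterRunStatus, RunsFilter, SkipReason
    # Inside the sensor's evaluation function:
    running_jobs = context.instance.get_run_records(
        filters=RunsFilter(
            job_name="my_job",
            statuses=[
                DagsterRunStatus.STARTING,
                DagsterRunStatus.STARTED,
                DagsterRunStatus.QUEUED,
            ],
        )
    )

    # Skip this tick when too many runs are already in flight.
    if len(running_jobs) > 5:
        return SkipReason("Too many running jobs")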

Occasionally we run into this exception:

Error
dagster._core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor my_sensor
  File "/usr/local/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 370, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/errors.py", line 294, in user_code_error_boundary
    raise error_cls(

The above exception was caused by the following exception:

dagster_cloud_cli.core.errors.GraphQLStorageError: Error in GraphQL response: [{'message': 'Internal Server Error (Trace ID: 246616244862681833)', 'locations': [{'line': 13, 'column': 13}], 'path': ['runs', 'getRunRecords']}]
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/errors.py", line 287, in user_code_error_boundary
    yield
  File "/usr/local/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 370, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/sensor_definition.py", line 726, in evaluate_tick
    result = self._evaluation_fn(context)
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/sensor_definition.py", line 1022, in _wrapped_fn
    raw_evaluation_result = fn(**context_param, **resource_args_populated)
  File "/usr/src/app/services/dagster/dags/sensors/defs/my_sensor.py", line 61, in my_sensor
    running_jobs = context.instance.get_run_records(  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/dagster/_utils/__init__.py", line 670, in inner
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/instance/__init__.py", line 1768, in get_run_records
    return self._run_storage.get_run_records(
  File "/usr/local/lib/python3.8/site-packages/dagster_cloud/storage/runs/storage.py", line 291, in get_run_records
    res = self._execute_query(
  File "/usr/local/lib/python3.8/site-packages/dagster_cloud/storage/runs/storage.py", line 175, in _execute_query
    res = self._graphql_client.execute(
  File "/usr/local/lib/python3.8/site-packages/dagster_cloud_cli/core/graphql_client.py", line 147, in execute
    raise GraphQLStorageError(str(e)) from e

The above exception was caused by the following exception:

dagster_cloud_cli.core.errors.GraphQLStorageError: Error in GraphQL response: [{'message': 'Internal Server Error (Trace ID: 246616244862681833)', 'locations': [{'line': 13, 'column': 13}], 'path': ['runs', 'getRunRecords']}]
  File "/usr/local/lib/python3.8/site-packages/dagster_cloud_cli/core/graphql_client.py", line 79, in execute
    return self._execute_retry(query, variable_values, headers)
  File "/usr/local/lib/python3.8/site-packages/dagster_cloud_cli/core/graphql_client.py", line 191, in _execute_retry
    raise GraphQLStorageError(f"Error in GraphQL response: {result['errors']}")

Any clue why we might be running into this exception? It triggers an alert on Slack for us each time.

johannkm commented 9 months ago

Hi @xxkennyxu, it looks like the request is timing out. If you're only using len(running_jobs), you could switch to the method instance.get_runs_count, which should be faster. If you need the run objects, there are limit and cursor arguments on get_run_records that you could experiment with.
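
A minimal sketch of the two options mentioned in this comment, assuming the sensor only needs to know whether a threshold is exceeded. The helper names and the max_in_flight parameter are hypothetical; get_runs_count and the limit argument of get_run_records are the calls named above. The Dagster imports happen lazily inside in_flight_filter so the other helpers can be tried against a stub instance.

```python
def in_flight_filter(job_name):
    """Build the same filter as the original sensor (requires dagster)."""
    from dagster import DagsterRunStatus, RunsFilter

    return RunsFilter(
        job_name=job_name,
        statuses=[
            DagsterRunStatus.STARTING,
            DagsterRunStatus.STARTED,
            DagsterRunStatus.QUEUED,
        ],
    )


def too_many_by_count(instance, runs_filter, max_in_flight=5):
    """Ask run storage for a count instead of fetching full run records."""
    return instance.get_runs_count(filters=runs_filter) > max_in_flight


def too_many_by_limit(instance, runs_filter, max_in_flight=5):
    """Fetch at most max_in_flight + 1 records; one extra is enough to decide."""
    records = instance.get_run_records(
        filters=runs_filter, limit=max_in_flight + 1
    )
    return len(records) > max_in_flight
```

In the sensor, the check would then read something like: if too_many_by_count(context.instance, in_flight_filter("my_job")), return SkipReason("Too many running jobs").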

johannkm commented 9 months ago

Closing, but let me know if this doesn't work and I'll reopen

xxkennyxu commented 9 months ago

> Hi @xxkennyxu, it looks like the request is timing out. If you're only using len(running_jobs), you could switch to the method instance.get_runs_count which should be faster. If you need the run objects, there's a limit and cursor argument on get_run_records that you could experiment with

Thanks! Let me try that :)

xxkennyxu commented 9 months ago

Getting the same error unfortunately :(

dagster_cloud_cli.core.errors.GraphQLStorageError: Error in GraphQL response: [{'message': 'Internal Server Error (Trace ID: 5143878769993498507)', 'locations': [{'line': 4, 'column': 13}], 'path': ['runs', 'getRunsCount']}]

gibsondan commented 9 months ago

Hey @xxkennyxu - can you try this workaround? Counterintuitively, I think the additional job name filter is making the query less performant and causing it to time out sporadically, so doing that filter in memory might work better:

    run_records = context.instance.get_run_records( 
        filters=RunsFilter(
            statuses=[
                DagsterRunStatus.STARTING,
                DagsterRunStatus.STARTED,
                DagsterRunStatus.QUEUED,
            ],
        )
    )

    run_records = [run_record for run_record in run_records if run_record.dagster_run.job_name == "my_job"]

We'll look into improvements on our side for this, but that's a change you can make right away in the meantime.

gibsondan commented 9 months ago

(that workaround won't be good for all cases if the job name is significantly reducing the number of runs returned, but generally the number of in-progress runs across all jobs is capped at a reasonable value, so it should be an option here)
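
The workaround can be wrapped in a small helper so the sensor body stays short. This is only a sketch: status_only_filter and count_in_flight_for_job are hypothetical names, and it assumes, as noted above, that the number of in-progress runs across all jobs stays small.

```python
def status_only_filter():
    """Filter on run status only, leaving out job_name (requires dagster)."""
    from dagster import DagsterRunStatus, RunsFilter

    return RunsFilter(
        statuses=[
            DagsterRunStatus.STARTING,
            DagsterRunStatus.STARTED,
            DagsterRunStatus.QUEUED,
        ],
    )


def count_in_flight_for_job(instance, runs_filter, job_name):
    """Fetch in-progress runs for all jobs, then filter by job name in memory."""
    run_records = instance.get_run_records(filters=runs_filter)
    return sum(
        1 for record in run_records if record.dagster_run.job_name == job_name
    )
```

The sensor would call count_in_flight_for_job(context.instance, status_only_filter(), "my_job") and compare the result against its threshold before returning a SkipReason.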

xxkennyxu commented 9 months ago

Yup this works for now as we don't have too many jobs in these states. Usually < 50.

Thanks for helping troubleshoot here - I haven't seen any other errors related to this since changing it! 🎉

Can you keep me posted when we're good to use the job_name filter again? 🙏