dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
11.62k stars 1.46k forks source link

The dagster_k8s.execute_k8s_job function fails when trying to retrieve pod logs of pod containing multiple containers. #11853

Open dangal95 opened 1 year ago

dangal95 commented 1 year ago

Dagster version

1.1.7

What's the issue?

I am trying to launch a K8s job through a Dagster Op by calling the execute_k8s_job function from within that Op, as specified here. The Dagster Run is launched and the K8s job specified in the execute_k8s_job function is launched as a separate Job (as expected), I inspect the Pod and can see that all the containers I specified are running as expected. However, the Dagster Run fails automatically within a few seconds as it tries to retrieve logs from the Pod. It fails because no container is specified when it tries to retrieve the logs. Here is the full stack trace:

dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "deploy_op":
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/execution/plan/execute_plan.py", line 266, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/execution/plan/execute_step.py", line 389, in core_dagster_event_sequence_for_step
    _step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/execution/plan/execute_step.py", line 94, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/execution/plan/compute.py", line 177, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/execution/plan/compute.py", line 154, in _yield_compute_results
    user_event_generator,
  File "/usr/local/lib/python3.7/site-packages/dagster/_utils/__init__.py", line 460, in iterate_with_context
    return
  File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/execution/plan/utils.py", line 91, in op_execution_error_boundary
    ) from e
The above exception was caused by the following exception:
kubernetes.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"a container name must be specified for pod 70c9eb1f1777cc40bd1089e1b391bd42-crfg5, choose one of: [dagster api axon-synapse]","reason":"BadRequest","code":400}\n'
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.7/site-packages/dagster/_utils/__init__.py", line 458, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/execution/plan/compute_generator.py", line 75, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "/opt/dagster/app/dagster_src/graphs/axon_graph.py", line 111, in deploy_op
    execute_k8s_job(context, **context.op_config)
  File "/usr/local/lib/python3.7/site-packages/dagster/_annotations.py", line 108, in inner
    return target(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/dagster_k8s/ops/k8s_job_op.py", line 305, in execute_k8s_job
    log_entry = next(log_stream)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/watch/watch.py", line 163, in stream
    resp = func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 23747, in read_namespaced_pod_log
    return self.read_namespaced_pod_log_with_http_info(name, namespace, **kwargs)  # noqa: E501
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 23880, in read_namespaced_pod_log_with_http_info
    collection_formats=collection_formats)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
    _preload_content, _request_timeout, _host)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
    _request_timeout=_request_timeout)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 377, in request
    headers=headers)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 245, in GET
    query_params=query_params)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 235, in request
    raise ApiException(http_resp=r)

Also, the container name I specify in the container_config field for the main container (which is launched via the image field and not the pod_spec_config field. Is being disregarded and is always named "dagster" instead of the name I assign to it. Finally, even though the Dagster Run fails the job launched by the execute_k8s_job function keeps running.

What did you expect to happen?

In this case I expect the following to happen:

  1. A new execute_k8s_job function config parameter, such as "default_container_name" to be included so that we can define the container which the logs should be extracted from.
  2. When the Run fails, then the Job (execute_k8s_job) launched from within the Op should also be killed. Otherwise we think the pipeline failed but the job is still running in the background and this can produce unwanted results and consume resources without us knowing.
  3. Specifying a name field inside the container_config parameter for the "main" container (the one launched via the image parameter) should override the name "dagster" which is automatically assigned to the container.

How to reproduce?

Assuming you have an Op similar to this:

from dagster import op, graph, OpExecutionContext
from dagster_k8s import execute_k8s_job
from dagster_k8s.ops.k8s_job_op import K8S_JOB_OP_CONFIG

@op(config_schema=K8S_JOB_OP_CONFIG)
def deploy_op(context: OpExecutionContext):
    execute_k8s_job(context, **context.op_config)

@graph
def my_pipeline():
    deploy_op()

my_job = graph_def.to_job(name=job_name)

and a run config similar to this:

ops:
  deploy_op:
    config:
      container_config:
        name: main-container-name
      image: MAIN_CONTAINER_IMAGE_URL
      image_pull_policy: Always
      namespace: dagster
      pod_spec_config:
        containers:
        - image: SECOND_CONTAINER_IMAGE_URL
          image_pull_policy: Always
          name: second-container-name
        - image: THIRD_CONTAINER_IMAGE_URL
          image_pull_policy: Always
          name: third-container-name

You should be able to reproduce the error this way by running then running the job.

Deployment type

Dagster Helm chart

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

gibsondan commented 1 year ago

Fixed by https://github.com/dagster-io/dagster/pull/11916 - will go live this week, thanks for the report!

AndreaGiardini commented 1 year ago

@gibsondan Should we close this?