MattyKuzyk opened 4 months ago
Small follow-up: we are printing the `PipesMessageHandler` object, and unsurprisingly, for our long-running failed jobs we can see it didn't receive a close message:

```
{'_context': <dagster._core.execution.context.compute.OpExecutionContext object at 0x7f5acc04b700>, '_message_reader': <app.extensions.custom_pipe_log_reader.PipesK8sPodLogsMessageReader object at 0x7f5ad5fc59c0>, '_result_queue': <queue.Queue object at 0x7f5acc0583d0>, '_extra_msg_queue': <queue.Queue object at 0x7f5acc0584c0>, '_received_opened_msg': True, '_received_closed_msg': False, '_opened_payload': None}
Message queue length is: 0
```
Trying this out with the `PipesK8sPodLogsMessageReader` would obviously be ideal, but I want to stress that it errors out because it's not expecting there to be multiple containers. I can share the code from our extension if necessary, but it's fairly straightforward.
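The extension code itself isn't shared in the thread, but the multi-container merging it describes might be sketched roughly like this. All names here are hypothetical (`read_log_fn` stands in for a per-container streaming `core_api.read_namespaced_pod_log` call); this is a minimal illustration, not the author's actual code:

```python
import queue
import threading

def merge_container_logs(read_log_fn, containers):
    """Merge log lines from several containers into one iterator.

    `read_log_fn(container)` is a hypothetical callable returning an
    iterable of log lines for that container (in practice, a streaming
    `read_namespaced_pod_log` response).
    """
    out = queue.Queue()
    DONE = object()  # sentinel: one container's stream has ended

    def pump(container):
        try:
            for line in read_log_fn(container):
                out.put((container, line))
        finally:
            out.put((container, DONE))

    for c in containers:
        threading.Thread(target=pump, args=(c,), daemon=True).start()

    finished = 0
    while finished < len(containers):
        container, item = out.get()
        if item is DONE:
            finished += 1
        else:
            yield container, item
```

Lines from all containers land on one queue in arrival order; ordering across containers is arbitrary, which is part of why merging streams is awkward.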
EDIT: Just to make sure, we tried launching the sleep process again using the default `PipesK8sPodLogsMessageReader`; the only override we had to make was passing in `container="sidecar"` here. We still experienced the same error described above after 4 hours.
We suspected that the threads we were using in our custom log reader code were squashing an error from `core_api.read_namespaced_pod_log`, but I still can't see any exception from Kubernetes, even using the default log reader.
We have learned a lot and have found what we believe to be the crux of the issue. In the `run` function of the `PipesK8sClient`, the main thread is blocked by `consume_pod_logs`, which in turn calls `core_api.read_namespaced_pod_log`. Once the connection times out due to the Kubelet timeout mentioned above, that stream will simply end and unblock the main thread. No exception is thrown by Kubernetes.

At this point, the main thread will continue to `client.wait_for_pod`. No logs will be listened to by Dagster again, and it will never receive any asset checks or close messages. The container will eventually exit, and then Dagster will shut it down in the `finally` block.
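The failure mode described above can be illustrated with a toy stand-in for the blocking log loop. Names here are hypothetical, not Dagster's actual implementation; the point is that when the Kubelet cuts the connection, the streaming iterator is simply exhausted and control falls through without any error:

```python
def consume_pod_logs(stream, handle_line):
    """Toy stand-in for the blocking log loop (hypothetical names).

    `stream` mimics the iterator returned by a streaming
    `read_namespaced_pod_log(..., follow=True)` call. When the Kubelet
    drops the connection after its timeout, the iterator is simply
    exhausted: the loop exits cleanly and no exception is raised.
    """
    for line in stream:
        handle_line(line)
    # Control falls through here after the timeout; the real client
    # would now proceed to wait_for_pod() and never read logs again.
    return "stream ended"

# Simulate a connection cut mid-run: only lines sent before the
# timeout are delivered, and the call returns without error.
seen = []
result = consume_pod_logs(iter(["opened", "partial output"]), seen.append)
```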
To mitigate this, we are overriding the `PipesK8sClient` as well as the `PipesK8sPodLogsMessageReader` and trying to spin the log reading out into its own thread that will keep refreshing the stream until it receives a signal from the main thread that the pod has terminated. Refreshing the log stream is tricky, as `read_namespaced_pod_log` does not offer a good interface for ensuring you don't duplicate or drop logs in between streams. If we duplicate the asset check message, for instance, Dagster gets very upset.
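The shape of that mitigation might look roughly like the following sketch. All names are hypothetical (`open_stream()` stands in for a fresh `read_namespaced_pod_log` call), and the dedup here is a crude exact-line check, not a real solution to the duplication problem:

```python
import threading

def read_logs_until_terminated(open_stream, handle_line, pod_terminated):
    """Keep re-opening the log stream until the pod has terminated.

    `open_stream()` is a hypothetical callable returning a fresh
    streaming iterator (a new `read_namespaced_pod_log` call);
    `pod_terminated` is a threading.Event the main thread sets once it
    observes the pod finishing. Already-seen lines are skipped so a
    duplicated message is not re-emitted on reconnect.
    """
    seen = set()

    def pump():
        while not pod_terminated.is_set():
            for line in open_stream():  # reconnect after each timeout
                if line not in seen:    # crude exact-line dedup
                    seen.add(line)
                    handle_line(line)

    reader = threading.Thread(target=pump, daemon=True)
    reader.start()
    return reader
```

The main thread would set the event from its `wait_for_pod`-style loop and then join the reader thread before shutting the pod down.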
hi @MattyKuzyk sorry for the delayed response here, and thanks for all your work to diagnose the issue. @alangenfeld is this something you would be able to take a look at?
Echoing Jamie thanks for the detailed report and follow up here. Did not know about this 4 hour timeout.
If you didn't have the multi-container constraint, I would suggest using one of the blob storage message reader/writers, but I speculate that tweaking those to merge across multiple containers would be non-trivial.
> Refreshing the log stream is tricky as `read_namespaced_pod_log` does not offer a good interface for ensuring you don't duplicate or drop logs in between streams. If we duplicate the asset check message for instance, Dagster gets very upset.
Ya, agree this is not as easy as I hoped. Looking at https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#read_namespaced_pod_log I am thinking:

- `timestamps=True` to get timestamps we can track (unsure if we should print them or just strip them)
- `since_seconds` to prevent having to re-fetch all logs on reconnect

Sorry for the late reply, I was out last week.
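Those two parameters could be combined roughly as follows. This is a sketch: `split_timestamp` and `since_seconds_for_reconnect` are made-up helper names, and real code would still need dedup on the overlapping second:

```python
from datetime import datetime, timezone

def split_timestamp(line):
    """Split a `timestamps=True` log line into (datetime, message).

    The Kubelet prefixes each line with an RFC3339 timestamp, e.g.
    `2024-05-01T12:00:00.123456789Z hello`; we trim nanoseconds to
    microseconds so `datetime.fromisoformat` can parse it.
    """
    ts, _, msg = line.partition(" ")
    ts = ts.rstrip("Z")
    if "." in ts:
        whole, frac = ts.split(".")
        ts = whole + "." + frac[:6]
    return datetime.fromisoformat(ts).replace(tzinfo=timezone.utc), msg

def since_seconds_for_reconnect(last_seen, now=None):
    """Pick a `since_seconds` value that re-fetches only the recent
    tail on reconnect instead of the whole log. Rounded up by one
    second of overlap, so the caller must still drop lines with
    timestamps at or before `last_seen`.
    """
    now = now or datetime.now(timezone.utc)
    return max(1, int((now - last_seen).total_seconds()) + 1)
```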
We are working on a custom extension that would resolve this issue; we already have a custom extension for multi-container support. It will probably be in a rough state for a while, but maybe we'll have time to polish it up for an upstream PR sometime this month.
Dagster version
1.6.13
What's the issue?
Our asset launches pods via `PipesK8sClient.run()`:

On the runner side, when the ETL process is finished, we report an asset materialization to Dagster Pipes:

We also have an asset check called `main_container_success` that we report back to Dagster. This works fantastically for us, and we run these workloads many times a day without issue. However, when a run exceeds 4 hours, it will consistently fail even if the code exits successfully. Here is a screenshot showing a `busybox` pod simply running `sleep`, which we used to test this after seeing the transient issue in production workloads:

Note that all the runs over 4 hours are failures; all under are successes.
What did you expect to happen?
We expect the materialization to be reported as a success for pipes pods that run longer than 4 hours.
How to reproduce?
Run `sleep 15000` via `DagsterK8sPipesClient.run()`
Deployment type
Dagster Helm chart
Deployment details
We use a custom extension of the `PipesK8sPodLogsMessageReader`. It extracts logs via `core_api.read_namespaced_pod_log` for each container in the pod (we have init containers and sidecars) and merges the streams together. This is needed because multi-container log reading is not supported by default. See some of this PR discussion for context.

Additional information
Successful run (note the `ASSET_CHECK_EVALUATION` that's not present above):

Message from the maintainers
Impacted by this issue? Give it a 👍! We factor engagement into prioritization.