apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

airflow-webserver crashes if log files are larger than webserver available RAM #31105

Open getaaron opened 1 year ago

getaaron commented 1 year ago

Apache Airflow version

2.6.0

What happened

I tried to view a 400MB dag log file in the web server UI.

The available RAM on the Kubernetes pod is 50MB.

The Airflow webserver crashed.

What you think should happen instead

Either

  1. The log file should be truncated and an error shown that there's not enough RAM, or
  2. The log file should not be fully loaded into RAM and should instead be streamed to the frontend

How to reproduce

  1. create airflow on k8s with 50Mi requested pod sizes
  2. make a dag that generates 400 megabyte log files
  3. attempt to view the log file in the airflow UI

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow==2.6.0
apache-airflow-providers-cncf-kubernetes==6.1.0
apache-airflow-providers-oracle==3.0.0
apache-airflow-providers-slack==7.2.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

getaaron commented 1 year ago

Haven't worked in airflow before, I could potentially open a PR if someone can describe what the fix should be

potiuk commented 1 year ago

Haven't worked in airflow before, I could potentially open a PR if someone can describe what the fix should be

I think you pretty well described it :) .

What you think should happen instead: Either

  1. The log file should be truncated and an error shown that there's not enough RAM, or
  2. The log file should not be fully loaded into RAM and should instead be streamed to the frontend

One of those seems to be a good solution; the place where Airflow loads the file into memory just has to be found and one of the solutions above applied. There are many different ways of loading logs, depending on the view you use, the state of the task, and the log configuration (for example, remote logging or streaming is handled differently than local logs, and Celery logs differently again).

I marked it as good-first-issue for whoever would like to fix it. It's likely a bit more than a first issue though and requires quite a bit of digging in the code, but if someone knows a bit about Python/webserver/Flask it should not be that difficult to find the right place.

getaaron commented 1 year ago

@SkandaPrasad-S I’m not picking it up so you should feel free to grab it

On Mon, May 8, 2023 at 10:25 AM SkandaPrasad-S wrote:

Is anyone picking this up? I can take this up as I have experience in Flask and the webserver. Can I give it a go?


killua1zoldyck commented 1 year ago

@potiuk if no one's working on this and if this has not been fixed yet I can work on this.

potiuk commented 1 year ago

Feel free!

killua1zoldyck commented 1 year ago

@potiuk The current log-streaming logic pulls the whole file into memory, combines all the logs, and sorts them by timestamp before moving to the specific log_pos from the metadata. https://github.com/apache/airflow/blob/24e3d6ce57eae1784066ed5678369e61637285a4/airflow/utils/log/file_task_handler.py#L334-L350 So I don't think we can change the logic to make it streaming. Is my thinking correct?

potiuk commented 1 year ago

I have not looked at it before so I have no idea. I was kind of hoping that whoever takes on the task will be able to propose the solution - possibly involving changing the way it is done.

potiuk commented 1 year ago

The thing with any change here is that it starts with things being done one way, where the change to fix it is to reimplement it to be done differently :D

killua1zoldyck commented 1 year ago

Yeah, I thought of a way where we could maintain separate log_pos for different streams. This way we do not have to pull the whole file into memory just to get the final few lines. But I have some reservations about this with regard to whether the log order would change. I will look into this.

getaaron commented 1 year ago

Yeah, I thought of a way where we could maintain separate log_pos for different streams. This way we do not have to pull the whole file into memory just to get the final few lines. But I have some reservations about this with regard to whether the log order would change. I will look into this.

If the separate log files are already sorted (I assume they are) then you can use a k-way merge to produce a sorted combined list without loading the whole thing into memory: https://en.m.wikipedia.org/wiki/K-way_merge_algorithm

If they're not already sorted (unlikely), then you could sort them individually and then use the k-way merge.
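
For illustration, a minimal sketch of that k-way merge in Python using the standard library's heapq.merge, assuming each source yields lines that start with a sortable (e.g. ISO-8601) timestamp - this is not Airflow's actual log-reading code:

import heapq
from typing import IO, Iterator


def merge_sorted_log_streams(*streams: IO[str]) -> Iterator[str]:
    """Lazily merge already-sorted log streams without loading them into memory."""
    # heapq.merge keeps only one "current" line per stream in memory,
    # which is exactly the k-way merge described above.
    return heapq.merge(*streams)


# Hypothetical usage: the file handles are consumed line by line.
# with open("worker.log") as a, open("triggerer.log") as b:
#     for line in merge_sorted_log_streams(a, b):
#         ...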

killua1zoldyck commented 1 year ago

This seems great. Thank you!

potiuk commented 1 year ago

Yeah. Agree with @getaaron.

This one is not that easy because of the interleaving of logs from different sources. I looked at it and really what you would have to do is either of those:

  • instead of storing the logs in in-memory lists, stream them to temporary files and read them from there (and then indeed k-way merge would be better)

or (probably easier)

Maybe a better solution will be to introduce some hard limits on the size of logs that can be loaded into memory and "hard-stop" if any of the sources attempts to get logs bigger than that size - returning "log too large to show" instead. Then the thing to add is an optional max_length or max_size that could be passed to the methods returning arrays, and to implement it in all the implementations so that they raise a specific exception if the returned array would be too big.

Sifting through a 400 MB log in the Airflow UI is not really usable anyway. And I don't think we have a good mechanism to keep such a log in the browser's memory, so I am not even sure we could show such a log at all in the Airflow UI even if the backend could handle it.
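
As a rough illustration of that hard-limit idea for the local-file case (LogTooLargeError, MAX_LOG_BYTES and read_local_log are hypothetical names, not existing Airflow APIs - the real fix would hook into the task log handlers):

import os

# Illustrative limit, not an actual Airflow setting.
MAX_LOG_BYTES = 50 * 1024 * 1024  # 50 MiB


class LogTooLargeError(Exception):
    """Raised when a log source exceeds the configured limit (hypothetical)."""


def read_local_log(path: str, max_size: int = MAX_LOG_BYTES) -> str:
    """Read a local log file, refusing up front if it is larger than max_size."""
    size = os.path.getsize(path)
    if size > max_size:
        raise LogTooLargeError(
            f"Log is {size} bytes, above the {max_size}-byte limit; "
            "refusing to load it into webserver memory."
        )
    with open(path, encoding="utf-8", errors="replace") as f:
        return f.read()

The view layer would then catch the exception and render a "log too large to show" message instead of the log body.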

killua1zoldyck commented 1 year ago

We can assign hard limits and that would solve the current issue. We can just sum file sizes before loading any of them (or most of them - I do not know whether we can get all of their sizes) into memory. But I believe that even for files smaller than the limit we need not load the whole file into memory and sort it for every auto-tailing call.

  • instead of storing the logs in in-memory lists, stream them to temporary files and read them from there (and then indeed k-way merge would be better)

We can do this. Now, the temp file will be in sorted order; however, if the task is still running, new logs could have arrived in the time it took us to send our response. Even for this, we need to maintain a log position for each log stream and call the reading methods with the appropriate metadata to update this temp file. And for some of the methods, like the HDFS one, we still need to load the whole file into memory; for those we could filter after loading. I believe we can do this, as it could reduce memory usage and network congestion, and since we are sending metadata back and forth anyway we might as well send the log positions of a few more files. What do you think?
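
To make the per-stream position idea concrete, a very rough sketch for local files only (the function and the shape of positions are made up for illustration; remote sources such as S3 or HDFS would need their own handling as discussed):

from typing import Dict, List, Tuple


def read_new_lines(positions: Dict[str, int]) -> Tuple[List[str], Dict[str, int]]:
    """Read only the lines appended since the last call for each local log stream.

    positions maps a log file path to the offset already served to the
    client; it plays the role of a per-stream log_pos carried in the metadata.
    """
    new_lines: List[str] = []
    updated = dict(positions)
    for path, offset in positions.items():
        with open(path, encoding="utf-8", errors="replace") as f:
            f.seek(offset)            # skip what was already sent
            chunk = f.read()
            updated[path] = f.tell()  # remember where this stream stopped
        new_lines.extend(chunk.splitlines())
    return new_lines, updated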

potiuk commented 1 year ago

We can assign hard limits and that would solve the current issue. We can just sum file sizes before loading any of them (or most of them - I do not know whether we can get all of their sizes) into memory.

Or count rows if we can't get the sizes - that should be easier.

But I believe that even for files smaller than the limit we need not load the whole file into memory and sort it for every auto-tailing call.

Correct - not having to join the files in memory will definitely decrease the memory requirements for the webserver, even for smaller files.

We can do this. Now, the temp file will be in sorted order; however, if the task is still running, new logs could have arrived in the time it took us to send our response. Even for this, we need to maintain a log position for each log stream and call the reading methods with the appropriate metadata to update this temp file.

Agree. I think the log pos in the metadata will be a bit tricky in this case and should be "per stream"/"per user". And there will be cases where someone just auto-tails the logs (in which case it is fine to keep the returned data in memory), but for cases when someone reads the log from the beginning, the size might still be substantial (so keeping it in a file makes sense).

And we should likely have a separate path for cases like S3 if we want to further optimize it - cases where the remote log is never streamed, because for them we either have a full log file or nothing (this is object storage, not file storage, so for S3 we will not even see a log until it is complete). In this case we should not worry about "tailing" the log. And we could use some caching (at the expense of isolation) so that the log from S3 is downloaded only once per webserver and kept for some time (and reused between users/sessions), so that if a few people look at the same task log or hit the refresh button, the "remote" reading of that file does not happen over and over again.
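
A tiny sketch of what such per-webserver caching could look like (the helper and the TTL are purely illustrative, not an existing Airflow mechanism):

import time
from typing import Callable, Dict, Tuple

_CACHE: Dict[str, Tuple[float, str]] = {}
_TTL_SECONDS = 300  # illustrative: how long a fetched remote log is reused


def cached_remote_log(key: str, fetch: Callable[[], str]) -> str:
    """Return a cached copy of a remote (e.g. S3) log, re-fetching only after the TTL expires."""
    now = time.monotonic()
    hit = _CACHE.get(key)
    if hit is not None and now - hit[0] < _TTL_SECONDS:
        return hit[1]
    content = fetch()
    _CACHE[key] = (now, content)
    return content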

And for some of the methods, like the HDFS one, we still need to load the whole file into memory; for those we could filter after loading.

Hmm. Not sure how HDFS works in this case - i.e. how much it is "streaming" vs. "static" (cannot change once it is published) - caching should help there as well. Even if it is appendable but we cannot stream it, maybe there is a way to store a hash or mtime of the file and only pull it if it changed? But even in this case I guess we do not need to pull the whole file into memory - maybe we would need to update the HDFS hook for that, but I can't imagine a case where we have to load the whole file into memory in order to pull it from remote storage.

I believe we can do this, as it could reduce memory usage and network congestion, and since we are sending metadata back and forth anyway we might as well send the log positions of a few more files. What do you think?

Yeah. I think we could actually do both - save logs to disk and also have some limits, because it is not only memory but also networking + I/O + time that is saved this way. Plus, if we do some caching, we can also optimise cases where various people look at the same logs.

Overall I think it is all "doable" and even if we don't implement and handle all cases, it can be gradually optimized.

I'd love to hear also what @dstandish has to say about it :) if he has time to read all the discussion that is :)

killua1zoldyck commented 1 year ago

I will work on these inputs for now. Thank you!

getaaron commented 11 months ago

@killua1zoldyck how's it going? let us know if you get stuck on anything

killua1zoldyck commented 11 months ago

Thank you! I've started working on it, but I will reach out if I need any help.

killua1zoldyck commented 11 months ago

Hey @getaaron, I have implemented the k-way merge and it works fine. For now, I have set the limit to 50 lines per call and it keeps calling until end_of_log is reached. The metadata that gets sent back and forth looks something like this:

{
  "metadata": {
    "end_of_log": false,
    "executor_logs": {
      "log_pos": 0
    },
    "immediate_tail": true,
    "local_logs": {
      "/root/airflow/logs/dag_id=log_generator_dag/run_id=manual__2023-07-09T19:52:37.154177+00:00/task_id=generate_logs_5mb/attempt=1.log": {
        "done": false,
        "lines": 200,
        "need": 50,
        "prev_position": 14269
      }
    },
    "remote_logs": {
      "log_pos": 0
    },
    "served_logs": {
      "log_pos": 0
    }
  }
}

whereas previously it looked something like this:

{
  "metadata": {
    "end_of_log": false,
    "log_pos: 100,
  }
}

I thought this would only affect the part of the Airflow UI that calls /get_logs_with_metadata. However, I just realized that it is also used by https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_log. Should we create a separate flow for auto-streaming logs from the UI and keep the existing one for the REST API?

getaaron commented 11 months ago

Hmm, does it mean you’re doing the stream merge on the client (browser) side?

I haven’t seen this API before but I wonder if there’s a way to supplement the full_content: false param with a reverse: true?

Is your goal with the updated UI to support full pagination through all the logs, or just show the most recent ~1,000 lines?

killua1zoldyck commented 11 months ago

Hmm, does it mean you’re doing the stream merge on the client (browser) side?

Currently the merging happens on the server side, but it reads all the logs into memory and merges them before truncating the parts that were already read. I have changed it so that reading and merging are based on what was already read, which we get from the metadata (still on the server side).

I haven’t seen this API before but I wonder if there’s a way to supplement the full_content: false param with a reverse: true?

I don't think it has a reverse: true parameter: https://github.com/apache/airflow/blob/main/airflow/api_connexion/endpoints/log_endpoint.py. If we knew the total character length of our logs beforehand we could send that as a token, but that too would read the whole file and then truncate the end, so memory would still be an issue.

Is your goal with the updated UI to support full pagination through all the logs, or just show the most recent ~1,000 lines?

I was thinking full pagination through all the logs till the end. The one I have implemented now does that by reading 50 lines from a file at a time.

killua1zoldyck commented 11 months ago

If we knew the total character length of our logs beforehand we could send that as a token, but that too would read the whole file and then truncate the end, so memory would still be an issue.

No, we can't do this.

killua1zoldyck commented 10 months ago

I won't be able to continue working on this issue for now. If anyone else is available, feel free to take it over. Apologies for any inconvenience!

potiuk commented 10 months ago

No worries. Thanks for attempting to fix it - it's not an easy one to deal with.

yatink commented 10 months ago

Hi everyone, I'm new to airflow and I arrived here because this issue had the good first issue label on it (which may not be accurate after reading the discussion 😅).

I'm willing to pick this one up since it seems like an interesting problem. The challenge for me is going to be reproducing the issue first and seeing it for myself before I start working on it.

What I can do is start working on reproducing it; after I've been able to do that, I can assign the issue to myself and work on a fix (that way, if someone more familiar with the codebase has the bandwidth to pick it up, they can still do so).

Just for reference, is there an existing PR associated with it? From the discussion, it seemed like some work was already done on it...

potiuk commented 10 months ago

Yep. It's not a "Good first issue" -> removed it :)

If there were a PR, it would have been linked. I think @killua1zoldyck was mostly experimenting with his own changes and it never materialized in the form of a PR. But maybe @killua1zoldyck has something that can be shared.

yatink commented 10 months ago

Ok cool, I'd like to keep plugging away at it nonetheless, since it does sound like an interesting problem. I'll work on reproducing the problem and post back here when I'm ready to claim the issue. Wish me luck!

potiuk commented 10 months ago

🤞

killua1zoldyck commented 10 months ago

Hi @yatink, I did implement the k-way merge suggested (although there were a few issues with my implementation). I didn't notice that there was a log_endpoint.py resource, and my implementation meant that it would have to be changed as well.

yatink commented 10 months ago

@killua1zoldyck, can you put your k-way merge implementation into a gist (or similar) temporarily so that I can reference it? If not, no worries.

yatink commented 10 months ago

The way that I'm thinking about solving this issue is to do so in 2 steps:

  1. First, stop the webserver from crashing and pop a reasonable error message instead
  2. Continue down the path that @killua1zoldyck had started and implement the k-way merge so it can handle large log files

I'm on the fence about whether they should be tracked as separate issues or not. I'll be able to give a more informed opinion after I've made more progress.

@potiuk and/or @getaaron, let me know if step 1 is a reasonable short term approach or whether I should just skip it and do step 2.

potiuk commented 10 months ago

@potiuk and/or @getaaron, let me know if step 1 is a reasonable short term approach or whether I should just skip it and do step 2.

Yes. This is exactly how I would have imagined it too. It's quite likely as well that we will stop at step 1 if we find step 2 too complex. I have a feeling that trying to implement a k-way merge in this case does not give enough bang for the buck. If you have such a huge log, finding anything meaningful in the Airflow UI will be next to impossible; search and everything else there will also struggle if you try to scroll through it, and you would likely have to implement full-fledged search and likely other APIs to make use of such a huge log.

In this case, just posting a message like "Your log is definitely too large to handle" and pointing the user to "Consider fine-tuning your tasks to make your logs smaller, or configure remote logging and choose some dedicated logging solution to store your logs" is quite likely 9X% of the effect with 1% of the effort.

yatink commented 9 months ago

Sorry for the delay, I got airflow on kube set up on my laptop, so I'm going to pick up this issue and attempt to implement option 1 👇🏽 .

First, stop the webserver from crashing and pop a reasonable error message instead

Do you all have any DAGs lying around that produce large log files, preferably larger than 400MB? 😄 That's all I'm missing (in theory) to reproduce the issue.

killua1zoldyck commented 9 months ago

Sorry for the delay, I got airflow on kube set up on my laptop, so I'm going to pick up this issue and attempt to implement option 1 👇🏽 .

First, stop the webserver from crashing and pop a reasonable error message instead

Do you all have any DAGs lying around that produce large log files, preferably larger than 400MB? 😄 That's all I'm missing (in theory) to reproduce the issue.

We can just have a PythonOperator that takes the size of logs to produce as an argument and logs, say, 400 MB of random values.
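
For reference, a minimal sketch of such a generator DAG - the dag_id mirrors the one visible in the metadata example earlier in the thread, everything else (task id, size, logging details) is illustrative:

import logging
import uuid
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

logger = logging.getLogger(__name__)


def generate_logs(size_mb: int) -> None:
    """Emit roughly size_mb megabytes of random-looking log lines."""
    line = uuid.uuid4().hex * 4  # ~128 characters of noise per line
    lines_needed = (size_mb * 1024 * 1024) // (len(line) + 1)
    for i in range(lines_needed):
        logger.info("%09d %s", i, line)


with DAG(
    dag_id="log_generator_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="generate_logs_400mb",
        python_callable=generate_logs,
        op_kwargs={"size_mb": 400},
    )

Viewing the log of a completed run of this task in the UI should exercise the same code path that crashes the webserver.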

yatink commented 9 months ago

Hmmm...it appears that I don't have the privs to assign this issue to myself. Can someone help me with that (either by giving me privs or assigning this issue to me directly)?

potiuk commented 9 months ago

Assigned. Good Luck :)

mujiannan commented 5 months ago

Even a small log file can take down the gunicorn workers in the webserver.
Run a task in Kubernetes with logs going to S3 and you end up in an "interleave" condition: viewing the logs of the running task from the web UI drives gunicorn to 100% CPU and it crashes soon after.

potiuk commented 5 months ago

Even a small log file can take down the gunicorn workers in the webserver. Run a task in Kubernetes with logs going to S3 and you end up in an "interleave" condition: viewing the logs of the running task from the web UI drives gunicorn to 100% CPU and it crashes soon after.

If you can provide some actionable details - like the size of your log, or details of the memory allocated for your processes - that would be much more actionable than "a small log crashing the server".

potiuk commented 5 months ago

The versions of the packages that cause it would also be useful.