PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Excessive amount of AssertionError: feed_data after feed_eof in the log #6335

Open drfraser opened 2 years ago

drfraser commented 2 years ago

First check

Bug summary

Everything seems to be working fine, but when I looked in the orion.log file (SERVER_LEVEL:WARNING), I saw the exception below. Unfortunately, I get the impression that I can't change the logging level dynamically (via SIGHUP or by re-reading the configuration), so I can't add any more context, and I don't want to stop the Orion server.

If I could get some background info on why this might be happening over and over, I can debug it further. Perhaps my code is doing something incorrectly?

Reproduction

Need more info before I can provide code

Error

17:35:38.216 | ERROR   | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b'INFO:     1...1.1" 200 OK\n')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b'INFO:     1...1.1" 200 OK\n')>
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/usr/lib/python3.8/asyncio/subprocess.py", line 73, in pipe_data_received
    reader.feed_data(data)
  File "/usr/lib/python3.8/asyncio/streams.py", line 472, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof

Versions

Version:             2.0.3
API version:         0.8.0
Python version:      3.8.10
Git commit:          2f1cf4ac
Built:               Fri, Aug 5, 2022 3:57 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted

Additional context

No response

zanieb commented 2 years ago

This error comes from AnyIO's parsing of subprocess output streams. I've seen it locally a few times and have generally tried to squash it, but I'm not sure what the cause is.
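The last frames of the traceback are in asyncio itself: StreamReader.feed_data() asserts that feed_eof() has not already been called. The stdlib-only sketch below triggers the same message directly, without Prefect, AnyIO or a subprocess. It calls feed_data() and feed_eof() by hand only to show the ordering. In real code, asyncio's subprocess protocol makes these calls, and the asyncio docs advise against creating StreamReader objects directly.

```python
import asyncio


async def feed_after_eof() -> str:
    """Feed a StreamReader after EOF, the ordering the traceback reports."""
    reader = asyncio.StreamReader()          # created inside a running loop
    reader.feed_data(b"INFO: first line\n")  # normal delivery of a chunk
    # Mark the stream as finished; in the subprocess case asyncio does this
    # when it is told the pipe has closed.
    reader.feed_eof()
    try:
        reader.feed_data(b"late line\n")     # a late chunk trips the assertion
    except AssertionError as exc:
        return str(exc)
    return "no error (assertions disabled with -O?)"


if __name__ == "__main__":
    print(asyncio.run(feed_after_eof()))     # feed_data after feed_eof
```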

prefect.utilities.processutils.run_process is the relevant implementation. If you run that directly in a script, can you reproduce this error?
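run_process needs Prefect installed. The sketch below is a stdlib-only stand-in, an assumption and not Prefect's implementation. It runs a short subprocess with piped output several times and records anything asyncio reports through the loop's exception handler. This mechanism also explains why the error shows up only in the log. An exception raised inside a loop callback such as pipe_data_received goes to the exception handler instead of propagating, so the awaiting code finishes normally. The function name collect_callback_errors is hypothetical.

```python
import asyncio
import sys


async def collect_callback_errors(runs: int = 5) -> list:
    """Run a short piped subprocess repeatedly and collect callback errors."""
    loop = asyncio.get_running_loop()
    captured: list = []
    # Exceptions raised inside loop callbacks are not raised to awaiting code;
    # asyncio hands them to this handler (the default one logs
    # "Exception in callback ...", as in the report above).
    loop.set_exception_handler(lambda _loop, context: captured.append(context))

    # Sanity check: a deliberately failing callback is captured, not raised.
    loop.call_soon(lambda: 1 / 0)
    await asyncio.sleep(0)

    for _ in range(runs):
        proc = await asyncio.create_subprocess_exec(
            sys.executable, "--version",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        await proc.communicate()
    return captured


if __name__ == "__main__":
    errors = asyncio.run(collect_callback_errors())
    for context in errors:
        print(context.get("message"), repr(context.get("exception")))
```

Any entry after the first, deliberately triggered one would be a real callback error from the subprocess runs.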

drfraser commented 2 years ago

Some context: I tried to determine what flow was running when this assertion happened, but it seemed Prefect was idle at the time. And the assertion kept happening over and over, i.e. the timestamps were only seconds apart. So I have the impression there is some sort of cleanup not being done properly and thus the assertion kept getting triggered.

There was one flow that just failed hard: its last task is a prefect_shell_command that executes rsync. The rsync was successful, but then the task failed with no exception info or anything useful at all. Unfortunately, I forget the exact text of the message, but it was very terse. I suspect this failure is related to the assertion error. Other executions of this rsync task have not failed, though.

I have now restarted the Orion server to adjust logging info etc and will update this issue when I have more info. But I suspect I won't see the assertion again because it was triggered by this hard crash of a task yesterday.

drfraser commented 2 years ago

The flows have been running all day with no errors, so I think the cause of this AssertionError was the two failed tasks from the day before. One, as I mentioned, did everything it was supposed to, then crashed with this error and no stack trace:

Crash detected! Execution was interrupted by an unexpected exception.

I have no idea how to recreate this except by killing a subprocess/task.

The other strange failure, a missing orion.db-wal file, is described in the other issue I've filed. I have no idea how that occurred either, and that flow has been running successfully all day, so I can't recreate that one either.

The last possible source of this error was the problem I had with using requests in a task that was called asynchronously. I will unfix my code and see if that failure is the source of this AssertionError, but I don't think it will be.

drfraser commented 2 years ago

I recreated the problem I had with my async code but it did not result in any AssertionErrors. Everything seems to be running as I'd expect and I'm not having any problems. So I am not sure what else I can do here to help.

zanieb commented 2 years ago

We'll have to just see if we get some more reports. Thanks for digging into it!

drfraser commented 2 years ago

Looking at the code, my suggestion would be this: in src/prefect/engine.py, change lines 1154/1155 and 1195/1196 to use logger.error.

Then end users like me, who are unlikely to set logging to DEBUG for the Prefect modules up front, would always get the debug info needed in the log. Had this been the case, I would have had more info for you and might have debugged it myself.
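The suggestion depends on how level filtering interacts with exc_info. A WARNING-level logger drops a DEBUG record entirely, traceback included, while the same call at ERROR emits the message and the full traceback. The stdlib-only sketch below uses a hypothetical logger name and messages. It is not Prefect's engine code at the cited lines.

```python
import io
import logging


def log_crash(method_name: str) -> str:
    """Log a caught exception via logger.<method_name> on a WARNING-level
    logger and return whatever text was emitted."""
    stream = io.StringIO()
    logger = logging.getLogger(f"example.engine.{method_name}")  # hypothetical name
    logger.handlers.clear()
    logger.addHandler(logging.StreamHandler(stream))
    logger.setLevel(logging.WARNING)  # mirrors a WARNING-level configuration
    logger.propagate = False          # keep demo output away from the root logger
    try:
        raise RuntimeError("task failed")
    except RuntimeError:
        getattr(logger, method_name)("Crash detected!", exc_info=True)
    return stream.getvalue()


debug_text = log_crash("debug")  # filtered out: nothing, not even the traceback
error_text = log_crash("error")  # message followed by the full traceback
```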

nicholasjng commented 2 years ago

Hey, I hope it's ok for me to comment on this. I am facing the same problem, with my Prefect agent actually crashing at the end of the flow that I am running.

As I am trying to run a deployment with a schedule, this is really unfortunate, because the flow crashing the agent means that the schedule is not really possible to implement (unless you manually restart the agent every time, which I do not really want to have to do for obvious reasons).

EDIT: The flow actually runs to completion and then crashes after the last message ("Flow finished in state 'Completed()'") with a UnicodeDecodeError:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xdf in position 95: invalid continuation byte
An exception occurred.

FWIW, I am running Prefect 2.0.4 on Windows. Thanks in advance for some guidance!

drfraser commented 2 years ago

Can you set your logging level to DEBUG so the AssertionError message comes with a stack trace? (See the lines of code I mentioned.) It's really good that you seem to have a repeatable error. As for the Unicode error, I'd say it is somewhere in your flow and may be the root cause of the AssertionError or of some subtask crashing.

nicholasjng commented 2 years ago

Thank you for listing those references. I tried setting PREFECT_LOGGING_LEVEL="DEBUG" and PREFECT_LOGGING_ROOT_LEVEL="DEBUG" for the agent process, but did not get any more detailed information than this:

15:34:40.977 | ERROR   | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(2, bytearray(b"1...to date.\r\n"))
handle: <Handle SubprocessStreamProtocol.pipe_data_received(2, bytearray(b"1...to date.\r\n"))>
Traceback (most recent call last):
  File "C:\Users\nju\AppData\Local\Programs\Python\Python310\lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Users\nju\AppData\Local\Programs\Python\Python310\lib\asyncio\subprocess.py", line 72, in pipe_data_received
    reader.feed_data(data)
  File "C:\Users\nju\AppData\Local\Programs\Python\Python310\lib\asyncio\streams.py", line 457, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof

Any idea where to set the debug level required to obtain the exc_info? I assumed the PREFECT_LOGGING_LEVEL was going to enable it.

EDIT: I'm not even confident anymore that I can obtain the exception anyway, since the flow does not actually show up as crashed - in fact, all flows show up as completed in my Orion dashboard. So it is something else crashing the agent:

Traceback (most recent call last):
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\prefect\cli\_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\prefect\utilities\asyncutils.py", line 193, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\prefect\utilities\asyncutils.py", line 140, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "C:\Users\nju\AppData\Local\Programs\Python\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\nju\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 646, in run_until_complete
    return future.result()
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\prefect\cli\agent.py", line 93, in start
    async with OrionAgent(
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\prefect\agent.py", line 271, in __aexit__
    await self.shutdown(*exc_info)
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\prefect\agent.py", line 260, in shutdown
    await self.task_group.__aexit__(*exc_info)
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\prefect\infrastructure\submission.py", line 48, in submit_flow_run
    return await infrastructure.run(task_status=task_status)
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\prefect\infrastructure\process.py", line 66, in run
    process = await run_process(
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\prefect\utilities\processutils.py", line 76, in run_process
    await consume_process_output(
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\prefect\utilities\processutils.py", line 90, in consume_process_output
    async with anyio.create_task_group() as tg:
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\prefect\utilities\processutils.py", line 104, in stream_text
    async for item in source:
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\anyio\abc\_streams.py", line 31, in __anext__
    return await self.receive()
  File "C:\Users\nju\Workspaces\python\venv\lib\site-packages\anyio\streams\text.py", line 44, in receive
    decoded = self._decoder.decode(chunk)
  File "C:\Users\nju\AppData\Local\Programs\Python\Python310\lib\codecs.py", line 322, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xdf in position 95: invalid continuation byte
An exception occurred.

drfraser commented 2 years ago

OK... the Unicode error. I once had something like this that I had to patch in someone else's Windows Python app. It was a UTF-8 file, but it still contained a bad character sequence that couldn't be translated correctly given the character set being used (it wasn't the usual Windows-1252).

So the Unicode error has absolutely nothing to do with your flow? Then it might be some data being sent to the agent that fouls up whatever tasks/flows the agent is managing, hence the AssertionError. My impulse would be to add some extra debugging code to the agent-related packages to dump as much process/thread info as possible. For example, what exactly is the function being wrapped (see the first line of the stack trace), and what are its arguments? That would be a useful clue. Or patch the codecs.py file to output the data that makes it break.

FYI, I do not work for PrefectHQ, so I am only trying to help out and am not familiar with the code. Hopefully madkinsz will reply at some point.
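Instead of patching codecs.py, a small helper can catch UnicodeDecodeError and show the raw bytes around the failure, using the exception's standard object, start, end and reason attributes. The helper name and the sample bytes below are hypothetical, not the data from the report.

```python
def describe_decode_failure(chunk: bytes, encoding: str = "utf-8", context: int = 8) -> str:
    """Decode a chunk; on failure, report the reason and the raw bytes around it."""
    try:
        chunk.decode(encoding)
    except UnicodeDecodeError as exc:
        # exc.object is the full input; exc.start/exc.end delimit the bad bytes.
        window = exc.object[max(exc.start - context, 0) : exc.end + context]
        return f"{exc.reason} at byte {exc.start}: {window!r}"
    return "decoded cleanly"


if __name__ == "__main__":
    sample = b"Stra\xdfe 42\r\n"  # hypothetical chunk, not the reported data
    print(describe_decode_failure(sample))
```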

zanieb commented 2 years ago

We might need to open an issue upstream on AnyIO. @nicholasjng would you mind opening another issue here as "Agent crashes when subprocess exits on Windows"? A minimal example would be helpful. This issue is distinct from @drfraser's.

I'll see if I can find a good way to prevent these errors from appearing.

nicholasjng commented 2 years ago

Done in #6446. Thanks for the PR related to the subprocess output parsing, by the way; I will install Prefect at HEAD when it's merged and check whether that fixes the issue.

zanieb commented 2 years ago

Thanks! It seems unlikely to resolve your specific issue, but I've got a separate idea for it that I can hopefully get done soon :)

nicholasjng commented 2 years ago

I was able to solve those errors by enabling UTF-8 mode globally for my agent's Python process on Windows. It is apparently not enabled by default, since Windows still uses legacy encodings; see this paragraph in the Python docs. I guess AnyIO's reported decoder errors were caused by this implicit use of the ANSI encoding in Windows Python.
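The mismatch can be sketched with the standard library alone. The sketch assumes the child writes text in cp1252 while the reader decodes it as UTF-8. cp1252 is a common ANSI code page on Western-European Windows installs, but the actual code page depends on the system locale. The traceback above shows AnyIO's text stream decoding chunk by chunk (self._decoder.decode(chunk)), so the sketch also uses an incremental decoder. decode_stream is a hypothetical helper.

```python
import codecs

# Hypothetical child output: German text encoded with an assumed ANSI code page.
ansi_chunk = "Maßstab\r\n".encode("cp1252")  # b'Ma\xdfstab\r\n'


def decode_stream(chunks, encoding: str) -> str:
    """Decode byte chunks incrementally, as a streaming text reader does."""
    decoder = codecs.getincrementaldecoder(encoding)()
    text = "".join(decoder.decode(chunk) for chunk in chunks)
    return text + decoder.decode(b"", final=True)


try:
    decode_stream([ansi_chunk], "utf-8")
    utf8_result = "ok"
except UnicodeDecodeError as exc:
    utf8_result = exc.reason  # 0xdf is not followed by a continuation byte

# Decoding with the encoding the child actually used works.
cp1252_result = decode_stream([ansi_chunk], "cp1252")

# If the child writes UTF-8 instead, the UTF-8 reader is happy too.
utf8_clean = decode_stream(["Maßstab\r\n".encode("utf-8")], "utf-8")
```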

So, in short, setting PYTHONUTF8=1 in the PowerShell session running my Prefect agent, as recommended in the docs, makes everything work smoothly. Maybe this also works for you, @drfraser? (Even though you are on Linux, perhaps some other encoding problem is present.)
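To confirm that the variable actually reaches a Python child process, a short check can launch a child interpreter with PYTHONUTF8 set and print sys.flags.utf8_mode. That flag exists since Python 3.7, and 1 means UTF-8 Mode is on. In PowerShell, the equivalent for the current session is `$env:PYTHONUTF8 = "1"` before starting the agent. The helper name child_utf8_mode is hypothetical.

```python
import os
import subprocess
import sys


def child_utf8_mode(extra_env: dict) -> str:
    """Launch a child Python with extra environment variables and report
    whether UTF-8 Mode is active inside it ("1" or "0")."""
    env = {**os.environ, **extra_env}
    result = subprocess.run(
        [sys.executable, "-c", "import sys; print(sys.flags.utf8_mode)"],
        env=env, capture_output=True, text=True, check=True,
    )
    return result.stdout.strip()


if __name__ == "__main__":
    print("PYTHONUTF8=1 ->", child_utf8_mode({"PYTHONUTF8": "1"}))
    print("PYTHONUTF8=0 ->", child_utf8_mode({"PYTHONUTF8": "0"}))
```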

drfraser commented 2 years ago

That makes sense. No, my problem was a one-off I could not replicate. But at least PrefectHQ now seems to have a way to reliably recreate a situation that generates the AssertionError (i.e., turning off PYTHONUTF8).

zanieb commented 2 years ago

I believe turning off the PYTHONUTF8=1 setting will reproduce the issue in #6446, not the EOF assertion error.

acookin commented 2 years ago

Leaving a comment here for any unfortunate soul who runs into the same issue as me.

I was seeing this AssertionError: feed_data after feed_eof error, as the reporter described, but neither the linked PR nor https://github.com/PrefectHQ/prefect/issues/6446 fixed it.

I was running Prefect in a Django command (which I do not recommend unless you have to), and this command was running in a container. I was seeing this error and eventually discovered that the output stream was being wrapped by colorama.ansitowin32.StreamWrapper, but only when the container was not attached to a TTY.

It turned out that Django was initializing colorama for me, and Django's behavior makes this an issue only when the process is not attached to a TTY. Uninstalling colorama fixed the issue.
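A quick way to check for this kind of wrapping is to print the concrete class of sys.stdout and sys.stderr and whether each is a TTY. The sketch below uses only the standard library, and describe_stream is a hypothetical helper. It also assumes that colorama.init() may substitute a StreamWrapper when output is not a TTY, for example to strip ANSI codes. That assumption comes from colorama's general behavior, so verify it against the colorama version in use.

```python
import io
import sys


def describe_stream(stream) -> str:
    """Report a stream's concrete class and whether it is attached to a TTY."""
    cls = type(stream)
    try:
        tty = stream.isatty()
    except (AttributeError, ValueError):  # missing method or closed stream
        tty = False
    return f"{cls.__module__}.{cls.__qualname__} (isatty={tty})"


if __name__ == "__main__":
    # An unwrapped CPython stdout is normally an io.TextIOWrapper; another
    # class here (e.g. a StreamWrapper) means something replaced sys.stdout.
    print("stdout:", describe_stream(sys.stdout))
    print("stderr:", describe_stream(sys.stderr))
    print("stdout replaced:", sys.stdout is not sys.__stdout__)
    print("in-memory buffer:", describe_stream(io.StringIO()))  # non-TTY example
```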

ddelange commented 1 year ago

fwiw I started getting this AssertionError in https://github.com/ddelange/prefect/pull/14 when I removed:

     async with open_process(
         command=[
             "prefect",
             "orion",
             "start",
             "--host",
             "127.0.0.1",
             "--port",
             str(port),
             "--log-level",
             "INFO",
         ],
-        stdout=sys.stdout,
-        stderr=sys.stderr,
         env={**os.environ, **get_current_settings().to_environment_variables()},
     ) as process:

philastrophist commented 1 year ago

colorama

This fixed it all for me too. Thank you

egnor commented 1 year ago

For what it's worth, I can reproduce this very quickly on my machine (Ubuntu/Linux) with this flow (which uses prefect-shell; see https://github.com/PrefectHQ/prefect/issues/13073):

from prefect import flow
from prefect_shell import ShellOperation

@flow
def python_version():
    with ShellOperation(commands=["python3 --version"]) as op:
        process = op.trigger()
        process.wait_for_completion()
        result = process.fetch_result()

    return result

print(python_version())

I suspect the colorama, container, and Unicode-error reports are red herrings, perhaps tweaking the issue in and out of existence without being directly related. However, https://github.com/PrefectHQ/prefect/issues/6335#issuecomment-1404632119 is more directly connected: this issue is clearly related to subprocess pipes, and when stdout=sys.stdout and stderr=sys.stderr are passed, there are no such pipes.

This does seem to be an upstream issue in Python asyncio.
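The pipe-versus-no-pipe distinction is visible with plain asyncio. With stdout=PIPE, the process object gets a StreamReader that asyncio feeds from pipe_data_received(). With the default stdout=None, the child inherits the parent's stdout, so process.stdout is None and nothing exists for feed_data() to be called on. The stdlib-only sketch below compares the two. Treating the diff above (passing sys.stdout explicitly) as equivalent to inheriting is an interpretation, not something tested here. compare_stdout_modes is a hypothetical name.

```python
import asyncio
import sys


async def compare_stdout_modes():
    # stdout=PIPE: asyncio creates a StreamReader and feeds it from the pipe.
    piped = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('hello from child')",
        stdout=asyncio.subprocess.PIPE,
    )
    piped_has_reader = piped.stdout is not None
    output, _ = await piped.communicate()

    # stdout=None (default): the child inherits the parent's stdout,
    # so there is no pipe and no StreamReader to feed.
    inherited = await asyncio.create_subprocess_exec(sys.executable, "-c", "pass")
    inherited_has_reader = inherited.stdout is not None
    await inherited.wait()

    return piped_has_reader, inherited_has_reader, output


if __name__ == "__main__":
    print(asyncio.run(compare_stdout_modes()))
```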

abrookins commented 11 months ago

There seems to be a timing issue that appears when this code is called synchronously but not when it's called asynchronously. You can run the same script that Daniel shared, but as coroutines (Prefect supports either), and you won't get the error.

import asyncio
from prefect import flow
from prefect_shell import ShellOperation

@flow
async def python_version():
    async with ShellOperation(commands=["python3 --version"]) as op:
        process = await op.trigger()
        await process.wait_for_completion()
        result = await process.fetch_result()

    return result

if __name__ == "__main__":
    # asyncio.run() creates an event loop, runs the flow coroutine, and closes the loop
    asyncio.run(python_version())

I captured some debug logs with threading info to check on a threading-related theory I had... and found that I was wrong. I'm out of time for now, but I'll leave the logs for future exploration.

Logs for the synchronous call failure scenario:

15:23:20.779 | DEBUG   | prefect.profiles - Using profile 'prd' | [MainThread]
15:23:21.282 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/6550e075-ecda-452a-bec1-6b6906696fa5/workspaces/967f4e4d-5129-4e7f-8127-95074e7cc58f/ | [GlobalEventLoopThread]
15:23:22.317 | INFO    | prefect.engine - Created flow run 'fractal-skunk' for flow 'python-version' | [GlobalEventLoopThread]
15:23:22.317 | INFO    | 1b5b85cc-29a4-401d-9f4b-e3daf00fa791 - View at https://app.prefect.cloud/account/6550e075-ecda-452a-bec1-6b6906696fa5/workspace/967f4e4d-5129-4e7f-8127-95074e7cc58f/flow-runs/flow-run/1b5b85cc-29a4-401d-9f4b-e3daf00fa791 - [GlobalEventLoopThread]
15:23:22.318 | DEBUG   | 1b5b85cc-29a4-401d-9f4b-e3daf00fa791 - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently... - [GlobalEventLoopThread]
15:23:22.319 | DEBUG   | prefect.task_runner.concurrent - Starting task runner... | [GlobalEventLoopThread]
15:23:22.325 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/6550e075-ecda-452a-bec1-6b6906696fa5/workspaces/967f4e4d-5129-4e7f-8127-95074e7cc58f/ | [GlobalEventLoopThread]
15:23:22.824 | DEBUG   | 1b5b85cc-29a4-401d-9f4b-e3daf00fa791 - Executing flow 'python-version' for flow run 'fractal-skunk'... - [GlobalEventLoopThread]
15:23:22.825 | DEBUG   | 1b5b85cc-29a4-401d-9f4b-e3daf00fa791 - Beginning execution... - [GlobalEventLoopThread]
15:23:22.831 | DEBUG   | 1b5b85cc-29a4-401d-9f4b-e3daf00fa791 - Writing the following commands to '/var/folders/4y/8hc40kys4nlbcs0k74x9xvg80000gn/T/prefect-gvz2x0ja.sh':
python3 --version - [MainThread]
15:23:22.846 | INFO    | 1b5b85cc-29a4-401d-9f4b-e3daf00fa791 - PID 57397 triggered with 1 commands running inside the '.' directory. - [MainThread]
15:23:22.861 | ERROR   | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b'Python 3.9.9\n')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b'Python 3.9.9\n')> | [MainThread]
Traceback (most recent call last):
  File "/Users/andrew/.pyenv/versions/3.9.9/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/andrew/.pyenv/versions/3.9.9/lib/python3.9/asyncio/subprocess.py", line 73, in pipe_data_received
    reader.feed_data(data)
  File "/Users/andrew/.pyenv/versions/3.9.9/lib/python3.9/asyncio/streams.py", line 472, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
15:23:22.864 | DEBUG   | 1b5b85cc-29a4-401d-9f4b-e3daf00fa791 - Waiting for PID 57397 to complete. - [MainThread]
15:23:22.864 | INFO    | 1b5b85cc-29a4-401d-9f4b-e3daf00fa791 - PID 57397 completed with return code 0. - [MainThread]
15:23:22.866 | INFO    | 1b5b85cc-29a4-401d-9f4b-e3daf00fa791 - Successfully closed all open processes. - [MainThread]
15:23:23.178 | DEBUG   | prefect.task_runner.concurrent - Shutting down task runner... | [GlobalEventLoopThread]
15:23:23.179 | INFO    | 1b5b85cc-29a4-401d-9f4b-e3daf00fa791 - Finished in state Completed() - [GlobalEventLoopThread]

Logs for the asynchronous call success case:

15:26:27.787 | DEBUG   | prefect.profiles - Using profile 'prd' | [MainThread]
15:26:28.254 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/6550e075-ecda-452a-bec1-6b6906696fa5/workspaces/967f4e4d-5129-4e7f-8127-95074e7cc58f/ | [GlobalEventLoopThread]
15:26:29.076 | INFO    | prefect.engine - Created flow run 'furry-llama' for flow 'python-version' | [GlobalEventLoopThread]
15:26:29.076 | INFO    | 0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - View at https://app.prefect.cloud/account/6550e075-ecda-452a-bec1-6b6906696fa5/workspace/967f4e4d-5129-4e7f-8127-95074e7cc58f/flow-runs/flow-run/0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - [GlobalEventLoopThread]
15:26:29.077 | DEBUG   | 0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently... - [GlobalEventLoopThread]
15:26:29.078 | DEBUG   | prefect.task_runner.concurrent - Starting task runner... | [GlobalEventLoopThread]
15:26:29.082 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/6550e075-ecda-452a-bec1-6b6906696fa5/workspaces/967f4e4d-5129-4e7f-8127-95074e7cc58f/ | [GlobalEventLoopThread]
15:26:29.380 | DEBUG   | 0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - Executing flow 'python-version' for flow run 'furry-llama'... - [GlobalEventLoopThread]
15:26:29.381 | DEBUG   | 0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - Beginning execution... - [GlobalEventLoopThread]
15:26:29.384 | DEBUG   | 0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - Writing the following commands to '/var/folders/4y/8hc40kys4nlbcs0k74x9xvg80000gn/T/prefect-o6lrs1vm.sh':
python3 --version - [MainThread]
15:26:29.393 | INFO    | 0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - PID 57783 triggered with 1 commands running inside the '.' directory. - [MainThread]
15:26:29.395 | DEBUG   | 0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - Waiting for PID 57783 to complete. - [MainThread]
15:26:29.406 | INFO    | 0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - PID 57783 stream output:
Python 3.9.9 - [MainThread]
15:26:29.408 | INFO    | 0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - PID 57783 completed with return code 0. - [MainThread]
15:26:29.409 | INFO    | 0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - Successfully closed all open processes. - [MainThread]
15:26:29.572 | DEBUG   | prefect.task_runner.concurrent - Shutting down task runner... | [GlobalEventLoopThread]
15:26:29.574 | INFO    | 0805f2aa-2b23-4ae2-b0c9-e39937d2c545 - Finished in state Completed() - [GlobalEventLoopThread]

ckeogh-loam commented 4 months ago

FWIW, I'm also seeing this issue when using DbtCoreOperation, since it subclasses ShellOperation. @abrookins's workaround using coroutines works for me.

$ prefect --version
2.17.1