PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Unexpected 64kB stdout limitation of ShellOperation.run() #13080

Open jorritfolmer opened 1 year ago

jorritfolmer commented 1 year ago

Expectation

It seems that stdout from ShellOperation.run() is somehow limited to 64kB. I haven't been able to find the source of this limitation. Is there some way to influence this limit? Or could this limitation be added to the documentation? Am I holding it wrong?

Below is a reproducer where @flow process_all_files() iterates over 1600 dummy entries, fed through a useless use of cat for demonstration purposes:

Reproducer

  1. Generate dummy filelist:

    yes AAAAAAAAAAAABBBBBBBBBBBBBBCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD | head -1600 > filelist
  2. reproducer.py:

    from prefect import task, flow
    from prefect_shell import ShellOperation
    
    import regex
    import subprocess
    
    # The ShellOperation code path that exhibits the apparent 64kB problem:
    def get_file_list(dir):
        output = ShellOperation(
            commands=["cat filelist"],
            env={"dir": dir},
            stream_output=False,
        ).run()
        return output
    
    # Plain-subprocess variant of the same read, for comparison:
    def get_file_list_v2(dir):
        result = subprocess.run(["cat", "filelist"], stdout=subprocess.PIPE)
        output = result.stdout.decode("utf-8").rstrip().split("\n")
        return output
    
    def get_metadata_from_filepath(filepath):
        m = regex.search(r"([A]+)([B]+)([C]+)([D]+)", filepath)
        try:
            one = m.group(1)
        except (ValueError, IndexError, AttributeError) as e:
            raise
        return one
    
    @task
    def do_something_with_file(i, filepath):
        one = get_metadata_from_filepath(filepath)
        print(i, one)
    
    @flow
    def process_all_files(dir):
        files = get_file_list(dir)
        print(len('\n'.join(files))) # Seems to be intact here
        i = 1
        for filepath in files:
            do_something_with_file(i, filepath) # However after iteration 752 we get a partial filepath
            i = i + 1
    
    if __name__ == "__main__":
        process_all_files("somedir")
  3. python reproducer.py

mahiki commented 1 year ago

This is a severe limitation for my own usage; I'm trying to test some scripts without building Docker containers for the purpose.

egnor commented 1 year ago

The problem is not a 64K limit; the problem is buggy line-splitting that sometimes happens to fail around 64K chunk sizes.

Here's a much better self-contained repro case:

from prefect import flow
from prefect_shell import ShellOperation

@flow
def yes_flow():
    long_line = "A" * 12 + "B" * 14 + "C" * 15 + "D" * 45
    output_lines = ShellOperation(
        commands=[f"yes {long_line} | head -1600"],
        stream_output=False,
    ).run()

    if len(output_lines) != 1600:
        print(f"Expected 1600 lines of output, got {len(output_lines)}")

    for i, line in enumerate(output_lines):
        if line != long_line:
            print(f"Bad line {i}: {line}")

if __name__ == "__main__":
    yes_flow()

I get this output (amid Prefect logging messages):

Expected 1600 lines of output, got 1602
Bad line 753: AAAAAAAAAAAABBBBBBBBBBBBB
Bad line 754: BCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
Bad line 1507: AAAAAAAAAAAABBBBBBBBBBBBBBCCCCCCCCCCCCCCCDDDDDDDDD
Bad line 1508: DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
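
Incidentally, those bad line numbers line up exactly with 64 KiB boundaries (assuming 65536-byte chunks; the arithmetic below, using the line length from the repro, bears that out):

line_len = 12 + 14 + 15 + 45 + 1          # 86 chars of ABCD plus "\n" = 87 bytes
print(divmod(65536, line_len))            # (753, 25): first chunk ends 25 chars into line 753
print(divmod(131072, line_len))           # (1506, 50): second chunk ends 50 chars into line 1506
# 25 chars is "A"*12 + "B"*13, matching bad line 753 above; the extra entry from
# that first split shifts later indices by one, so the second break surfaces as
# bad lines 1507/1508.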

As you can see, the output isn't limited to 64K; extra line breaks are sneaking in, and that's what's throwing off your code, @jorritfolmer. Of course this is still a bug: random extra line breaks are no good! They come from this code:

async for output in TextReceiveStream(source):
    text = output.rstrip()
    if self._shell_operation.stream_output:
        self.logger.info(f"PID {self.pid} stream output:{os.linesep}{text}")
    self._output.extend(text.split(os.linesep))

That's clearly buggy. TextReceiveStream, like the byte streams it wraps, receives chunks divided arbitrarily, not on line boundaries:

Byte streams (“Streams” in Trio lingo) are objects that receive and/or send chunks of bytes. They are modelled after the limitations of the stream sockets, meaning the boundaries are not respected. In practice this means that if, for example, you call .send(b'hello ') and then .send(b'world'), the other end will receive the data chunked in any arbitrary way, like (b'hello' and b' world'), b'hello world' or (b'hel', b'lo wo', b'rld').

Given a pile of rapid output, the stream happens to deliver it in 64K chunks, which do not in general align with line breaks, so splitting each chunk independently is not going to work reliably. (It will work more often than it deserves to, because programs often "burst" a few lines at a time. And when it doesn't work, it shows up as occasional weird glitches, so a lot of the time people won't notice, or will write it off.)
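
For reference, here is a minimal sketch of the safe pattern: buffer the chunks and only split on newlines that have actually arrived. (This assumes anyio, which these TextReceiveStream / Process classes appear to come from; the command and names are just for illustration, not Prefect's code.)

import anyio
from anyio.streams.text import TextReceiveStream

async def read_lines() -> None:
    # POSIX-only demo command: 100000 identical lines, produced fast enough
    # to arrive in large, arbitrarily-split chunks.
    async with await anyio.open_process(["sh", "-c", "yes hello | head -100000"]) as process:
        lines: list[str] = []
        buffer = ""
        async for chunk in TextReceiveStream(process.stdout):
            buffer += chunk                         # chunk boundaries are arbitrary
            *complete, buffer = buffer.split("\n")  # keep the trailing partial line buffered
            lines.extend(complete)
        if buffer:
            lines.append(buffer)                    # last line may lack a trailing newline
    print(len(lines))  # 100000 intact lines, no fragments

if __name__ == "__main__":
    anyio.run(read_lines)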

jorritfolmer commented 1 year ago

Thanks for your clear explanation! The patch below seems to fix the issue by concatenating the chunks into a single string as they arrive and only splitting on newlines into a list when the output is returned.

--- a/commands.py
+++ b/commands.py
@@ -128,7 +128,7 @@
     def __init__(self, shell_operation: "ShellOperation", process: Process):
         self._shell_operation = shell_operation
         self._process = process
-        self._output = []
+        self._output = ""

     @property
     def pid(self) -> int:
@@ -158,7 +158,7 @@
             text = output.rstrip()
             if self._shell_operation.stream_output:
                 self.logger.info(f"PID {self.pid} stream output:{os.linesep}{text}")
-            self._output.extend(text.split(os.linesep))
+            self._output += text

     @sync_compatible
     async def wait_for_completion(self) -> None:
@@ -191,7 +191,7 @@
         """
         if self._process.returncode is None:
             self.logger.info("Process is still running, result may be incomplete.")
-        return self._output
+        return self._output.split(os.linesep)
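
One caveat with this approach: each chunk is still rstrip()ed before being appended, so if a chunk happens to end exactly on a newline, that newline is stripped and the last line of the chunk gets fused with the first line of the next one. A variant that appends the raw chunk and defers the strip to the end avoids that (a sketch of the idea only, with a hypothetical class name, not the actual prefect_shell code):

import os

class OutputAccumulator:
    def __init__(self) -> None:
        self._output = ""

    def feed(self, chunk: str) -> None:
        # No per-chunk rstrip(): a chunk may end exactly on a newline, and
        # stripping it would fuse this chunk's last line with the next
        # chunk's first line.
        self._output += chunk

    def result(self) -> list[str]:
        # Strip trailing whitespace once, then split into lines.
        return self._output.rstrip().split(os.linesep)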