neondatabase / neon

Neon: Serverless Postgres. We separated storage and compute to offer autoscaling, code-like database branching, and scale to zero.
https://neon.tech
Apache License 2.0
14.87k stars 435 forks source link

Add an async equivalent of subprocess_capture #9473

Open tristan957 opened 1 week ago

tristan957 commented 1 week ago

`subprocess_capture` runs a command synchronously, to completion. Sometimes, however, we need to spawn a long-running process, such as a server, and communicate with it while it is still running.

tristan957 commented 1 week ago

For posterity: I tried to implement this myself but ran into a roadblock with the way we collect logs and generate output file paths:

diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py
index 7ca6b3dd1..ce8e3280c 100644
--- a/test_runner/fixtures/utils.py
+++ b/test_runner/fixtures/utils.py
@@ -9,7 +9,6 @@
 import tarfile
 import threading
 import time
-from collections.abc import Iterable
 from hashlib import sha256
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, TypeVar
@@ -27,7 +26,7 @@
 )

 if TYPE_CHECKING:
-    from collections.abc import Iterable
+    from collections.abc import Iterable, Iterator
     from typing import IO, Optional

     from fixtures.common_types import TimelineId
@@ -56,6 +55,7 @@
 # fmt: on

+@contextlib.contextmanager
 def subprocess_capture(
     capture_dir: Path,
     cmd: list[str],
@@ -64,10 +64,9 @@ def subprocess_capture(
     echo_stderr: bool = False,
     echo_stdout: bool = False,
     capture_stdout: bool = False,
-    timeout: Optional[float] = None,
     with_command_header: bool = True,
     **popen_kwargs: Any,
-) -> tuple[str, Optional[str], int]:
+) -> Iterator[subprocess.Popen[str]]:
     """Run a process and bifurcate its output to files and the `log` logger

     stderr and stdout are always captured in files.  They are also optionally
@@ -129,40 +128,60 @@ def run(self):

     captured = None
     try:
-        with open(stdout_filename, "wb") as stdout_f:
-            with open(stderr_filename, "wb") as stderr_f:
-                log.info(f'Capturing stdout to "{base}.stdout" and stderr to "{base}.stderr"')
+        with open(stdout_filename, "wb") as stdout_f, open(stderr_filename, "wb") as stderr_f:
+            log.info(f'Capturing stdout to "{base}.stdout" and stderr to "{base}.stderr"')

-                p = subprocess.Popen(
-                    cmd,
-                    **popen_kwargs,
-                    stdout=subprocess.PIPE,
-                    stderr=subprocess.PIPE,
-                )
-                stdout_handler = OutputHandler(
-                    p.stdout, stdout_f, echo=echo_stdout, capture=capture_stdout
-                )
-                stdout_handler.start()
-                stderr_handler = OutputHandler(p.stderr, stderr_f, echo=echo_stderr, capture=False)
-                stderr_handler.start()
-
-                r = p.wait(timeout=timeout)
+            p = subprocess.Popen(
+                cmd,
+                **popen_kwargs,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+            )
+            stdout_handler = OutputHandler(
+                p.stdout, stdout_f, echo=echo_stdout, capture=capture_stdout
+            )
+            stdout_handler.start()
+            stderr_handler = OutputHandler(p.stderr, stderr_f, echo=echo_stderr, capture=False)
+            stderr_handler.start()

+            try:
+                yield p
+            finally:
                 stdout_handler.join()
                 stderr_handler.join()

-                if check and r != 0:
-                    raise subprocess.CalledProcessError(r, " ".join(cmd))
-
-                if capture_stdout:
-                    captured = stdout_handler.captured
+            if check and p.returncode != 0:
+                raise subprocess.CalledProcessError(p.returncode, " ".join(cmd))
     finally:
         # Remove empty files if there is no output
         for filename in (stdout_filename, stderr_filename):
             if os.stat(filename).st_size == 0:
                 os.remove(filename)

-    return (basepath, captured, r)
+
+def subprocess_capture_wait(
+    capture_dir: Path,
+    cmd: list[str],
+    *,
+    check: bool = False,
+    echo_stderr: bool = False,
+    echo_stdout: bool = False,
+    capture_stdout: bool = False,
+    timeout: Optional[float] = None,
+    with_command_header: bool = True,
+) -> tuple[Optional[IO[str]], int]:
+    with subprocess_capture(
+        capture_dir,
+        cmd,
+        check=check,
+        echo_stderr=echo_stderr,
+        echo_stdout=echo_stdout,
+        capture_stdout=capture_stdout,
+        with_command_header=with_command_header,
+    ) as p:
+        p.wait(timeout=timeout)
+
+        return p.stdout, p.returncode

 _global_counter = 0
diff --git a/test_runner/regress/test_pg_waldump.py b/test_runner/regress/test_pg_waldump.py
index c98d39545..2f1ab21e8 100644
--- a/test_runner/regress/test_pg_waldump.py
+++ b/test_runner/regress/test_pg_waldump.py
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 import os
 import shutil