executablebooks / MyST-NB

Parse and execute ipynb files in Sphinx
https://myst-nb.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Fix incorrect output from prints originating from different processes #604

Closed basnijholt closed 2 months ago

basnijholt commented 5 months ago

In the PipeFunc documentation I have the following problem when executing code with a ProcessPoolExecutor:

[screenshot: fragmented, interleaved print output with spurious blank lines]

With this fix it becomes:

[screenshot: the same cell with the stream outputs correctly merged]

The root of the issue is that nbconvert --execute produces this output:

  {
   "cell_type": "code",
   "execution_count": 47,
   "id": "92",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2024-05-31T05:42:39.297713Z",
     "iopub.status.busy": "2024-05-31T05:42:39.297474Z",
     "iopub.status.idle": "2024-05-31T05:42:40.477462Z",
     "shell.execute_reply": "2024-05-31T05:42:40.475729Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410279 - Running double_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.408318 - Running double_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410888 - Running double_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.416024 - Running double_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.431485 - Running half_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.434285 - Running half_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.433559 - Running half_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.439223 - Running half_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:40.459668 - Running take_sum"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "14\n"
     ]
    }
   ],
   "source": [
    "from concurrent.futures import ProcessPoolExecutor\n",
    "import datetime\n",
    "import numpy as np\n",
    "import time\n",
    "from pipefunc import Pipeline, pipefunc\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"double\", mapspec=\"x[i] -> double[i]\")\n",
    "def double_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running double_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return 2 * x\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"half\", mapspec=\"x[i] -> half[i]\")\n",
    "def half_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running half_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return x // 2\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"sum\")\n",
    "def take_sum(half: np.ndarray, double: np.ndarray) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running take_sum\")\n",
    "    return sum(half + double)\n",
    "\n",
    "\n",
    "pipeline = Pipeline([double_it, half_it, take_sum])\n",
    "inputs = {\"x\": [0, 1, 2, 3]}\n",
    "run_folder = \"my_run_folder\"\n",
    "executor = ProcessPoolExecutor(max_workers=8)  # use 8 processes\n",
    "results = pipeline.map(\n",
    "    inputs,\n",
    "    run_folder=run_folder,\n",
    "    parallel=True,\n",
    "    executor=executor,\n",
    "    storage=\"shared_memory_dict\",\n",
    ")\n",
    "print(results[\"sum\"].output)"
   ]
  },
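Conceptually, the fix (in combination with nb_merge_streams) boils down to coalescing consecutive stream outputs of the same name into one. A rough, hypothetical sketch of that merging step (not MyST-NB's actual implementation; it assumes each output's "text" has been normalized to a single string):

```python
def merge_streams(outputs: list[dict]) -> list[dict]:
    """Coalesce consecutive "stream" outputs with the same name
    (stdout or stderr) into one output, concatenating their text."""
    merged: list[dict] = []
    for out in outputs:
        if (
            merged
            and out.get("output_type") == "stream"
            and merged[-1].get("output_type") == "stream"
            and out.get("name") == merged[-1].get("name")
        ):
            merged[-1]["text"] += out["text"]
        else:
            # Copy so the caller's notebook data is not mutated.
            merged.append(dict(out))
    return merged
```

Applied to the cell above, the nineteen separate stdout outputs would collapse into a single stream output whose text contains all the lines.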
welcome[bot] commented 5 months ago

Thanks for submitting your first pull request! You are awesome! :hugs:
If you haven't done so already, check out EBP's Code of Conduct and our Contributing Guide, as this will greatly help the review process.
Welcome to the EBP community! :tada:

basnijholt commented 5 months ago

@agoose77, the failing CI check is unrelated to these changes.

basnijholt commented 2 months ago

@agoose77, friendly ping. Could you take a look at this?

bsipocz commented 2 months ago

I have a very similar problem in one of my repos, but this fix doesn't seem to work; I still get fragmented output in the rendered HTML.

basnijholt commented 2 months ago

@bsipocz, did you set nb_merge_streams = True?
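For reference, nb_merge_streams is a documented MyST-NB option that goes in the Sphinx conf.py:

```python
# conf.py
nb_merge_streams = True  # coalesce fragmented stdout/stderr stream outputs
```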

bsipocz commented 2 months ago

> @bsipocz, did you set nb_merge_streams = True?

Yep, I didn't have that, but I discovered the option by following the link to your https://github.com/pipefunc/pipefunc/pull/125 PR. So thank you.

So now my issue is fixed even without this PR.

While this PR passes all tests and doesn't seem to break anything, it would be nice to add your failing case to the test suite, too.

basnijholt commented 2 months ago

I added a test. Without the changes in this PR, it fails:

________________________________________________ test_merge_streams_parallel ________________________________________________

sphinx_run = <conftest.SphinxFixture object at 0x7e810491f9b0>
file_regression = <conftest.FileRegression object at 0x7e80b65fe3f0>

    @pytest.mark.sphinx_params(
        "merge_streams_parallel.ipynb",
        conf={"nb_execution_mode": "off", "nb_merge_streams": True},
    )
    def test_merge_streams_parallel(sphinx_run, file_regression):
        """Test configuring multiple concurrent stdout/stderr outputs to be merged."""
        sphinx_run.build()
        assert sphinx_run.warnings() == ""
        doctree = sphinx_run.get_resolved_doctree("merge_streams_parallel")
>       file_regression.check(doctree.pformat(), extension=".xml", encoding="utf-8")

/home/bas.nijholt/repos/MyST-NB/tests/test_render_outputs.py:116:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <conftest.FileRegression object at 0x7e80b65fe3f0>
data = '<document source="merge_streams_parallel" translation_progress="{\'total\': 0, \'translated\': 0}">\n    <container c...        \n                \n                \n                \n                \n                \n                \n'
kwargs = {'encoding': 'utf-8', 'extension': '.xml'}

    def check(self, data, **kwargs):
>       return self.file_regression.check(self._strip_ignores(data), **kwargs)
E       AssertionError: FILES DIFFER:
E       /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.xml
E       /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.obtained.xml
E       HTML DIFF: /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.obtained.diff.html
E       ---
E       +++
E       @@ -9,13 +9,13 @@
E                                pass
E                <container classes="cell_output" nb_element="cell_code_output">
E                    <literal_block classes="output stream" language="myst-ansi" linenos="False" xml:space="preserve">
E       +                000000000
E                        0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       +
E       +
E       +
E       +
E       +
E       +
E       +
E       +

The notebook was executed via jupyter nbconvert --execute merge_streams_parallel.ipynb --to ipynb, and I committed the executed ipynb.