pytest-dev / execnet

distributed Python deployment and communication
https://execnet.readthedocs.io
MIT License

couldnt load message header, expected 9 bytes, got 0 #143

Closed tamaskakuszi closed 3 years ago

tamaskakuszi commented 3 years ago

Hey.

I'm hitting the error in the title. I'm using pytest 6.2.4 and pytest-xdist 2.3.0 on a Debian 10 Linux agent that runs in Kubernetes under Jenkins. The error appears when I run the tests in parallel. To debug it, I wrote some logs to files and set EXECNET_DEBUG=2. The command I'm running is `pytest -m xxx -n 2 --max-worker-restart=2` (to avoid hitting the 28-worker maximum).
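For anyone who wants to reproduce a run like this from a script, here is a minimal sketch. It is not part of xdist or execnet: the helper names `build_debug_run` and `run_with_trace` and the file name `execnet-trace.log` are made up for illustration, and it assumes that `EXECNET_DEBUG=2` makes execnet write its trace to stderr, which is why stderr is redirected into a file.

```python
import os
import subprocess
import sys


def build_debug_run(marker, workers=2, max_restarts=2, base_env=None):
    """Return (command, env) for a parallel pytest run with execnet tracing.

    Hypothetical helper; only the pytest/xdist flags and the EXECNET_DEBUG
    variable are taken from the report above.
    """
    env = dict(os.environ if base_env is None else base_env)
    env["EXECNET_DEBUG"] = "2"  # assumption: level 2 traces to stderr
    command = [
        sys.executable, "-m", "pytest",
        "-m", marker,
        "-n", str(workers),
        f"--max-worker-restart={max_restarts}",
    ]
    return command, env


def run_with_trace(marker, trace_path="execnet-trace.log"):
    """Run pytest and store everything written to stderr in trace_path."""
    command, env = build_debug_run(marker)
    with open(trace_path, "wb") as trace:
        return subprocess.run(command, env=env, stderr=trace).returncode
```

Calling `run_with_trace("xxx")` would then leave the trace in one file that can be attached to a report instead of being pasted inline.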

Could you please suggest how to solve this issue? Thank you.
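A note on reading the message in the title: it is consistent with the controller reading from a worker pipe that was already closed, meaning the worker process exited before it sent anything. The sketch below is a simplified model of length-prefixed message framing, not execnet's actual code. The 9-byte layout (1-byte type, 4-byte channel id, 4-byte payload size) and the names `read_exact` and `read_message` are assumptions made for illustration.

```python
import io
import struct

# Assumed header layout: message type, channel id, payload size (9 bytes).
HEADER = struct.Struct("!bii")


def read_exact(stream, numbytes):
    """Read exactly numbytes from stream or raise EOFError."""
    buf = b""
    while len(buf) < numbytes:
        data = stream.read(numbytes - len(buf))
        if not data:  # the peer closed its end of the pipe
            raise EOFError("expected %d bytes, got %d" % (numbytes, len(buf)))
        buf += data
    return buf


def read_message(stream):
    """Return (msgtype, channel, payload) for one framed message."""
    try:
        header = read_exact(stream, HEADER.size)
    except EOFError as exc:
        raise EOFError("couldnt load message header, " + exc.args[0])
    msgtype, channel, size = HEADER.unpack(header)
    return msgtype, channel, read_exact(stream, size)


if __name__ == "__main__":
    # A well-formed message round-trips.
    print(read_message(io.BytesIO(HEADER.pack(1, 7, 3) + b"abc")))
    # An already-closed (empty) stream fails while reading the header.
    try:
        read_message(io.BytesIO(b""))
    except EOFError as exc:
        print(exc)
```

In this model an empty stream fails at the header read with zero bytes received, which points at the worker process having exited rather than at a malformed message.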

`platform linux -- Python 3.7.3, pytest-6.2.4, py-1.10.0, pluggy-0.13.1 rootdir: /MyNameNecklace, configfile: pytest.ini plugins: forked-1.3.0, xdist-2.3.0, allure-pytest-2.7.1 gw0 C / gw1 IDATA: b'N\x00\x00!\x8f"""\n This module is executed in remote subprocesses and helps to\n control a remote testing session and relay back information.\n It assumes that \'py\' is importable and does not have dependencies\n on the rest of the xdist code. This means that the xdist-plugin\n needs not to be installed in remote environments.\n"""\n\nimport sys\nimport os\nimport time\n\nimport py\nimport pytest\nfrom execnet.gateway_base import dumps, DumpError\n\nfrom _pytest.config import _prepareconfig, Config\n\n\nclass WorkerInteractor:\n def init(self, config, channel):\n self.config = config\n self.workerid = config.workerinput.get("workerid", "?")\n self.testrunuid = config.workerinput["testrunuid"]\n self.log = py.log.Producer("worker-%s" % self.workerid)\n if not config.option.debug:\n py.log.setconsumer(self.log._keywords, None)\n self.channel = channel\n config.pluginmanager.register(self)\n\n def sendevent(self, name, **kwargs):\n self.log("sending", name, kwargs)\n self.channel.send((name, kwargs))\n\n def pytest_internalerror(self, excrepr):\n formatted_error = str(excrepr)\n for line in formatted_error.split("\n"):\n self.log("IERROR>", line)\n interactor.sendevent("internal_error", formatted_error=formatted_error)\n\n def pytest_sessionstart(self, session):\n self.session = session\n workerinfo = getinfodict()\n self.sendevent("workerready", workerinfo=workerinfo)\n\n @pytest.hookimpl(hookwrapper=True)\n def pytest_sessionfinish(self, exitstatus):\n

# in pytest 5.0+, exitstatus is an IntEnum object\n self.config.workeroutput["exitstatus"] = int(exitstatus)\n yield\n self.sendevent("workerfinished", workeroutput=self.config.workeroutput)\n\n def pytest_collection(self, session):\n self.sendevent("collectionstart")\n\n def pytest_runtestloop(self, session):\n self.log("entering main loop")\n torun = []\n while 1:\n try:\n name, kwargs = self.channel.receive()\n except EOFError:\n return True\n self.log("received command", name, kwargs)\n if name == "runtests":\n torun.extend(kwargs["indices"])\n
elif name == "runtests_all":\n torun.extend(range(len(session.items)))\n self.log("items to run:", torun)\n # only run if we have an item and a next item\n while len(torun) >= 2:\n self.run_one_test(torun)\n if name == "shutdown":\n if torun:\n self.run_one_test(torun)\n break\n return True\n\n def run_one_test(self, torun):\n items = self.session.items\n self.item_index = torun.pop(0)\n item = items[self.item_index]\n if torun:\n nextitem = items[torun[0]]\n else:\n nextitem = None\n\n start = time.time()\n self.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)\n duration = time.time() - start\n self.sendevent(\n "runtest_protocol_complete", item_index=self.item_index, duration=duration\n )\n\n def pytest_collection_finish(self, session):\n try:\n topdir = str(self.config.rootpath)\n except AttributeError: # pytest <= 6.1.0\n topdir = str(self.config.rootdir)\n\n self.sendevent(\n "collectionfinish",\n topdir=topdir,\n ids=[item.nodeid for item in session.items],\n )\n\n def pytest_runtest_logstart(self, nodeid, location):\n self.sendevent("logstart", nodeid=nodeid, location=location)\n\n def pytest_runtest_logfinish(self, nodeid, location):\n self.sendevent("logfinish", nodeid=nodeid, location=location)\n\n def pytest_runtest_logreport(self, report):\n data = self.config.hook.pytest_report_to_serializable(\n config=self.config, report=report\n )\n data["item_index"] = self.item_index\n data["worker_id"] = self.workerid\n data["testrun_uid"] = self.testrunuid\n assert self.session.items[self.item_index].nodeid == report.nodeid\n self.sendevent("testreport", data=data)\n\n def pytest_collectreport(self, report):\n # send only reports that have not passed to controller as optimization (#330)\n if not report.passed:\n data = self.config.hook.pytest_report_to_serializable(\n config=self.config, report=report\n )\n self.sendevent("collectreport", data=data)\n\n def pytest_warning_recorded(self, warning_message, when, nodeid, location):\n 
self.sendevent(\n "warning_recorded",\n warning_message_data=serialize_warning_message(warning_message),\n when=when,\n nodeid=nodeid,\n location=location,\n )\n\n\ndef serialize_warning_message(warning_message):\n if isinstance(warning_message.message, Warning):\n message_module = type(warning_message.message).module\n message_class_name = type(warning_message.message).name\n message_str = str(warning_message.message)\n # check now if we can serialize the warning arguments (#349)\n # if not, we will just use the exception message on the controller node\n try:\n dumps(warning_message.message.args)\n except DumpError:\n message_args = None\n else:\n message_args = warning_message.message.args\n else:\n message_str = warning_message.message\n message_module = None\n message_class_name = None\n message_args = None\n if warning_message.category:\n category_module = warning_message.category.module\n category_class_name = warning_message.category.name\n else:\n category_module = None\n category_class_name = None\n\n result = {\n "message_str": message_str,\n "message_module": message_module,\n "message_class_name": message_class_name,\n "message_args": message_args,\n "category_module": category_module,\n "category_class_name": category_class_name,\n }\n # access private _WARNING_DETAILS because the attributes vary between Python versions\n for attr_name in warning_message._WARNING_DETAILS:\n if attr_name in ("message", "category"):\n continue\n attr = getattr(warning_message, attr_name)\n # Check if we can serialize the warning detail, marking None otherwise\n # Note that we need to define the attr (even as None) to allow deserializing\n try:\n dumps(attr)\n except DumpError:\n result[attr_name] = repr(attr)\n else:\n result[attr_name] = attr\n return result\n\n\ndef getinfodict():\n import platform\n\n return dict(\n version=sys.version,\n version_info=tuple(sys.version_info),\n sysplatform=sys.platform,\n platform=platform.platform(),\n executable=sys.executable,\n 
cwd=os.getcwd(),\n )\n\n\ndef remote_initconfig(option_dict, args):\n option_dict["plugins"].append("no:terminal")\n return Config.fromdictargs(option_dict, args)\n\n\ndef setup_config(config, basetemp):\n config.option.looponfail = False\n config.option.usepdb = False\n config.option.dist = "no"\n config.option.distload = False\n config.option.numprocesses = None\n config.option.maxprocesses = None\n config.option.basetemp = basetemp\n\n\nif name == "channelexec":\n channel = channel # noqa\n workerinput, args, option_dict, change_sys_path = channel.receive()\n\n if change_sys_path is None:\n importpath = os.getcwd()\n sys.path.insert(0, importpath)\n os.environ["PYTHONPATH"] = (\n importpath + os.pathsep + os.environ.get("PYTHONPATH", "")\n )\n else:\n sys.path = change_sys_path\n\n os.environ["PYTEST_XDIST_TESTRUNUID"] = workerinput["testrunuid"]\n os.environ["PYTEST_XDIST_WORKER"] = workerinput["workerid"]\n os.environ["PYTEST_XDIST_WORKER_COUNT"] = str(workerinput["workercount"])\n\n if hasattr(Config, "InvocationParams"):\n config = _prepareconfig(args, None)\n else:\n config = remote_initconfig(option_dict, args)\n config.args = args\n\n setup_config(config, option_dict.get("basetemp"))\n config._parser.prog = os.path.basename(workerinput["mainargv"][0])\n config.workerinput = workerinput\n config.workeroutput = {}\n interactor = WorkerInteractor(config, channel)\n config.hook.pytest_cmdline_main(config=config)\nN\x00\x00\x006/usr/local/lib/python3.7/dist-packages/xdist/remote.pyLJ@\x00\x00\x00\x04Q' SETUP SEND --------------- DATA: b"JN\x00\x00\x00\x08workeridN\x00\x00\x00\x03gw0PN\x00\x00\x00\x0bworkercountF\x00\x00\x00\x02PN\x00\x00\x00\ntestrunuidN\x00\x00\x00 
4aac3410757c43ac89115a8a092c4d36PN\x00\x00\x00\x08mainargvK\x00\x00\x00\x06F\x00\x00\x00\x00N\x00\x00\x00\x15/usr/local/bin/pytestPF\x00\x00\x00\x01N\x00\x00\x00\x02-mPF\x00\x00\x00\x02N\x00\x00\x00\trun_smokePF\x00\x00\x00\x03N\x00\x00\x00\x02-nPF\x00\x00\x00\x04N\x00\x00\x00\x012PF\x00\x00\x00\x05N\x00\x00\x00\x16--max-worker-restart=2PPK\x00\x00\x00\x05F\x00\x00\x00\x00N\x00\x00\x00\x02-mPF\x00\x00\x00\x01N\x00\x00\x00\trun_smokePF\x00\x00\x00\x02N\x00\x00\x00\x02-nPF\x00\x00\x00\x03N\x00\x00\x00\x012PF\x00\x00\x00\x04N\x00\x00\x00\x16--max-worker-restart=2PJN\x00\x00\x00\x08basetempN\x00\x00\x00'/tmp/pytest-of-root/pytest-11/popen-gw0PK\x00\x00\x00\x08F\x00\x00\x00\x00N\x00\x00\x00\x0e/usr/local/binPF\x00\x00\x00\x01N\x00\x00\x00\x15/usr/lib/python37.zipPF\x00\x00\x00\x02N\x00\x00\x00\x12/usr/lib/python3.7PF\x00\x00\x00\x03N\x00\x00\x00\x1e/usr/lib/python3.7/lib-dynloadPF\x00\x00\x00\x04N\x00\x00\x00&/usr/local/lib/python3.7/dist-packagesPF\x00\x00\x00\x05N\x00\x00\x00L/usr/local/lib/python3.7/dist-packages/allure_python_commons-2.7.1-py3.7.eggPF\x00\x00\x00\x06N\x00\x00\x00D/usr/local/lib/python3.7/dist-packages/allure_pytest-2.7.1-py3.7.eggPF\x00\x00\x00\x07N\x00\x00\x00\x1e/usr/lib/python3/dist-packagesP@\x00\x00\x00\x04Q" SETUP SENT --------------- [71] creating workergateway on <execnet.gateway_base.Popen2IO object at 0x7f86908f3e10> [71] gw1-worker [serve] spawning receiver thread gw0 C / gw1 C[71] gw1-worker [serve] integrating as primary thread [71] gw1-worker [receiver-thread] RECEIVERTHREAD: starting to run DATA: b'N\x00\x00!\x8f"""\n This module is executed in remote subprocesses and helps to\n control a remote testing session and relay back information.\n It assumes that \'py\' is importable and does not have dependencies\n on the rest of the xdist code. 
This means that the xdist-plugin\n needs not to be installed in remote environments.\n"""\n\nimport sys\nimport os\nimport time\n\nimport py\nimport pytest\nfrom execnet.gateway_base import dumps, DumpError\n\nfrom _pytest.config import _prepareconfig, Config\n\n\nclass WorkerInteractor:\n def init(self, config, channel):\n self.config = config\n self.workerid = config.workerinput.get("workerid", "?")\n self.testrunuid = config.workerinput["testrunuid"]\n self.log = py.log.Producer("worker-%s" % self.workerid)\n if not config.option.debug:\n py.log.setconsumer(self.log._keywords, None)\n self.channel = channel\n config.pluginmanager.register(self)\n\n def sendevent(self, name, **kwargs):\n self.log("sending", name, kwargs)\n self.channel.send((name, kwargs))\n\n def pytest_internalerror(self, excrepr):\n formatted_error = str(excrepr)\n for line in formatted_error.split("\n"):\n self.log("IERROR>", line)\n interactor.sendevent("internal_error", formatted_error=formatted_error)\n\n def pytest_sessionstart(self, session):\n self.session = session\n workerinfo = getinfodict()\n self.sendevent("workerready", workerinfo=workerinfo)\n\n @pytest.hookimpl(hookwrapper=True)\n def pytest_sessionfinish(self, exitstatus):\n # in pytest 5.0+, exitstatus is an IntEnum object\n self.config.workeroutput["exitstatus"] = int(exitstatus)\n yield\n self.sendevent("workerfinished", workeroutput=self.config.workeroutput)\n\n def pytest_collection(self, session):\n self.sendevent("collectionstart")\n\n def pytest_runtestloop(self, session):\n self.log("entering main loop")\n torun = []\n while 1:\n try:\n name, kwargs = self.channel.receive()\n except EOFError:\n return True\n self.log("received command", name, kwargs)\n if name == "runtests":\n torun.extend(kwargs["indices"])\n elif name == "runtests_all":\n torun.extend(range(len(session.items)))\n self.log("items to run:", torun)\n # only run if we have an item and a next item\n while len(torun) >= 2:\n self.run_one_test(torun)\n if 
name == "shutdown":\n if torun:\n self.run_one_test(torun)\n break\n return True\n\n def run_one_test(self, torun):\n items = self.session.items\n self.item_index = torun.pop(0)\n item = items[self.item_index]\n if torun:\n nextitem = items[torun[0]]\n else:\n nextitem = None\n\n start = time.time()\n self.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)\n duration = time.time() - start\n self.sendevent(\n "runtest_protocol_complete", item_index=self.item_index, duration=duration\n )\n\n def pytest_collection_finish(self, session):\n try:\n topdir = str(self.config.rootpath)\n except AttributeError: # pytest <= 6.1.0\n topdir = str(self.config.rootdir)\n\n self.sendevent(\n "collectionfinish",\n topdir=topdir,\n ids=[item.nodeid for item in session.items],\n )\n\n def pytest_runtest_logstart(self, nodeid, location):\n self.sendevent("logstart", nodeid=nodeid, location=location)\n\n def pytest_runtest_logfinish(self, nodeid, location):\n self.sendevent("logfinish", nodeid=nodeid, location=location)\n\n def pytest_runtest_logreport(self, report):\n data = self.config.hook.pytest_report_to_serializable(\n config=self.config, report=report\n )\n data["item_index"] = self.item_index\n data["worker_id"] = self.workerid\n data["testrun_uid"] = self.testrunuid\n assert self.session.items[self.item_index].nodeid == report.nodeid\n self.sendevent("testreport", data=data)\n\n def pytest_collectreport(self, report):\n # send only reports that have not passed to controller as optimization (#330)\n if not report.passed:\n data = self.config.hook.pytest_report_to_serializable(\n config=self.config, report=report\n )\n self.sendevent("collectreport", data=data)\n\n def pytest_warning_recorded(self, warning_message, when, nodeid, location):\n self.sendevent(\n "warning_recorded",\n warning_message_data=serialize_warning_message(warning_message),\n when=when,\n nodeid=nodeid,\n location=location,\n )\n\n\ndef serialize_warning_message(warning_message):\n if 
isinstance(warning_message.message, Warning):\n message_module = type(warning_message.message).module\n message_class_name = type(warning_message.message).name\n message_str = str(warning_message.message)\n # check now if we can serialize the warning arguments (#349)\n # if not, we will just use the exception message on the controller node\n try:\n dumps(warning_message.message.args)\n except DumpError:\n message_args = None\n else:\n message_args = warning_message.message.args\n else:\n message_str = warning_message.message\n message_module = None\n message_class_name = None\n message_args = None\n if warning_message.category:\n category_module = warning_message.category.module\n category_class_name = warning_message.category.name\n else:\n category_module = None\n category_class_name = None\n\n result = {\n "message_str": message_str,\n "message_module": message_module,\n "message_class_name": message_class_name,\n "message_args": message_args,\n "category_module": category_module,\n "category_class_name": category_class_name,\n }\n # access private _WARNING_DETAILS because the attributes vary between Python versions\n for attr_name in warning_message._WARNING_DETAILS:\n if attr_name in ("message", "category"):\n continue\n attr = getattr(warning_message, attr_name)\n

# Check if we can serialize the warning detail, marking None otherwise\n # Note that we need to define the attr (even as None) to allow deserializing\n try:\n dumps(attr)\n except DumpError:\n
result[attr_name] = repr(attr)\n else:\n result[attr_name] = attr\n return result\n\n\ndef getinfodict():\n import platform\n\n return dict(\n version=sys.version,\n version_info=tuple(sys.version_info),\n sysplatform=sys.platform,\n platform=platform.platform(),\n executable=sys.executable,\n cwd=os.getcwd(),\n )\n\n\ndef remote_initconfig(option_dict, args):\n option_dict["plugins"].append("no:terminal")\n return Config.fromdictargs(option_dict, args)\n\n\ndef setup_config(config, basetemp):\n config.option.looponfail = False\n config.option.usepdb = False\n config.option.dist = "no"\n config.option.distload = False\n config.option.numprocesses = None\n config.option.maxprocesses = None\n config.option.basetemp = basetemp\n\n\nif name == "channelexec":\n channel = channel # noqa\n workerinput, args, option_dict, change_sys_path = channel.receive()\n\n if change_sys_path is None:\n importpath = os.getcwd()\n sys.path.insert(0, importpath)\n os.environ["PYTHONPATH"] = (\n importpath + os.pathsep + os.environ.get("PYTHONPATH", "")\n )\n else:\n sys.path = change_sys_path\n\n os.environ["PYTEST_XDIST_TESTRUNUID"] = workerinput["testrunuid"]\n os.environ["PYTEST_XDIST_WORKER"] = workerinput["workerid"]\n os.environ["PYTEST_XDIST_WORKER_COUNT"] = str(workerinput["workercount"])\n\n if hasattr(Config, "InvocationParams"):\n config = _prepareconfig(args, None)\n else:\n config = remote_initconfig(option_dict, args)\n config.args = args\n\n setup_config(config, option_dict.get("basetemp"))\n config._parser.prog = os.path.basename(workerinput["mainargv"][0])\n config.workerinput = workerinput\n config.workeroutput = {}\n interactor = WorkerInteractor(config, channel)\n config.hook.pytest_cmdline_main(config=config)\nN\x00\x00\x006/usr/local/lib/python3.7/dist-packages/xdist/remote.pyLJ@\x00\x00\x00\x04Q' SETUP SEND --------------- [71] gw1-worker [receiver-thread] received DATA: 
b"JN\x00\x00\x00\x08workeridN\x00\x00\x00\x03gw1PN\x00\x00\x00\x0bworkercountF\x00\x00\x00\x02PN\x00\x00\x00\ntestrunuidN\x00\x00\x00 4aac3410757c43ac89115a8a092c4d36PN\x00\x00\x00\x08mainargvK\x00\x00\x00\x06F\x00\x00\x00\x00N\x00\x00\x00\x15/usr/local/bin/pytestPF\x00\x00\x00\x01N\x00\x00\x00\x02-mPF\x00\x00\x00\x02N\x00\x00\x00\trun_smokePF\x00\x00\x00\x03N\x00\x00\x00\x02-nPF\x00\x00\x00\x04N\x00\x00\x00\x012PF\x00\x00\x00\x05N\x00\x00\x00\x16--max-worker-restart=2PPK\x00\x00\x00\x05F\x00\x00\x00\x00N\x00\x00\x00\x02-mPF\x00\x00\x00\x01N\x00\x00\x00\trun_smokePF\x00\x00\x00\x02N\x00\x00\x00\x02-nPF\x00\x00\x00\x03N\x00\x00\x00\x012PF\x00\x00\x00\x04N\x00\x00\x00\x16--max-worker-restart=2PJN\x00\x00\x00\x08basetempN\x00\x00\x00'/tmp/pytest-of-root/pytest-11/popen-gw1PK\x00\x00\x00\x08F\x00\x00\x00\x00N\x00\x00\x00\x0e/usr/local/binPF\x00\x00\x00\x01N\x00\x00\x00\x15/usr/lib/python37.zipPF\x00\x00\x00\x02N\x00\x00\x00\x12/usr/lib/python3.7PF\x00\x00\x00\x03N\x00\x00\x00\x1e/usr/lib/python3.7/lib-dynloadPF\x00\x00\x00\x04N\x00\x00\x00&/usr/local/lib/python3.7/dist-packagesPF\x00\x00\x00\x05N\x00\x00\x00L/usr/local/lib/python3.7/dist-packages/allure_python_commons-2.7.1-py3.7.eggPF\x00\x00\x00\x06N\x00\x00\x00D/usr/local/lib/python3.7/dist-packages/allure_pytest-2.7.1-py3.7.eggPF\x00\x00\x00\x07N\x00\x00\x00\x1e/usr/lib/python3/dist-packagesP@\x00\x00\x00\x04Q" SETUP SENT --------------- [71] gw1-worker [receiver-thread] received [71] gw1-worker execution starts[1]: '"""\n This module is executed in remote subpro [gw0] Python 3.7.3 (default, Jan 22 2021, 20:04:44) -- [GCC 8.3.0] gw0 ok / gw1 C[71] gw1-worker sent [gw1] Python 3.7.3 (default, Jan 22 2021, 20:04:44) -- [GCC 8.3.0] gw0 ok / gw1 ok[71] gw1-worker sent [gw0] node down: Not properly terminated, couldnt load message header, expected 9 bytes, got 0

replacing crashed worker gw0 [118] creating workergateway on <execnet.gateway_base.Popen2IO object at 0x7f500916ae48> [118] gw2-worker [serve] spawning receiver thread [118] gw2-worker [serve] integrating as primary thread [118] gw2-worker [receiver-thread] RECEIVERTHREAD: starting to run DATA: b'N\x00\x00\x01\xb7\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\ndef rinfo_source(channel):\n import sys\n import os\n\n channel.send(\n dict(\n executable=sys.executable,\n version_info=sys.version_info[:5],\n platform=sys.platform,\n cwd=os.getcwd(),\n pid=os.getpid(),\n )\n )\nN\x00\x00\x009/usr/local/lib/python3.7/dist-packages/execnet/gateway.pyN\x00\x00\x00\x0crinfo_sourceJ@\x00\x00\x00\x04Q' [118] gw2-worker [receiver-thread] received

[118] gw2-worker calling rinfo_source( {}) [118] gw2-worker sent [118] gw2-worker execution finished [118] gw2-worker sent [118] gw2-worker 1 sent channel close message [gw2] linux Python 3.7.3 cwd: /MyNameNecklace/Tests gw2 C / gw1 okDATA: b'N\x00\x00!\x8f"""\n This module is executed in remote subprocesses and helps to\n control a remote testing session and relay back information.\n It assumes that \'py\' is importable and does not have dependencies\n on the rest of the xdist code. This means that the xdist-plugin\n needs not to be installed in remote environments.\n"""\n\nimport sys\nimport os\nimport time\n\nimport py\nimport pytest\nfrom execnet.gateway_base import dumps, DumpError\n\nfrom _pytest.config import _prepareconfig, Config\n\n\nclass WorkerInteractor:\n def init(self, config, channel):\n self.config = config\n self.workerid = config.workerinput.get("workerid", "?")\n self.testrunuid = config.workerinput["testrunuid"]\n self.log = py.log.Producer("worker-%s" % self.workerid)\n if not config.option.debug:\n py.log.setconsumer(self.log._keywords, None)\n self.channel = channel\n config.pluginmanager.register(self)\n\n def sendevent(self, name, kwargs):\n self.log("sending", name, kwargs)\n self.channel.send((name, kwargs))\n\n def pytest_internalerror(self, excrepr):\n formatted_error = str(excrepr)\n for line in formatted_error.split("\n"):\n self.log("IERROR>", line)\n interactor.sendevent("internal_error", formatted_error=formatted_error)\n\n def pytest_sessionstart(self, session):\n self.session = session\n workerinfo = getinfodict()\n self.sendevent("workerready", workerinfo=workerinfo)\n\n @pytest.hookimpl(hookwrapper=True)\n def pytest_sessionfinish(self, exitstatus):\n

# in pytest 5.0+, exitstatus is an IntEnum object\n self.config.workeroutput["exitstatus"] = int(exitstatus)\n yield\n self.sendevent("workerfinished", workeroutput=self.config.workeroutput)\n\n def pytest_collection(self, session):\n self.sendevent("collectionstart")\n\n def pytest_runtestloop(self, session):\n self.log("entering main loop")\n torun = []\n while 1:\n try:\n name, kwargs = self.channel.receive()\n except EOFError:\n return True\n self.log("received command", name, kwargs)\n if name == "runtests":\n torun.extend(kwargs["indices"])\n
elif name == "runtests_all":\n torun.extend(range(len(session.items)))\n self.log("items to run:", torun)\n # only run if we have an item and a next item\n while len(torun) >= 2:\n self.run_one_test(torun)\n if name == "shutdown":\n if torun:\n self.run_one_test(torun)\n break\n return True\n\n def run_one_test(self, torun):\n items = self.session.items\n self.item_index = torun.pop(0)\n item = items[self.item_index]\n if torun:\n nextitem = items[torun[0]]\n else:\n nextitem = None\n\n start = time.time()\n self.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)\n duration = time.time() - start\n self.sendevent(\n "runtest_protocol_complete", item_index=self.item_index, duration=duration\n )\n\n def pytest_collection_finish(self, session):\n try:\n topdir = str(self.config.rootpath)\n except AttributeError: # pytest <= 6.1.0\n topdir = str(self.config.rootdir)\n\n self.sendevent(\n "collectionfinish",\n topdir=topdir,\n ids=[item.nodeid for item in session.items],\n )\n\n def pytest_runtest_logstart(self, nodeid, location):\n self.sendevent("logstart", nodeid=nodeid, location=location)\n\n def pytest_runtest_logfinish(self, nodeid, location):\n self.sendevent("logfinish", nodeid=nodeid, location=location)\n\n def pytest_runtest_logreport(self, report):\n data = self.config.hook.pytest_report_to_serializable(\n config=self.config, report=report\n )\n data["item_index"] = self.item_index\n data["worker_id"] = self.workerid\n data["testrun_uid"] = self.testrunuid\n assert self.session.items[self.item_index].nodeid == report.nodeid\n self.sendevent("testreport", data=data)\n\n def pytest_collectreport(self, report):\n # send only reports that have not passed to controller as optimization (#330)\n if not report.passed:\n data = self.config.hook.pytest_report_to_serializable(\n config=self.config, report=report\n )\n self.sendevent("collectreport", data=data)\n\n def pytest_warning_recorded(self, warning_message, when, nodeid, location):\n 
self.sendevent(\n "warning_recorded",\n warning_message_data=serialize_warning_message(warning_message),\n when=when,\n nodeid=nodeid,\n location=location,\n )\n\n\ndef serialize_warning_message(warning_message):\n if isinstance(warning_message.message, Warning):\n message_module = type(warning_message.message).module\n message_class_name = type(warning_message.message).name\n message_str = str(warning_message.message)\n # check now if we can serialize the warning arguments (#349)\n # if not, we will just use the exception message on the controller node\n try:\n dumps(warning_message.message.args)\n except DumpError:\n message_args = None\n else:\n message_args = warning_message.message.args\n else:\n message_str = warning_message.message\n message_module = None\n message_class_name = None\n message_args = None\n if warning_message.category:\n category_module = warning_message.category.module\n category_class_name = warning_message.category.name\n else:\n category_module = None\n category_class_name = None\n\n result = {\n "message_str": message_str,\n "message_module": message_module,\n "message_class_name": message_class_name,\n "message_args": message_args,\n "category_module": category_module,\n "category_class_name": category_class_name,\n }\n # access private _WARNING_DETAILS because the attributes vary between Python versions\n for attr_name in warning_message._WARNING_DETAILS:\n if attr_name in ("message", "category"):\n continue\n attr = getattr(warning_message, attr_name)\n # Check if we can serialize the warning detail, marking None otherwise\n # Note that we need to define the attr (even as None) to allow deserializing\n try:\n dumps(attr)\n except DumpError:\n result[attr_name] = repr(attr)\n else:\n result[attr_name] = attr\n return result\n\n\ndef getinfodict():\n import platform\n\n return dict(\n version=sys.version,\n version_info=tuple(sys.version_info),\n sysplatform=sys.platform,\n platform=platform.platform(),\n executable=sys.executable,\n 
cwd=os.getcwd(),\n )\n\n\ndef remote_initconfig(option_dict, args):\n option_dict["plugins"].append("no:terminal")\n return Config.fromdictargs(option_dict, args)\n\n\ndef setup_config(config, basetemp):\n config.option.looponfail = False\n config.option.usepdb = False\n config.option.dist = "no"\n config.option.distload = False\n config.option.numprocesses = None\n config.option.maxprocesses = None\n config.option.basetemp = basetemp\n\n\nif name == "channelexec":\n channel = channel # noqa\n workerinput, args, option_dict, change_sys_path = channel.receive()\n\n if change_sys_path is None:\n importpath = os.getcwd()\n sys.path.insert(0, importpath)\n os.environ["PYTHONPATH"] = (\n importpath + os.pathsep + os.environ.get("PYTHONPATH", "")\n )\n else:\n sys.path = change_sys_path\n\n os.environ["PYTEST_XDIST_TESTRUNUID"] = workerinput["testrunuid"]\n os.environ["PYTEST_XDIST_WORKER"] = workerinput["workerid"]\n os.environ["PYTEST_XDIST_WORKER_COUNT"] = str(workerinput["workercount"])\n\n if hasattr(Config, "InvocationParams"):\n config = _prepareconfig(args, None)\n else:\n config = remote_initconfig(option_dict, args)\n config.args = args\n\n setup_config(config, option_dict.get("basetemp"))\n config._parser.prog = os.path.basename(workerinput["mainargv"][0])\n config.workerinput = workerinput\n config.workeroutput = {}\n interactor = WorkerInteractor(config, channel)\n config.hook.pytest_cmdline_main(config=config)\nN\x00\x00\x006/usr/local/lib/python3.7/dist-packages/xdist/remote.pyLJ@\x00\x00\x00\x04Q' SETUP SEND --------------- [118] gw2-worker [receiver-thread] received DATA: b"JN\x00\x00\x00\x08workeridN\x00\x00\x00\x03gw2PN\x00\x00\x00\x0bworkercountF\x00\x00\x00\x02PN\x00\x00\x00\ntestrunuidN\x00\x00\x00 
4aac3410757c43ac89115a8a092c4d36PN\x00\x00\x00\x08mainargvK\x00\x00\x00\x06F\x00\x00\x00\x00N\x00\x00\x00\x15/usr/local/bin/pytestPF\x00\x00\x00\x01N\x00\x00\x00\x02-mPF\x00\x00\x00\x02N\x00\x00\x00\trun_smokePF\x00\x00\x00\x03N\x00\x00\x00\x02-nPF\x00\x00\x00\x04N\x00\x00\x00\x012PF\x00\x00\x00\x05N\x00\x00\x00\x16--max-worker-restart=2PPK\x00\x00\x00\x05F\x00\x00\x00\x00N\x00\x00\x00\x02-mPF\x00\x00\x00\x01N\x00\x00\x00\trun_smokePF\x00\x00\x00\x02N\x00\x00\x00\x02-nPF\x00\x00\x00\x03N\x00\x00\x00\x012PF\x00\x00\x00\x04N\x00\x00\x00\x16--max-worker-restart=2PJN\x00\x00\x00\x08basetempN\x00\x00\x00'/tmp/pytest-of-root/pytest-11/popen-gw2PK\x00\x00\x00\x08F\x00\x00\x00\x00N\x00\x00\x00\x0e/usr/local/binPF\x00\x00\x00\x01N\x00\x00\x00\x15/usr/lib/python37.zipPF\x00\x00\x00\x02N\x00\x00\x00\x12/usr/lib/python3.7PF\x00\x00\x00\x03N\x00\x00\x00\x1e/usr/lib/python3.7/lib-dynloadPF\x00\x00\x00\x04N\x00\x00\x00&/usr/local/lib/python3.7/dist-packagesPF\x00\x00\x00\x05N\x00\x00\x00L/usr/local/lib/python3.7/dist-packages/allure_python_commons-2.7.1-py3.7.eggPF\x00\x00\x00\x06N\x00\x00\x00D/usr/local/lib/python3.7/dist-packages/allure_pytest-2.7.1-py3.7.eggPF\x00\x00\x00\x07N\x00\x00\x00\x1e/usr/lib/python3/dist-packagesP@\x00\x00\x00\x04Q" SETUP SENT --------------- [118] gw2-worker [receiver-thread] received [118] gw2-worker execution starts[3]: '"""\n This module is executed in remote subpro [118] gw2-worker 1 channel.del [118] gw2-worker sent [gw2] Python 3.7.3 (default, Jan 22 2021, 20:04:44) -- [GCC 8.3.0] gw2 ok / gw1 ok[71] gw1-worker sent gw2 ok / gw1 [77][gw2] node down: Not properly terminated, couldnt load message header, expected 9 bytes, got 0

replacing crashed worker gw2 [163] creating workergateway on <execnet.gateway_base.Popen2IO object at 0x7ff2c7d82e48> [163] gw3-worker [serve] spawning receiver thread [163] gw3-worker [serve] integrating as primary thread [163] gw3-worker [receiver-thread] RECEIVERTHREAD: starting to run DATA: b'N\x00\x00\x01\xb7\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\ndef rinfo_source(channel):\n import sys\n import os\n\n channel.send(\n dict(\n executable=sys.executable,\n version_info=sys.version_info[:5],\n platform=sys.platform,\n cwd=os.getcwd(),\n pid=os.getpid(),\n )\n )\nN\x00\x00\x009/usr/local/lib/python3.7/dist-packages/execnet/gateway.pyN\x00\x00\x00\x0crinfo_sourceJ@\x00\x00\x00\x04Q' [163] gw3-worker [receiver-thread] received

[163] gw3-worker calling rinfo_source( {}) [163] gw3-worker sent [163] gw3-worker execution finished [163] gw3-worker sent [163] gw3-worker 1 sent channel close message [gw3] linux Python 3.7.3 cwd: /MyNameNecklace/Tests gw3 C / gw1 [77]DATA: b'N\x00\x00!\x8f"""\n This module is executed in remote subprocesses and helps to\n control a remote testing session and relay back information.\n It assumes that \'py\' is importable and does not have dependencies\n on the rest of the xdist code. This means that the xdist-plugin\n needs not to be installed in remote environments.\n"""\n\nimport sys\nimport os\nimport time\n\nimport py\nimport pytest\nfrom execnet.gateway_base import dumps, DumpError\n\nfrom _pytest.config import _prepareconfig, Config\n\n\nclass WorkerInteractor:\n def init(self, config, channel):\n self.config = config\n self.workerid = config.workerinput.get("workerid", "?")\n self.testrunuid = config.workerinput["testrunuid"]\n self.log = py.log.Producer("worker-%s" % self.workerid)\n if not config.option.debug:\n py.log.setconsumer(self.log._keywords, None)\n self.channel = channel\n config.pluginmanager.register(self)\n\n def sendevent(self, name, kwargs):\n self.log("sending", name, kwargs)\n self.channel.send((name, kwargs))\n\n def pytest_internalerror(self, excrepr):\n formatted_error = str(excrepr)\n for line in formatted_error.split("\n"):\n self.log("IERROR>", line)\n interactor.sendevent("internal_error", formatted_error=formatted_error)\n\n def pytest_sessionstart(self, session):\n self.session = session\n workerinfo = getinfodict()\n self.sendevent("workerready", workerinfo=workerinfo)\n\n @pytest.hookimpl(hookwrapper=True)\n def pytest_sessionfinish(self, exitstatus):\n # in pytest 5.0+, exitstatus is an IntEnum object\n self.config.workeroutput["exitstatus"] = int(exitstatus)\n yield\n self.sendevent("workerfinished", workeroutput=self.config.workeroutput)\n\n def pytest_collection(self, session):\n self.sendevent("collectionstart")\n\n def 
pytest_runtestloop(self, session):\n self.log("entering main loop")\n torun = []\n while 1:\n try:\n name, kwargs = self.channel.receive()\n except EOFError:\n return True\n self.log("received command", name, kwargs)\n if name == "runtests":\n torun.extend(kwargs["indices"])\n elif name == "runtests_all":\n torun.extend(range(len(session.items)))\n self.log("items to run:", torun)\n # only run if we have an item and a next item\n while len(torun) >= 2:\n self.run_one_test(torun)\n if name == "shutdown":\n if torun:\n self.run_one_test(torun)\n break\n return True\n\n def run_one_test(self, torun):\n items = self.session.items\n self.item_index = torun.pop(0)\n item = items[self.item_index]\n if torun:\n nextitem = items[torun[0]]\n else:\n nextitem = None\n\n start = time.time()\n self.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)\n duration = time.time() - start\n self.sendevent(\n "runtest_protocol_complete", item_index=self.item_index, duration=duration\n )\n\n def pytest_collection_finish(self, session):\n try:\n topdir = str(self.config.rootpath)\n except AttributeError: # pytest <= 6.1.0\n topdir = str(self.config.rootdir)\n\n self.sendevent(\n "collectionfinish",\n topdir=topdir,\n ids=[item.nodeid for item in session.items],\n )\n\n def pytest_runtest_logstart(self, nodeid, location):\n self.sendevent("logstart", nodeid=nodeid, location=location)\n\n def pytest_runtest_logfinish(self, nodeid, location):\n self.sendevent("logfinish", nodeid=nodeid, location=location)\n\n def pytest_runtest_logreport(self, report):\n data = self.config.hook.pytest_report_to_serializable(\n config=self.config, report=report\n )\n data["item_index"] = self.item_index\n data["worker_id"] = self.workerid\n data["testrun_uid"] = self.testrunuid\n assert self.session.items[self.item_index].nodeid == report.nodeid\n self.sendevent("testreport", data=data)\n\n def pytest_collectreport(self, report):\n # send only reports that have not passed to controller as 
optimization (#330)\n if not report.passed:\n data = self.config.hook.pytest_report_to_serializable(\n config=self.config, report=report\n )\n self.sendevent("collectreport", data=data)\n\n def pytest_warning_recorded(self, warning_message, when, nodeid, location):\n self.sendevent(\n "warning_recorded",\n warning_message_data=serialize_warning_message(warning_message),\n when=when,\n nodeid=nodeid,\n location=location,\n )\n\n\ndef serialize_warning_message(warning_message):\n if isinstance(warning_message.message, Warning):\n message_module = type(warning_message.message).module\n message_class_name = type(warning_message.message).name\n message_str = str(warning_message.message)\n # check now if we can serialize the warning arguments (#349)\n # if not, we will just use the exception message on the controller node\n try:\n dumps(warning_message.message.args)\n except DumpError:\n message_args = None\n else:\n message_args = warning_message.message.args\n else:\n message_str = warning_message.message\n message_module = None\n message_class_name = None\n message_args = None\n if warning_message.category:\n category_module = warning_message.category.module\n category_class_name = warning_message.category.name\n else:\n category_module = None\n category_class_name = None\n\n result = {\n "message_str": message_str,\n "message_module": message_module,\n "message_class_name": message_class_name,\n "message_args": message_args,\n "category_module": category_module,\n "category_class_name": category_class_name,\n }\n # access private _WARNING_DETAILS because the attributes vary between Python versions\n for attr_name in warning_message._WARNING_DETAILS:\n if attr_name in ("message", "category"):\n continue\n attr = getattr(warning_message, attr_name)\n # Check if we can serialize the warning detail, marking None otherwise\n # Note that we need to define the attr (even as None) to allow deserializing\n try:\n dumps(attr)\n except DumpError:\n result[attr_name] = 
repr(attr)\n else:\n result[attr_name] = attr\n return result\n\n\ndef getinfodict():\n import platform\n\n return dict(\n version=sys.version,\n version_info=tuple(sys.version_info),\n sysplatform=sys.platform,\n platform=platform.platform(),\n executable=sys.executable,\n cwd=os.getcwd(),\n )\n\n\ndef remote_initconfig(option_dict, args):\n option_dict["plugins"].append("no:terminal")\n return Config.fromdictargs(option_dict, args)\n\n\ndef setup_config(config, basetemp):\n config.option.looponfail = False\n config.option.usepdb = False\n config.option.dist = "no"\n config.option.distload = False\n config.option.numprocesses = None\n config.option.maxprocesses = None\n config.option.basetemp = basetemp\n\n\nif name == "channelexec":\n channel = channel # noqa\n workerinput, args, option_dict, change_sys_path = channel.receive()\n\n if change_sys_path is None:\n importpath = os.getcwd()\n sys.path.insert(0, importpath)\n os.environ["PYTHONPATH"] = (\n importpath + os.pathsep + os.environ.get("PYTHONPATH", "")\n )\n else:\n sys.path = change_sys_path\n\n os.environ["PYTEST_XDIST_TESTRUNUID"] = workerinput["testrunuid"]\n os.environ["PYTEST_XDIST_WORKER"] = workerinput["workerid"]\n os.environ["PYTEST_XDIST_WORKER_COUNT"] = str(workerinput["workercount"])\n\n if hasattr(Config, "InvocationParams"):\n config = _prepareconfig(args, None)\n else:\n config = remote_initconfig(option_dict, args)\n config.args = args\n\n setup_config(config, option_dict.get("basetemp"))\n config._parser.prog = os.path.basename(workerinput["mainargv"][0])\n config.workerinput = workerinput\n config.workeroutput = {}\n interactor = WorkerInteractor(config, channel)\n config.hook.pytest_cmdline_main(config=config)\nN\x00\x00\x006/usr/local/lib/python3.7/dist-packages/xdist/remote.pyLJ@\x00\x00\x00\x04Q' SETUP SEND --------------- [163] gw3-worker [receiver-thread] received DATA: 
b"JN\x00\x00\x00\x08workeridN\x00\x00\x00\x03gw3PN\x00\x00\x00\x0bworkercountF\x00\x00\x00\x02PN\x00\x00\x00\ntestrunuidN\x00\x00\x00 4aac3410757c43ac89115a8a092c4d36PN\x00\x00\x00\x08mainargvK\x00\x00\x00\x06F\x00\x00\x00\x00N\x00\x00\x00\x15/usr/local/bin/pytestPF\x00\x00\x00\x01N\x00\x00\x00\x02-mPF\x00\x00\x00\x02N\x00\x00\x00\trun_smokePF\x00\x00\x00\x03N\x00\x00\x00\x02-nPF\x00\x00\x00\x04N\x00\x00\x00\x012PF\x00\x00\x00\x05N\x00\x00\x00\x16--max-worker-restart=2PPK\x00\x00\x00\x05F\x00\x00\x00\x00N\x00\x00\x00\x02-mPF\x00\x00\x00\x01N\x00\x00\x00\trun_smokePF\x00\x00\x00\x02N\x00\x00\x00\x02-nPF\x00\x00\x00\x03N\x00\x00\x00\x012PF\x00\x00\x00\x04N\x00\x00\x00\x16--max-worker-restart=2PJN\x00\x00\x00\x08basetempN\x00\x00\x00'/tmp/pytest-of-root/pytest-11/popen-gw3PK\x00\x00\x00\x08F\x00\x00\x00\x00N\x00\x00\x00\x0e/usr/local/binPF\x00\x00\x00\x01N\x00\x00\x00\x15/usr/lib/python37.zipPF\x00\x00\x00\x02N\x00\x00\x00\x12/usr/lib/python3.7PF\x00\x00\x00\x03N\x00\x00\x00\x1e/usr/lib/python3.7/lib-dynloadPF\x00\x00\x00\x04N\x00\x00\x00&/usr/local/lib/python3.7/dist-packagesPF\x00\x00\x00\x05N\x00\x00\x00L/usr/local/lib/python3.7/dist-packages/allure_python_commons-2.7.1-py3.7.eggPF\x00\x00\x00\x06N\x00\x00\x00D/usr/local/lib/python3.7/dist-packages/allure_pytest-2.7.1-py3.7.eggPF\x00\x00\x00\x07N\x00\x00\x00\x1e/usr/lib/python3/dist-packagesP@\x00\x00\x00\x04Q" SETUP SENT --------------- [163] gw3-worker [receiver-thread] received [163] gw3-worker execution starts[3]: '"""\n This module is executed in remote subpro [163] gw3-worker 1 channel.del [163] gw3-worker sent [gw3] Python 3.7.3 (default, Jan 22 2021, 20:04:44) -- [GCC 8.3.0] gw3 ok / gw1 [77][gw3] node down: Not properly terminated, couldnt load message header, expected 9 bytes, got 0

maximum crashed workers reached: 2
DATA: b'N\x00\x00\x00\x08shutdownJ@\x00\x00\x00\x02Q'
[71] gw1-worker [receiver-thread] received
[71] gw1-worker sent
DATA: b''
DATA: b''
DATA: b''
DATA: b''
[71] gw1-worker [receiver-thread] received
[71] gw1-worker [receiver-thread] finishing receiving thread
[71] gw1-worker [receiver-thread] terminating execution
[71] gw1-worker shutting down execution pool
[71] gw1-worker execution finished
[71] gw1-worker [serve] joining receiver thread
[71] gw1-worker [receiver-thread] closing read
[71] gw1-worker waiting for receiver thread to finish
[71] gw1-worker [receiver-thread] closing write
[71] gw1-worker [receiver-thread] terminating our receive pseudo pool
[71] === atexit cleanup <Group []> ===`
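
For anyone reading the error text itself: "expected 9 bytes, got 0" says the reader hit end-of-stream before a single header byte arrived, meaning the other end of the pipe was already closed. That is consistent with the worker process exiting, though the log above does not show why it exited. Below is a minimal sketch of that failure mode. It assumes a fixed 9-byte header packed as one type byte, a 4-byte channel id, and a 4-byte payload length; `HEADER`, `read_exact`, and `read_message` are hypothetical names for illustration, not execnet's own API.

```python
import io
import struct

# Assumed header layout (illustrative): 1-byte message type,
# 4-byte channel id, 4-byte payload length, network byte order.
HEADER = struct.Struct("!bii")


def read_exact(stream, numbytes):
    """Read exactly numbytes from stream, or raise EOFError on a short read."""
    buf = b""
    while len(buf) < numbytes:
        chunk = stream.read(numbytes - len(buf))
        if not chunk:
            # The stream ended early: the peer closed its side.
            raise EOFError("expected %d bytes, got %d" % (numbytes, len(buf)))
        buf += chunk
    return buf


def read_message(stream):
    """Read one framed message: fixed-size header, then the payload."""
    try:
        header = read_exact(stream, HEADER.size)
    except EOFError as exc:
        raise EOFError("couldnt load message header, " + exc.args[0]) from None
    msgtype, channel, length = HEADER.unpack(header)
    return msgtype, channel, read_exact(stream, length)


# A healthy stream carries a full header followed by the payload.
healthy = io.BytesIO(HEADER.pack(1, 3, 5) + b"hello")
print(read_message(healthy))

# A stream whose writer is gone yields no bytes at all.
closed = io.BytesIO(b"")
try:
    read_message(closed)
except EOFError as exc:
    print(exc)
```

The practical takeaway is that the message describes a symptom on the controller side; the cause has to be found on the worker side, for example in the worker's own output or the container's process and memory logs.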

tamaskakuszi commented 3 years ago

@RonnyPfannschmidt Do you have any suggestions, please? Thank you.

RonnyPfannschmidt commented 3 years ago

Sorry, no idea at the moment. I'm currently not working on execnet and won't be digging into the details within the next few months.

tamaskakuszi commented 3 years ago

@nicoddemus Do you have any idea? Any help would be appreciated.

tamaskakuszi commented 3 years ago

> Sorry, no idea at the moment. I'm currently not working on execnet and won't be digging into the details within the next few months.

NP. Thank you :)