flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[BUG] Dynamic workflows: Module from file /usr/local/bin/pyflyte-execute cannot be loaded #4245

Open iirekm opened 1 year ago

iirekm commented 1 year ago

### Describe the bug

```python
import sys
from typing import List
from flytekit import task, workflow, dynamic, Resources
from flytekit.remote import *
from flytekit.configuration import Config

@task(requests=Resources(mem="500Mi"))
def add1(x: int) -> int:
    return x + 1

@dynamic(requests=Resources(mem="500Mi"))
def q(n: int) -> int:
    ctr = 0
    for _ in range(n):
        ctr = add1(x=ctr)
    return ctr

@task
def fin(x: int):
    print(x)

@workflow
def workflow():
    fin(x=q(n=5))
```

What's going on here?

`@task` and `@workflow` work as expected. The problem occurs only with `@dynamic`.

Python: 3.11 Flytekit: 1.9.1

Full stack trace:

```
2023-10-16 19:50:34,538925 ERROR    2023-10-16 19:50:34,538 flytekit ERROR Exception when executing Module from file /home/.../.venv/bin/pyflyte cannot be loaded    base_task.py:612

Traceback (most recent call last):

/home/.../demos/nnets/.venv/lib/python3.11/site-packages/flytekit/core/tracker.py:18 in import_module_from_file

   15 def import_module_from_file(module_name, file):
   16     try:
   17         spec = importlib.util.spec_from_file_location(module_name, file)
❱  18         module = importlib.util.module_from_spec(spec)
   19         spec.loader.exec_module(module)
   20         return module
   21     except AssertionError:

in module_from_spec:570

AttributeError: 'NoneType' object has no attribute 'loader'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):

/home/.../.venv/lib/python3.11/site-packages/flytekit/core/base_task.py:610 in dispatch_execute

  607             logger.info(f"Invoking {self.name} with inputs: {native_inputs}")
  608             try:
  609                 with timeit("Execute user level code"):
❱ 610                     native_outputs = self.execute(**native_inputs)
  611             except Exception as e:
  612                 logger.exception(f"Exception when executing {e}")
  613                 raise e

/home/.../.venv/lib/python3.11/site-packages/flytekit/core/python_function_task.py:180 in execute

  177             kwargs["async_ctx"] = FlyteContextManager.current_context()
  178             return exception_scopes.user_entry_point(self._task_function)(**kwargs)
  179         elif self.execution_mode == self.ExecutionBehavior.DYNAMIC:
❱ 180             return self.dynamic_execute(self._task_function, **kwargs)
  181
  182     def _create_and_cache_dynamic_workflow(self):
  183         if self._wf is None:

/home/.../.venv/lib/python3.11/site-packages/flytekit/core/python_function_task.py:279 in dynamic_execute

  276             # The rest of this function mimics the local_execute of the workflow. We can't use the workflow
  277             # local_execute directly though since that converts inputs into Promises.
  278             logger.debug(f"Executing Dynamic workflow, using raw inputs {kwargs}")
❱ 279             self._create_and_cache_dynamic_workflow()
  280             function_outputs = cast(PythonFunctionWorkflow, self._wf).execute(**kwargs)
  281
  282             if isinstance(function_outputs, VoidPromise) or function_outputs is None:

/home/.../.venv/lib/python3.11/site-packages/flytekit/core/python_function_task.py:188 in _create_and_cache_dynamic_workflow

  185             defaults = WorkflowMetadataDefaults(
  186                 interruptible=self.metadata.interruptible if self.metadata.interruptible is not None else False
  187             )
❱ 188             self._wf = PythonFunctionWorkflow(self._task_function, metadata=workflow_meta, default_metadata=defaults)
  189
  190     def compile_into_workflow(
  191         self, ctx: FlyteContext, task_function: Callable, **kwargs

/home/.../.venv/lib/python3.11/site-packages/flytekit/core/tracker.py:78 in __call__

   75
   76     def __call__(cls, *args, **kwargs):
   77         o = super(InstanceTrackingMeta, cls).__call__(*args, **kwargs)
❱  78         mod_name, mod_file = InstanceTrackingMeta._find_instance_module()
   79         o._instantiated_in = mod_name
   80         o._module_file = mod_file
   81         return o

/home/.../.venv/lib/python3.11/site-packages/flytekit/core/tracker.py:69 in _find_instance_module

   66                     return frame.f_globals["__name__"], frame.f_globals["__file__"]
   67                 # if the remote_deploy command is invoked in the same module as where
   68                 # the app is defined, get the module from the file name
❱  69                 mod = InstanceTrackingMeta._get_module_from_main(frame.f_globals)
   70                 if mod is None:
   71                     return None, None
   72                 return mod.__name__, mod.__file__

/home/.../.venv/lib/python3.11/site-packages/flytekit/core/tracker.py:58 in _get_module_from_main

   55
   56         # make sure current directory is in the PYTHONPATH.
   57         sys.path.insert(0, str(curdir))
❱  58         return import_module_from_file(module_name, file)
   59
   60     @staticmethod
   61     def _find_instance_module():

/home/.../demos/nnets/.venv/lib/python3.11/site-packages/flytekit/core/tracker.py:25 in import_module_from_file

   22         # handle where we can't determine the module of functions within the module
   23         return importlib.import_module(module_name)
   24     except Exception as exc:
❱  25         raise ModuleNotFoundError(f"Module from file {file} cannot be loaded") from exc
   26
   27
   28 class InstanceTrackingMeta(type):

ModuleNotFoundError: Module from file /home/.../.venv/bin/pyflyte cannot be loaded
Failed with Unknown Exception <class 'ModuleNotFoundError'> Reason: Error encountered while executing 'workflow':
  Module from file /home/.../.venv/bin/pyflyte cannot be loaded
Error encountered while executing 'workflow':
  Module from file /home/.../.venv/bin/pyflyte cannot be loaded
```
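
The first frame of the trace may be easier to read with a standalone reproduction of the standard-library behaviour it hits. The sketch below uses only `importlib` and temporary files. It assumes the failure comes from the entrypoint script having no `.py` suffix, which the trace suggests but does not prove. The file names `entrypoint` and `entrypoint_copy.py` and the helper `describe_spec` are hypothetical and are not part of flytekit.

```python
import importlib.util
import os
import tempfile


def describe_spec(module_name, path):
    """Return the spec importlib builds for a file path, or None if no loader matches."""
    return importlib.util.spec_from_file_location(module_name, path)


with tempfile.TemporaryDirectory() as tmp:
    # Two files with identical contents; only the suffix differs.
    no_suffix = os.path.join(tmp, "entrypoint")  # hypothetical stand-in for a console script
    with_suffix = os.path.join(tmp, "entrypoint_copy.py")
    for path in (no_suffix, with_suffix):
        with open(path, "w") as fh:
            fh.write("VALUE = 1\n")

    # Without an explicit loader, importlib picks one by file suffix.
    no_suffix_spec = describe_spec("demo_no_suffix", no_suffix)
    with_suffix_spec = describe_spec("demo_with_suffix", with_suffix)

    # The .py file loads normally.
    module = importlib.util.module_from_spec(with_suffix_spec)
    with_suffix_spec.loader.exec_module(module)
    loaded_value = module.VALUE

    # Passing the missing spec on mirrors line 18 of tracker.py in the trace.
    try:
        importlib.util.module_from_spec(no_suffix_spec)
        error_name = None
    except AttributeError as exc:
        error_name = type(exc).__name__

print(no_suffix_spec, loaded_value, error_name)
```

If this assumption holds, guarding against a `None` spec before calling `module_from_spec` would turn the `AttributeError` into a clearer failure, though whether that matches the actual fix is not confirmed in this thread.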

### Expected behavior

The `@dynamic` workflow should simply work, without raising `ModuleNotFoundError`.

### Additional context to reproduce

_No response_

### Screenshots

_No response_

### Are you sure this issue hasn't been raised already?

- [X] Yes

### Have you read the Code of Conduct?

- [X] Yes

welcome[bot] commented 1 year ago

Thank you for opening your first issue here! 🛠

pingsutw commented 1 year ago

@iirekm I ran into the same issue. This PR can fix it; could you give it a try?

iirekm commented 1 year ago

After further investigation:

wild-endeavor commented 11 months ago

@iirekm can you confirm that this has been fixed in the latest release of flytekit?

github-actions[bot] commented 2 months ago

Hello 👋, this issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will engage on it to decide if it is still applicable. Thank you for your contribution and understanding! 🙏