pyiron / pyiron_workflow

Graph-and-node based workflows
BSD 3-Clause "New" or "Revised" License

extend node decorator to write/read node functions to/from repository #14

Open JNmpi opened 1 year ago

JNmpi commented 1 year ago

I played a bit with a class that extends the node generator to write the Python function to a repository consisting of directories and a Python file. Having such a feature provides the following advantages:

  • The user can stay inside the notebook to add new node repositories or nodes to existing ones. Of course, if preferred, users can directly add/edit such files outside Jupyter using their editor of choice.
  • Storing and sharing workflows requires storing not only the state of the node but also the node definition. For nodes defined in a local Jupyter session, such functionality provides an easy way to store local nodes.
  • This feature naturally extends our create construction.

A simple application of these classes and their functionality is given below:

Application of the node-based workflow class

Objective: Create a module that looks and feels like pyiron but is based on nodes. Key criteria are:

  • Make adding new nodes easy (via decorator or simply adding functions to a file)
  • Use directories to build up a class schema (e.g. workflow.create.math.numpy)
  • Make nodes delayed (only node.execute() executes the node)

Create an example node

Note: The decorator is a convenience function. You can directly define the node function in the corresponding Python file.

from node_store import FunctionToFileConverter
np_nodes = FunctionToFileConverter('nodelib/math/my_numpy.py')
np_nodes.add_code('import numpy as np')
np_nodes.add_code('from typing import List')
from typing import List

@np_nodes
def linspace(i_start:int=0, i_end:int=1, n_steps:int=10) -> List[float]:
    return np.linspace(i_start, i_end, n_steps)
File 'nodelib/math/my_numpy.py' created successfully.
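For reference, given the add_code calls above and the converter stripping the decorator line, the generated nodelib/math/my_numpy.py should look roughly like this (a reconstruction from the converter logic, not a verbatim dump of the file):

```python
# Reconstructed contents of nodelib/math/my_numpy.py: the converter writes the
# additional_code header first, then each function's source minus its
# decorator line.
import numpy as np
from typing import List

def linspace(i_start: int = 0, i_end: int = 1, n_steps: int = 10) -> List[float]:
    return np.linspace(i_start, i_end, n_steps)
```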

Construct an example workflow

from node_store import Workflow
wf = Workflow('nodelib')
wf
Workflow(nodes=dict_keys([]))
lin = wf.create.math.my_numpy.nodes.linspace()
lin
node: linspace
lin.input.i_start = -1
lin.execute()
array([-1.        , -0.77777778, -0.55555556, -0.33333333, -0.11111111,
        0.11111111,  0.33333333,  0.55555556,  0.77777778,  1.        ])
lin.input
Input Arguments: i_start: int, i_end: int, n_steps: int
lin.output
Output Argument: linspace: List

Since I cannot attach .py files to the issue text, I append the code below:

          import inspect
          import ast

          class FunctionToFileConverter:
              def __init__(self, filename):
                  self.filename = filename
                  self.function_dict = {}
                  self.additional_code = ''
                  self._namespace = None

              def __call__(self, func):
                  self.function_dict[func.__name__] = inspect.getsource(func)
                  self.create_file()
                  return func

              def __repr__(self):
                  num_functions = len(self.function_dict)
                  return f"FunctionToFileConverter(filename='{self.filename}', num_functions={num_functions})"

              def create_file(self):
                  with open(self.filename, 'w') as file:
                      file.write(self.additional_code + '\n\n')
                      for func_name, source_code in self.function_dict.items():
                          lines = source_code.strip().split('\n')
                          if len(lines) > 1:
                              function_source = '\n'.join(lines[1:])
                              file.write(function_source + '\n\n')
                  print(f"File '{self.filename}' created successfully.")

              def add_code(self, code):
                  self.additional_code += code + '\n'

          class FileToFunctionConverter:
              def __init__(self, filename, workflow=None):
                  self.filename = filename
                  self.function_dict = self.read_functions()
                  self.additional_code = self.read_added_code()
                  self._namespace = None
                  self._workflow = workflow

              # def __call__(self, func):
              #     print ('call: ', func)
              #     self.function_dict[func.__name__] = inspect.getsource(func)
              #     # self.create_file()
              #     return func

              def __repr__(self):
                  num_functions = len(self.function_dict)
                  return f"FileToFunctionConverter(filename='{self.filename}', num_functions={num_functions})"

              def add_code(self, code):
                  self.additional_code += code + '\n'

              def read_functions(self):
                  function_definitions = {}
                  with open(self.filename, 'r') as file:
                      source_code = file.read()

                  module = ast.parse(source_code)
                  for node in module.body:
                      if isinstance(node, ast.FunctionDef):
                          function_name = node.name
                          function_source = ast.unparse(node).strip()
                          function_definitions[function_name] = function_source

                  self.function_dict = function_definitions
                  return function_definitions 

              def read_added_code(self):
                  with open(self.filename, 'r') as file:
                      code = file.read()
                  functions_start = code.find("def")
                  if functions_start != -1:
                      self.additional_code = code[:functions_start]
                  else:
                      self.additional_code = code
                  return self.additional_code

              def create_namespace(self):
                  if self._namespace is None:
                      ns = {}
                      exec(self.additional_code, ns)
                      for func_name, source_code in self.function_dict.items():
                          exec(source_code, ns)
                      self._namespace = ns
                  return self._namespace

              @property
              def nodes(self):
                  return NamespaceWrapper(self.create_namespace(), self.function_dict, self._workflow)

          class NamespaceWrapper:
              def __init__(self, namespace, function_dict, workflow=None):
                  self.namespace = namespace
                  self.function_dict = function_dict
                  self._workflow = workflow

              def __getattr__(self, name):
                  return extract_arguments(self.namespace[name], self._workflow, name)

              def __dir__(self):
                  return list(self.function_dict.keys())

              def __repr__(self):
                  return str(list(self.function_dict.keys()))  

          from functools import wraps
          import inspect

          class InputArguments:
              """
              Class representing the input arguments of the wrapped function.

              Provides convenient access to the input arguments and their values.

              """
              def __init__(self, arguments):
                  """
                  Initialize the InputArguments instance.

                  Args:
                      arguments (dict): Dictionary containing the input arguments and their values.

                  """
                  self.arguments = arguments

              def keys(self):
                  """
                  Get the keys of the input arguments.

                  Returns:
                      Keys of the input arguments.

                  """
                  return self.arguments.keys()

              def items(self):
                  """
                  Get the items (key-value pairs) of the input arguments.

                  Returns:
                      Items (key-value pairs) of the input arguments.

                  """
                  return self.arguments.items()

              def values(self):
                  """
                  Get the values of the input arguments.

                  Returns:
                      Values of the input arguments.

                  """
                  return self.arguments.values()

              def __getitem__(self, key):
                  """
                  Get the value of an input argument by its key.

                  Args:
                      key: The key of the input argument.

                  Returns:
                      The value of the input argument.

                  """
                  return self.arguments[key]

              def __setitem__(self, key, value):
                  """
                  Set the value of an input argument by its key.

                  Args:
                      key: The key of the input argument.
                      value: The value to set.

                  """
                  self.arguments[key] = value

              def __getattr__(self, name):
                  """
                  Get the value of an input argument by its name.

                  Args:
                      name: The name of the input argument.

                  Returns:
                      The value of the input argument.

                  """
                  return self.arguments[name]

              def __setattr__(self, name, value):
                  """
                  Set the value of an input argument by its name.

                  Args:
                      name: The name of the input argument.
                      value: The value to set.

                  """
                  if name == "arguments":
                      super().__setattr__(name, value)
                  else:
                      self.arguments[name] = value

              def __dir__(self):
                  """
                  Get the list of attribute names (input argument names).

                  Returns:
                      List of attribute names.

                  """
                  default_attributes = dir(type(self))
                  argument_keys = list(self.arguments.keys())
                  return default_attributes + argument_keys

              def __repr__(self):
                  """
                  Get the string representation of the input arguments.

                  Returns:
                      String representation of the input arguments.
                  """
                  argument_repr = [f"{name}: {type(value).__name__}" for name, value in self.items()]
                  return f"Input Arguments: {', '.join(argument_repr)}"

          class OutputArguments:
              """
              Class representing the output argument (return type) of a function.

              Provides access to the name and type of the output argument.
              """

              def __init__(self, name, return_type):
                  """
                  Initialize the OutputArguments instance.

                  Args:
                      name (str): The name of the output argument.
                      return_type: The type of the output argument.
                  """
                  self.name = name
                  self.return_type = return_type

              def __repr__(self):
                  """
                  Get the string representation of the output argument.

                  Returns:
                      String representation of the output argument.
                  """
                  return f"Output Argument: {self.name}: {self.return_type.__name__}"   

          def extract_arguments(func, workflow=None, node_name=None):
              """
              Decorator that extracts the arguments and return type from a function's signature.

              Args:
                  func: The function to be wrapped.

              Returns:
                  The wrapped function with extracted arguments and return type.
              """
              signature = inspect.signature(func)
              parameters = signature.parameters

              class FunctionWrapper:
                  """
                  Wrapper class for the decorated function.

                  Provides access to the input and output arguments, and allows execution of the function.
                  """

                  def __init__(self, *args, **kwargs):
                      """
                      Initialize the FunctionWrapper instance.

                      Args:
                          *args: Positional arguments to be passed to the wrapped function.
                          **kwargs: Keyword arguments to be passed to the wrapped function.
                      """
                      bound_arguments = signature.bind(*args, **kwargs)
                      bound_arguments.apply_defaults()
                      arguments = bound_arguments.arguments

                      input_arguments = InputArguments(arguments)
                      self.inputs_type = {}
                      for name, value in arguments.items():
                          #input_arguments[name] = value
                          #input_arguments[name + "_type"] = parameters[name].annotation
                          self.inputs_type[name] = parameters[name].annotation

                      self.input = input_arguments
                      self.output = OutputArguments(func.__name__, signature.return_annotation)

                      self._workflow = workflow
                      self._name = node_name
                      self._path = None  # TODO: include file path where function has been defined
                      if workflow is not None:
                          if node_name not in workflow.nodes.keys():
                              self._workflow.nodes[node_name] = self
                          else:
                              print ('Node exists already: ', node_name)

                  def execute(self):
                      """
                      Execute the wrapped function with the provided input arguments.

                      Returns:
                          The result of the function execution.
                      """
                      return func(**self.input)

                  def __repr__(self):
                      return f'node: {self._name}'

              return FunctionWrapper

          import os

          class FileDictionary:
              def __init__(self, path, workflow=None):
                  self.path = path
                  self.file_dict = {}
                  self.dir_dict = {}
                  self._workflow = workflow

                  self._create_file_dict()
                  self._create_dir_dict()

              def __getitem__(self, key):
                  if key in self.file_dict:
                      return FileToFunctionConverter(self.file_dict[key], workflow=self._workflow)
                  elif key in self.dir_dict:
                      return FileDictionary(self.dir_dict[key], workflow=self._workflow)
                  else:
                      raise KeyError(f"Key '{key}' not found.")

              def __getattr__(self, name):
                  if (name in self.file_dict) or (name in self.dir_dict):
                      return self.__getitem__(name)
                  else:
                      raise AttributeError(f"Attribute '{name}' not found.")

              def _create_file_dict(self):
                  for file in os.listdir(self.path):
                      if not file.startswith(".") and file.endswith(".py"):
                          file_name = os.path.splitext(file)[0]
                          file_path = os.path.join(self.path, file)
                          self.file_dict[file_name] = file_path

              def _create_dir_dict(self):
                  for item in os.listdir(self.path):
                      item_path = os.path.join(self.path, item)
                      if os.path.isdir(item_path) and not item.startswith("."):
                          self.dir_dict[item] = item_path

              def __dir__(self):
                  default_attributes = dir(type(self))
                  file_keys = list(self.file_dict.keys())
                  dir_keys = list(self.dir_dict.keys())
                  return default_attributes + file_keys + dir_keys

          class DictList:
              def __init__(self, label='DictList'):
                  self._dict = {}
                  self._label = label

              def __setitem__(self, key, value):
                  self._dict[key] = value

              def __getitem__(self, item):
                  return self._dict[item]

              def __getattr__(self, name):
                  if name in self._dict.keys():
                      return self.__getitem__(name)
                  else:
                      raise AttributeError(f"Attribute '{name}' not found.")

              def __repr__(self):
                  return f'{self._label}({[k for k in self._dict.keys()]})'

              def keys(self):
                  return self._dict.keys()

              def values(self):
                  return self._dict.values()

              def __dir__(self):
                  return list(self._dict.keys())

          class Workflow:
              def __init__(self, path_libs='.'):
                  self._path_libs = path_libs
                  self.create = FileDictionary(path=path_libs, workflow=self)
                  self.nodes = DictList(label='Nodes')
                  self.libs = FileDictionary(self._path_libs)

              def __getattr_rm__(self, name):
                  # print ('workflow.__getattr__')
                  if name.startswith('add.'):
                      func_name = name[4:]
                      return lambda: self.add(func_name, getattr(self, func_name))
                  raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")

              def add(self, node_name, node):
                  self.nodes[node_name] = node

              def __repr__(self):
                  return f"Workflow(nodes={self.nodes.keys()})"

EDIT: github syntax highlighting

liamhuber commented 1 year ago

I played a bit with a class that extends the node generator to write the Python function to a repository consisting of directories and a Python file. Having such a feature provides the following advantages:

  • The user can stay inside the notebook to add new node repositories or nodes to existing ones. Of course, if preferred, users can directly add/edit such files outside Jupyter using their editor of choice.
  • Storing and sharing workflows requires storing not only the state of the node but also the node definition. For nodes defined in a local Jupyter session, such functionality provides an easy way to store local nodes.
  • This feature naturally extends our create construction.

@JNmpi, awesome! It will take me a while to go through the classes in detail, but overall this sounds A-OK to me!

Key criteria are:

  • Make adding new nodes easy (via decorator or simply adding functions to a file)

I like being able to have plain-text code get serialized from inside the notebook. I have some mild concern about the "simply adding functions to a file" part, as the choice of which node decorator to apply ("standard" (i.e. "fast"), "slow", or "single-value") is not totally trivial, and I'm not sure we want to automate that part away just so that users don't need to add a single @Workflow.wrap_as.... line above their functions. But this concern is mild, and I may change my tune after digging into your implementation.

  • Use directories to build up a class schema (e.g. workflow.create.math.numpy)

Being able to register an entire directory with multiple sub-modules of stuff to import instead of only being able to register a single depth of namespace after create (or currently add) sounds good to me!

  • Make nodes delayed (only node.execute() executes the node)

I am a bit confused how this relates to our existing paradigms and terminology. Currently we call it run() to execute the node functionality. The old default (main:HEAD:workflow.node.Node) is to be delayed (run_on_updates=False, update_on_instantiation=False); the new default (#729 workflow.function.Function) is to aggressively re-run the node (both flags set to True) whenever the input changes (and it is compliant with type hints). pyiron/pyiron_contrib#729 also has a Slow(Function) class available with the old delayed defaults.

JNmpi commented 1 year ago

Thanks, @Liam for your quick reply and thoughts. We can set up a Zoom meeting for one of the next days to discuss these ideas in more detail.

Regarding the last point: this is identical to the last topic in the issue 'Suggestions and issues for the workflow class' pyiron/pyiron_contrib#756. Regarding the keyword, I am completely open. We can keep 'run', but I am also open to something like 'compute', 'evaluate' or 'execute'. What I would like is a workflow of delayed nodes that I can execute with a single command (i.e. workflow.run() or top_node.run()) and that runs all necessary nodes.
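The delayed-execution idea can be sketched in plain Python (purely illustrative — DelayedNode is a made-up name, not the pyiron_workflow implementation):

```python
# Illustrative sketch only: nodes store a function plus inputs; nothing
# executes until run() is called, which first resolves any upstream nodes.
class DelayedNode:
    def __init__(self, func, **inputs):
        self.func = func
        self.inputs = inputs  # plain values or upstream DelayedNode instances

    def run(self):
        resolved = {
            key: value.run() if isinstance(value, DelayedNode) else value
            for key, value in self.inputs.items()
        }
        return self.func(**resolved)

a = DelayedNode(lambda x, y: x + y, x=1, y=2)
b = DelayedNode(lambda z: z * 10, z=a)  # building the graph executes nothing
print(b.run())  # resolves a first, then b -> 30
```

A single call on the terminal node pulls in all of its dependencies, which is the "one command runs all necessary nodes" behavior described above.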

liamhuber commented 1 year ago

We can set up a Zoom meeting for one of the next days to discuss these ideas in more detail.

Sure thing. I'm available Thursday and Friday >1500 CET (>1700 CET would be even better, as then I can help get the kids out the door). To set a Thursday meeting I'll need ~12 hours heads-up so that I know to set an alarm.

Since we also plan to discuss this on Monday, I'm also ok waiting until then.

Regarding the last point. ... What I like to have is to have a workflow of delayed nodes that I can execute with a single command (i.e. workflow.run() or top_node.run()) and that runs all necessary nodes.

Super! More detail over on pyiron/pyiron_contrib#756, but I think we're well-aligned on wanting exactly such a feature/interface available to users.

JNmpi commented 1 year ago

For me Today (Thursday) >17 CET or e.g. next Monday would be OK. To prepare and focus the meeting on Monday it may be good to have a short meeting today.

liamhuber commented 1 year ago

For me Today (Thursday) >17 CET or e.g. next Monday would be OK. To prepare and focus the meeting on Monday it may be good to have a short meeting today.

@JNmpi sounds good -- I'll be in the pyiron zoom room around 1715

JNmpi commented 1 year ago

Great! I will be there too.

liamhuber commented 1 month ago

@samwaseda, here are the existing thoughts on what we were talking about. Some of it is a bit outdated (e.g. there are no longer fast/slow/single-value nodes -- this has all been unified), but other parts are relevant.

I guess what we had in mind may be narrower in scope, i.e. something like

from pyiron_workflow import Workflow

import math

@Workflow.wrap.as_function_node("y")
def Foo(n: int, k: int):
    return math.perm(n, k)

Foo.to_py_file("scratch")

Populating scratch.py with something along the lines of

from pyiron_workflow import Workflow

@Workflow.wrap.as_function_node("y")
def Foo(n: int, k: int):
    import math
    return math.perm(n, k)

Per our conversation, I've assigned you here, but this issue is quite old and I think we can modify the scope of it as we go.

Skimming over this and thinking a bit, I wonder how the export method would handle things if a file already exists. Can it cleverly merge in nodes of the same name by overwriting, and simply append nodes of a new name? One of the tricky bits we anticipated was dependency management; if we get merging running, how will the merge handle the dependencies changing, e.g. the old version required from foo import bar but the new version does not? Can (should?) we leverage AI for any of this? This sort of "extract and modify very slightly" task is something I've found GPT to be passably good at.

samwaseda commented 1 month ago

Skimming over this and thinking a bit, I wonder how the export method would handle things if a file already exists. Can it cleverly merge in nodes of the same name by overwriting, and simply append nodes of a new name? One of the tricky bits we anticipated was dependency management; if we get merging running, how will the merge handle the dependencies changing, e.g. the old version required from foo import bar but the new version does not? Can (should?) we leverage AI for any of this? This sort of "extract and modify very slightly" task is something I've found GPT to be passably good at.

Can’t we simply set something like overwrite = True? My biggest problem right now is that I don’t really know how we can automatically detect `math` and write `import math` in the file. But whether the line can be correctly exported, or we can only see that there are undefined variables, I don’t think it’s a deal breaker to overwrite, or to do nothing at all, when the file already exists. Or did I miss something crucial here, maybe?
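For what it's worth, at least spotting that `math` is needed can be approximated with the standard library: collect the names a function body reads but never binds, and treat the non-builtins among them as required imports. A rough sketch (it ignores closures, comprehension scoping, and other corner cases):

```python
import ast
import builtins

def undefined_names(func_source: str) -> set:
    """Names a function body reads but never binds: likely missing imports."""
    func = ast.parse(func_source).body[0]
    bound = {a.arg for a in func.args.args + func.args.kwonlyargs}
    loaded = set()
    for node in ast.walk(func):
        if isinstance(node, ast.Name):
            if isinstance(node.ctx, ast.Load):
                loaded.add(node.id)
            else:  # ast.Store / ast.Del both bind (or remove) a local name
                bound.add(node.id)
    # Anything read but never bound and not a builtin must come from an import
    return {name for name in loaded - bound if not hasattr(builtins, name)}

src = "def Foo(n, k):\n    return math.perm(n, k)\n"
print(undefined_names(src))  # -> {'math'}
```

This only tells you that `math` is an external dependency; mapping the name back to the right import statement (aliases, from-imports) would still need extra bookkeeping.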

liamhuber commented 4 weeks ago

Finding the necessary dependencies is definitely the more serious issue -- that's rather a deal breaker. Although to get the ball rolling we could do something like say all imports need to happen inside the node definition and if they don't things will break and it's your fault.

Overwriting is fine, but unless we want each node to be completely alone in its own file, we need some way of merging content into an existing node -- at which point we need to be able to tell if we're overriding old content or appending new content. Unlike determining imports, I don't see any fundamental technical barrier here or some missing knowledge, but it's still a matter of doing the legwork to get a parser running that knows (at a minimum) how to isolate and replace the decorated function definition when a new one with the same name is provided.