AgnostiqHQ / covalent

Pythonic tool for orchestrating machine-learning/high performance/quantum-computing workflows in heterogeneous compute environments.
https://www.covalent.xyz
Apache License 2.0

Error when parsing electron positional arguments in workflows #441

Closed - venkatBala closed this issue 2 years ago

venkatBala commented 2 years ago

When workflows are executing, there is a bug in parsing positional arguments for electrons with more than two arguments. For example, consider the following electron:

import covalent as ct
import pandas as pd
import xgboost as xgb

@ct.electron
def train(model: xgb.XGBModel, X: pd.DataFrame, Y: pd.Series):
    trained_model = model.fit(X, Y)
    return trained_model

@ct.lattice
def workflow():
    # model, features, targets are defined elsewhere in the script
    trained_model = train(model, features, targets)

When this workflow is dispatched, the argument model gets interpreted as a pandas dataframe and an AttributeError is raised, since a dataframe has no fit method. The error can be circumvented by passing the arguments to the electron as kwargs, i.e.

@ct.lattice
def workflow():
    trained_model = train(model=model, X=features, Y=targets)

kessler-frost commented 2 years ago

Another example:

import torch
import covalent as ct

@ct.electron
def cost(a):
    l=torch.sum(a ** 2)
    return l

@ct.electron
def move_forward_works(optimizer,point):
    loss = cost(point)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss,optimizer

@ct.electron
def move_forward_does_not_works(point,optimizer):
    loss = cost(point)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss,optimizer

@ct.lattice
def optimize(point,steps):
    optimizer = torch.optim.Adam([point], lr=.2)
    for i in range(steps):
        # loss,optimizer=move_forward_works(optimizer,point)
        loss,optimizer=move_forward_does_not_works(point,optimizer)
    return point

Switching the order of the input parameters seems to work, for some reason. Something weird must be happening in graph creation.

Edit: returning loss and optimizer and using the "not working" version of the function now.

scottwn commented 2 years ago

Hey team! Please add your planning poker estimate with ZenHub @FyzHsn @wjcunningham7 @kessler-frost

anushkrishnav commented 2 years ago

Where can I start? I get the gist of the issue; I will set up the project and try to recreate the issue to better understand what's going on.

WingCode commented 2 years ago

@scottwn Could you please assign me this issue? I have started to look into it.

@venkatBala @kessler-frost I tried to reproduce this issue by modifying the examples you have given, but I am able to successfully execute the job without any error.

from covalent.executor import DaskExecutor
from dask.distributed import LocalCluster
import covalent as ct

cluster = LocalCluster(processes=True)

dask_executor = DaskExecutor(scheduler_address=cluster.scheduler_address)

import xgboost as xgb
import pandas as pd
from typing import List

@ct.electron
def train(model: xgb.XGBModel, X: pd.DataFrame, Y: List):
    trained_model = model.fit(X, Y)
    return trained_model

@ct.lattice
def simple_workflow():
    model = xgb.XGBModel()
    features = pd.DataFrame({"feature1": [1, 2]})
    targets = [0, 1]
    trained_model = train(model, features, targets)
    return trained_model

dispatch_id = ct.dispatch(simple_workflow)()

Another, simpler modified example inspired by the covalent docs :) This also runs without any error when parsing positional arguments for electrons with more than 2 arguments.

import covalent as ct

# Construct tasks as "electrons"
@ct.electron
def join_words(a, b, c):
   return ", ".join([a, b, c])

@ct.electron
def excitement(a):
   return f"{a}!"

# Construct a workflow of tasks
@ct.lattice()
def test_workflow(a, b, c):
   phrase = join_words(a, b, c)
   return excitement(phrase)

# Dispatch the workflow
dispatch_id = ct.dispatch(test_workflow)("Hello", "World", "today")

These are the versions I have used.

cova==0.106.0
covalent-dask-plugin==0.3.0

Could you kindly provide a complete workflow code (with the ct.dispatch and inputs passed into lattice) with versions of cova used so that I can reproduce this issue on my end?

kessler-frost commented 2 years ago

Hey @WingCode! So, I think this error only pops up in certain scenarios, which is probably why the custom examples you tried didn't hit it. But in the optimizer case, it was failing for me when I ran the move_forward_does_not_works function instead. Maybe it has something to do with the names of the arguments - maybe somewhere we are sorting the names and then passing them to the function.

I realize I've made a small error in the optimize example, editing it now, but it shouldn't have affected the root issue.

santoshkumarradha commented 2 years ago

@kessler-frost, I can confirm that

import torch
import covalent as ct

@ct.electron
def cost(a):
    l=torch.sum(a ** 2)
    return l

@ct.electron
def move_forward_works(optimizer,point):
    loss = cost(point)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss,optimizer

@ct.electron
def move_forward_does_not_works(point,optimizer):
    loss = cost(point)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss,optimizer

@ct.lattice
def optimize(point,steps):
    optimizer = torch.optim.Adam([point], lr=.2)
    for i in range(steps):
        # loss,optimizer=move_forward_works(optimizer,point)
        loss,optimizer=move_forward_does_not_works(point,optimizer)
    return point

gives me an error.

WingCode commented 2 years ago

@kessler-frost, @venkatBala Thank you for the inputs. I made a runnable version from the scripts you shared above (since the point and steps parameters weren't present in the code). But unfortunately/fortunately (unfortunate that I am not able to help you with the issue; fortunate that the code seems to be okay) the code is not breaking.

import torch
import covalent as ct

@ct.electron
def cost(a):
    l=torch.sum(a ** 2)
    return l

@ct.electron
def move_forward_works(optimizer,point):
    loss = cost(point)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss,optimizer

@ct.electron
def move_forward_does_not_works(point,optimizer):
    loss = cost(point)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss,optimizer

@ct.lattice
def optimize_test(point,steps):
    optimizer = torch.optim.Adam([point], lr=.2)
    for i in range(steps):
        loss1, optimizer1 = move_forward_works(optimizer, point)
        loss2, optimizer2 = move_forward_does_not_works(point, optimizer)
    return point

point = torch.randn(5, 5, dtype=torch.complex64, requires_grad=True)
dispatch_id = ct.dispatch(optimize_test)(point, 2)

Successful execution shown in the covalent-ui

  1. Could you run this script and let me know if it's working?
  2. Could you let me know the Python and covalent versions you are using? You can run pip freeze | grep cova in the Python environment where you have installed covalent to find out.

Thank you for your patience! Without being able to reproduce the issue locally on my end, I cannot fix it :(

Emmanuel289 commented 2 years ago

@WingCode Thanks for your contributions on this issue. We will circle back to you on updates from our end.

Cc @santoshkumarradha @venkatBala @kessler-frost

kessler-frost commented 2 years ago

@WingCode Thanks for the prompt response.

I'm using Covalent version 0.106.0 with python 3.8.13 on an Apple Silicon M1 MacOS 12.3.1.

@santoshkumarradha So the reason why our script is not working while @WingCode's is has something to do with the name of the optimizer variable; if we change that, then the move_forward_does_not_works function seems to work fine:

...
    optimizer = torch.optim.Adam([point], lr=.2)
    for i in range(steps):
        # this doesn't work
        loss, optimizer = move_forward_does_not_works(point, optimizer)

        # this will work
        loss, optimizer1 = move_forward_does_not_works(point, optimizer)
...

@WingCode A very simple example:

import covalent as ct

@ct.electron
def get_transform():
    return 5

@ct.electron
def get_dataset(train, transform, root):
    assert type(train) == bool
    assert type(transform) == int
    assert type(root) == str

@ct.lattice
def workflow():
    transform = get_transform()

    # this does not work
    get_dataset(False, transform, "turtles")

    # this works
    get_dataset(train=False, transform=transform, root="turtles")

ct.dispatch(workflow)()

WingCode commented 2 years ago

@kessler-frost Thanks for getting back with a detailed explanation! With your code, I am able to reproduce the issue locally. Let me dig into the code and figure out what's wrong 👍

WingCode commented 2 years ago

Finally I have figured out the issue! When we use only args in ct.electrons, covalent relies on the insertion order of edges into the networkx.DiGraph to determine the order of the arguments passed into the function. The problem happens when we serialise the graph using nx.readwrite.node_link_data before sending it over the network: https://github.com/AgnostiqHQ/covalent/blob/8889f4423c80499115bd43a77f5a17e0667f4bc3/covalent/_workflow/transport.py#L302 This method doesn't preserve the insertion order of edges, so when we deserialise here, https://github.com/AgnostiqHQ/covalent/blob/8889f4423c80499115bd43a77f5a17e0667f4bc3/covalent/_workflow/transport.py#L348 the insertion order of the edges is lost.

As a result, finding the dependencies for a particular node no longer follows the insertion order, even though we rely solely on the order of the edges to build the args: https://github.com/AgnostiqHQ/covalent/blob/8889f4423c80499115bd43a77f5a17e0667f4bc3/covalent_dispatcher/_core/execution.py#L120-L127

Networkx graphs don't guarantee insertion order for edges.
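For illustration, here is a minimal standalone check (a toy graph, not covalent's actual transport graph) of whether edge iteration order survives the node_link_data round trip:

import networkx as nx

# Build a tiny MultiDiGraph whose edges are inserted in a deliberate,
# non-sorted order, mimicking positional-argument dependencies.
g = nx.MultiDiGraph()
g.add_edge(2, 3)  # third parent inserted first
g.add_edge(0, 3)
g.add_edge(1, 3)

# Serialize and deserialize the way the transport graph does.
data = nx.readwrite.node_link_data(g)
restored = nx.readwrite.node_link_graph(data)

# If these two lists differ, any code that rebuilds positional args purely
# from edge iteration order will scramble the arguments.
print(list(g.edges()))
print(list(restored.edges()))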

So to resolve this issue we should give args the kwargs treatment by persisting the order of the elements, I believe.

I would love to work on this issue. I am all ears to any other better ways of tackling this.

WingCode commented 2 years ago

@scottwn / @Emmanuel289 Could you assign this issue to me? Thank you!

kessler-frost commented 2 years ago

I have to say @WingCode you did a pretty good job on finding, explaining, and documenting this issue! Feel free to start working on a PR to resolve this and make sure you put a [unitaryhack] label in the title of it.

So to resolve this issue we should give args the kwargs treatment by persisting the order of the elements, I believe.

Yeah, that is a viable solution. If possible let's try to find a way to solve this without adding additional attributes to the transport graph (using an edge number, maybe? although I'm not sure if something like that already exists in networkx), but if it seems to get too convoluted then we can go the route of storing the order as well.
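For what it's worth, networkx MultiDiGraph edges do carry an auto-generated integer key, but it only counts parallel edges between one pair of nodes, so by itself it cannot encode positional order across different parents. A quick standalone illustration (not covalent code):

import networkx as nx

g = nx.MultiDiGraph()
# add_edge returns the key assigned to the new edge
print(g.add_edge("a", "task"))  # 0
print(g.add_edge("a", "task"))  # 1  (second parallel edge a -> task)
print(g.add_edge("b", "task"))  # 0  (keys restart for each node pair)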

santoshkumarradha commented 2 years ago

@kessler-frost, @scottwn Reopening this again, as it seems a few demo workflows we are writing are still affected by it with Covalent==0.110.2. I do not have a minimal working example, as the one we tested in the demo was a complicated ML flow; it works when kwargs are passed, but throws an error about an argument being a different type when passed by args.

santoshkumarradha commented 2 years ago

PS: @wjcunningham7 has the demo in which it did not work.

cjao commented 2 years ago

I'd probably start by recording the argument order "locally", just in the edges between two electrons, rather than keeping track of the global edge insertion order.
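A rough sketch of that idea (the arg_index attribute and collect_args helper are made up for illustration, not covalent's actual schema):

import networkx as nx

# Record each positional argument's index on its edge ("locally"), then
# rebuild the argument list by sorting a node's incoming edges by that
# attribute instead of relying on global insertion order.
g = nx.MultiDiGraph()
for index, parent in enumerate(["train_param", "transform_node", "root_param"]):
    g.add_edge(parent, "get_dataset", arg_index=index)

def collect_args(graph, node):
    incoming = [(d["arg_index"], parent) for parent, _, d in graph.in_edges(node, data=True)]
    return [parent for _, parent in sorted(incoming)]

# Survives serialization because the order lives in edge attributes,
# not in the order the edges happen to be iterated.
restored = nx.readwrite.node_link_graph(nx.readwrite.node_link_data(g))
print(collect_args(restored, "get_dataset"))
# ['train_param', 'transform_node', 'root_param']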

cjao commented 2 years ago

Hi @wjcunningham7, could you share (either here or privately) your reproducer?

wjcunningham7 commented 2 years ago

@cjao see PR #818. The workflow function needs to call the construct and fit electrons using kwargs, else it fails.

cjao commented 2 years ago

Here is a potential problem.

        self._graph.graph["edge_insertion_order"].append(x)

We can't track edges just by their parent nodes. In the extreme case where all nodes share the same parent, the edge sorting procedure has no information about the edge order:

def sort_edges_based_on_insertion_order(self):
    unsorted_edges = list(self._graph.edges(data=True))
    insertion_order = self._graph.graph["edge_insertion_order"]
    unsorted_edges_position_index = [insertion_order.index(i[0]) for i in unsorted_edges]  # This would be all 0's
    unsorted_index_map = zip(unsorted_edges_position_index, unsorted_edges)
    sorted_edge_list_with_index = sorted(unsorted_index_map, key=lambda x: x[0])
    sorted_edge_list = [i[1] for i in sorted_edge_list_with_index]

@wjcunningham7 's DNN workflow probably breaks when using args because the multiple construct and fit electrons share the same inputs.
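To make the degenerate case concrete, here is a toy version (made-up values) of why indexing into the global insertion-order list by parent node gives no ordering when every edge shares the same parent:

# Three edges all originating from parent node 0, appended in order.
insertion_order = [0, 0, 0]
unsorted_edges = [(0, 1), (0, 2), (0, 3)]

# list.index always returns the first match, so every edge maps to position 0
positions = [insertion_order.index(u) for u, _ in unsorted_edges]
print(positions)  # [0, 0, 0] -- nothing left to sort the edges by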