kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

[bug] main.go:49: failed to execute component: failed to create task in a pipeline #11252

Open danilyef opened 2 months ago

danilyef commented 2 months ago

Environment

Steps to reproduce

When I try to run a simple pipeline, I get an error message in the logs of the second component in the pipeline (train_model):

F0925 14:28:11.058914      42 main.go:49] failed to execute component: failed to create task: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 10.96.183.202:8887: connect: connection refused"
time="2024-09-25T14:28:11.100Z" level=info msg="sub-process exited" argo=true error="<nil>"
Error: exit status 1
time="2024-09-25T14:28:11.515Z" level=info msg="sub-process exited" argo=true error="<nil>"
Error: exit status 1

The second component itself finishes successfully.

Here is the code:

import kfp
from kfp import dsl
from kfp.dsl import Dataset, Input, Model, Output
import torch
import torchvision

@dsl.component(base_image='python:3.11', packages_to_install=['torch', 'torchvision'])
def load_training_data(dataset: Output[Dataset]):
    import torch
    import torchvision
    import shutil
    from pathlib import Path
    import os
    # Load MNIST dataset

    os.makedirs('./app/data')
    train_dataset_obj = torchvision.datasets.MNIST(root=Path("./app/data"), train=True, download=True,
                                                   transform=torchvision.transforms.ToTensor())

    test_dataset_obj = torchvision.datasets.MNIST(root=Path("./app/data"), train=False, download=True,
                                                  transform=torchvision.transforms.ToTensor())

    shutil.move(Path("./app/data"), dataset.path)

@dsl.component(base_image='python:3.11', packages_to_install=['torch', 'torchvision'])
def train_model(dataset: Input[Dataset], model: Output[Model]):
    import torch
    import torchvision
    import shutil
    from pathlib import Path
    Path("/tmp").mkdir(exist_ok=True)

    shutil.copytree(dataset.path, Path("/tmp/data"), dirs_exist_ok=True)

    # Load datasets
    train_dataset_obj = torchvision.datasets.MNIST(root=Path("/tmp/data"), train=True, download=False,
                                                   transform=torchvision.transforms.ToTensor())

    test_dataset_obj = torchvision.datasets.MNIST(root=Path("/tmp/data"), train=False, download=False,
                                                  transform=torchvision.transforms.ToTensor())

    # Define a simple neural network
    model_obj = torch.nn.Sequential(
        torch.nn.Flatten(),
        torch.nn.Linear(28*28, 128),
        torch.nn.ReLU(),
        torch.nn.Linear(128, 10)
    )

    # Train the model
    optimizer = torch.optim.Adam(model_obj.parameters())
    criterion = torch.nn.CrossEntropyLoss()

    train_loader = torch.utils.data.DataLoader(train_dataset_obj, batch_size=64, shuffle=True)

    for epoch in range(1):  # Train for 1 epoch
        for batch in train_loader:
            inputs, targets = batch
            outputs = model_obj(inputs)
            loss = criterion(outputs, targets)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Evaluate the model
    test_loader = torch.utils.data.DataLoader(test_dataset_obj, batch_size=64, shuffle=False)
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in test_loader:
            inputs, targets = batch
            outputs = model_obj(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    print(f'Accuracy of the network on the 10000 test images: {100 * correct / total:.2f}%')

    Path("/tmp/model").mkdir(exist_ok=True)
    # Save the trained model
    torch.save(model_obj.state_dict(), Path("/tmp/model/model.pth"))
    shutil.move(Path("/tmp/model/model.pth"), model.path)

@dsl.component(base_image='python:3.11', packages_to_install=['torch', 'torchvision', 'wandb'])
def save_trained_model(model: Input[Model]):
    import torch
    import shutil
    import wandb
    from pathlib import Path
    import os

    Path("/tmp/model").mkdir(exist_ok=True)

    shutil.copy(model.path, Path("/tmp/model"))

    # Load the trained model
    model_state_dict = torch.load(Path("/tmp/model/model.pth"))

    # Set the Weights & Biases API key
    os.environ["WANDB_API_KEY"] = 'my_api_key'

    # Initialize wandb
    wandb.init(project="mnist_training", name="model_upload")

    # Save and upload the model to wandb
    artifact = wandb.Artifact('mnist_model', type='model')
    with artifact.new_file('model.pth', mode='wb') as f:
        torch.save(model_state_dict, f)
    wandb.log_artifact(artifact)

    wandb.finish()

@dsl.pipeline(name='MNIST Training Pipeline')
def mnist_pipeline():
    load_data_task = load_training_data()
    train_model_task = train_model(
        dataset=load_data_task.outputs['dataset'],
    )
    save_model_task = save_trained_model(model=train_model_task.outputs['model'])

if __name__ == '__main__':
    # Compile the pipeline
    kfp.compiler.Compiler().compile(mnist_pipeline, 'mnist_pipeline.yaml')

    client = kfp.Client()
    pipeline_info = client.upload_pipeline(
        pipeline_package_path='mnist_pipeline.yaml',
        pipeline_name='MNIST Training Pipeline 3',
        description='A pipeline to train a model on the MNIST dataset'
    )

After that, I start a run in the UI.
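
For completeness, the run can also be started programmatically instead of through the UI. A minimal sketch reusing the client and compiled package from above (the experiment name is arbitrary):

# Submit the compiled package directly instead of starting the run in the UI.
# The experiment name is arbitrary.
run = client.create_run_from_pipeline_package(
    'mnist_pipeline.yaml',
    arguments={},
    experiment_name='mnist-experiments',
)
print(f'Started run {run.run_id}')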

Impacted by this bug? Give it a 👍.

rimolive commented 1 month ago

I don't think this is a bug, but it's also hard to see what the issue is. I'm unsure which k8s version minikube 1.16 uses, but our tested k8s API version is 1.19. I'd recommend upgrading minikube to at least version 1.32.

Lastly, can you share the full logs? There might be something else in them that helps troubleshoot this issue.
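
If it helps with gathering them, here is a minimal sketch that pulls the full logs of the failed task pod with the official kubernetes Python client; the namespace and pod name below are placeholders to replace with the values from your run:

from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

# Placeholders: substitute the namespace the run executed in and the actual
# pod name of the failed train_model task (visible in the UI or via kubectl).
namespace = "kubeflow"
pod_name = "mnist-training-pipeline-xxxxx-train-model-xxxxx"

# Print the logs of every container in the pod (main, wait, etc.).
pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
for container in pod.spec.containers:
    print(f"--- {container.name} ---")
    print(v1.read_namespaced_pod_log(name=pod_name, namespace=namespace,
                                     container=container.name))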

HumairAK commented 1 month ago

Yeah, this:

transport: Error while dialing: dial tcp 10.96.183.202:8887: connect: connection refused

is probably happening when your launcher pod tries to grpc.Dial the ml-pipeline server pod. There can be many reasons for this. I would check the health of your server pod, and see whether the persistence agent pod is showing any errors as well; it also connects to the ml-pipeline server pod via grpc.Dial, so I suspect it might be hitting the same issue.
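
To do those checks quickly, a minimal sketch with the kubernetes Python client; it assumes a default install in the kubeflow namespace, where the API server and persistence agent pods share the ml-pipeline name prefix:

from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

# Assumption: KFP is installed in the "kubeflow" namespace with default pod names,
# e.g. ml-pipeline-... (API server) and ml-pipeline-persistenceagent-...
for pod in v1.list_namespaced_pod(namespace="kubeflow").items:
    if pod.metadata.name.startswith("ml-pipeline"):
        restarts = sum(cs.restart_count for cs in (pod.status.container_statuses or []))
        print(f"{pod.metadata.name}: phase={pod.status.phase}, restarts={restarts}")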