Netflix / metaflow

Open Source Platform for developing, scaling and deploying serious ML, AI, and data science systems
https://metaflow.org
Apache License 2.0
8.26k stars 774 forks source link

Resume Doesn't Resume from the Last Failed Step #1851

Closed EdIzaguirre closed 3 months ago

EdIzaguirre commented 6 months ago

Hi all,

I am trying to use metaflow resume to resume my flow from the last failed step. Unfortunately, it seems to resume at the step prior to the failed step. Why is this happening? Is this a bug? My gut told me that it is the @batch decorator, since the train_model step is the only one with the decorator. But upon removing @batch and doing local compute it still resumed from training. The train step succeeded, and the test step failed, but resume always restarts train. For reference here are the relevant steps:

@batch(memory=40000)
    @conda(libraries={'numpy': '1.23.5', 'tensorflow': '2.15'})
    @step
    def train_model(self):
        """
        Fit the model returned by ``get_model`` on the training sessions.

        Reads ``self.session_dataset.x_train`` (an iterable of token lists),
        tokenizes it with a Keras ``Tokenizer``, uses all-but-last tokens of
        each session as input and the last token as label, trains for one
        epoch, and stores the serialized model, weights, tokenizer and model
        configuration in ``self.model`` before moving on to ``test_model``.
        """
        # NOTE: Comet experiment tracking is currently disabled in this step.
        from model.lstm_model import get_model
        import numpy as np
        from tensorflow.keras.optimizers import Adam  # pylint: disable=import-error
        from tensorflow.keras.callbacks import EarlyStopping  # pylint: disable=import-error
        from tensorflow.keras.preprocessing.text import Tokenizer  # pylint: disable=import-error
        from tensorflow.keras.preprocessing.sequence import pad_sequences  # pylint: disable=import-error

        # TODO: pick a sensible EXP name!!!
        self.COMET_EXP_NAME = 'my_lstm_recs'

        # Hyper parameters are stored as an artifact and reused locally below.
        hypers = {
            'EMBEDDING_DIM': 1,
            'LSTM_HIDDEN_DIM': 1,
            'MAX_LEN': 20,
            'LEARNING_RATE': 1e-3,
            'DROPOUT': 0.3
        }
        self.hypers = hypers

        # Keras tokenization works on strings, so join each session's items.
        session_texts = [' '.join(session) for session in self.session_dataset.x_train]

        # Fitting the tokenizer on the training data initializes the vocabulary.
        tokenizer = Tokenizer(filters='', lower=False, split=' ', oov_token='<UNK>')
        tokenizer.fit_on_texts(session_texts)
        VOCAB_SIZE = len(tokenizer.word_index)

        token_sequences = tokenizer.texts_to_sequences(session_texts)

        # Inputs: the first N-1 tokens of every session, padded to MAX_LEN.
        seed_sequences = [seq[:-1] for seq in token_sequences]
        x_train = np.array(pad_sequences(seed_sequences, maxlen=hypers['MAX_LEN']))

        # Labels: the last token of every session.
        # TODO: Decrementing index here because 0 is reserved for masking; Find a better way around this.
        y_train = np.array([seq[-1] - 1 for seq in token_sequences])

        print("NUMBER OF SESSIONS : {}".format(x_train.shape[0]))
        print('First 3 x:', x_train[:3])
        print('First 3 y:', y_train[:3])

        # Build and compile the network.
        model = get_model(
            VOCAB_SIZE,
            hypers['MAX_LEN'],
            hypers['EMBEDDING_DIM'],
            hypers['LSTM_HIDDEN_DIM'],
            hypers['DROPOUT'],
        )
        optimizer = Adam(learning_rate=hypers['LEARNING_RATE'])
        model.compile(
            optimizer=optimizer,
            loss='sparse_categorical_crossentropy',
            metrics=['acc'],
        )
        model.summary()

        # Train.
        early_stopping = EarlyStopping(patience=20)
        model.fit(
            x_train,
            y_train,
            epochs=1,
            verbose=2,
            batch_size=32,
            validation_split=0.2,
            callbacks=[early_stopping],
        )

        # Persist everything the downstream step needs to rebuild the model.
        model_config = {
            'vocab_size': VOCAB_SIZE,
            'max_len': hypers['MAX_LEN'],
            'embedding_dim': hypers['EMBEDDING_DIM'],
            'lstm_hidden_dim': hypers['LSTM_HIDDEN_DIM'],
            'dropout': hypers['DROPOUT']
        }
        self.model = {
            'model': model.to_json(),
            'model_weights': model.get_weights(),
            'tokenizer': tokenizer.to_json(),
            'model_config': model_config
        }
        self.next(self.test_model)

@card(type='blank', id='recCard')
    # @batch(memory=40000)
    @conda(libraries={'numpy': '1.23.5', 'tensorflow': '2.15', 'matplotlib': '3.8.4', 'scikit-learn': '1.4'})
    @step
    def test_model(self):
        """
        Score the trained model on the test sessions and publish a ROC card.

        Rebuilds the model from ``self.model``, predicts on
        ``self.session_dataset.x_test`` (stored in ``self.predictions``),
        one-hot encodes ``self.session_dataset.y_test``, prints the AUC-ROC,
        plots the ROC curve and appends the figure to the step's card before
        moving on to ``deploy_model``.
        """
        from model.lstm_model import LSTMRecModel
        import matplotlib.pyplot as plt
        import tensorflow as tf
        from sklearn.metrics import roc_curve
        from sklearn.preprocessing import OneHotEncoder
        import numpy as np

        rec_model = LSTMRecModel(model_dict=self.model)
        y_preds = rec_model.predict(prediction_input=self.session_dataset.x_test)
        self.predictions = y_preds

        # One-hot encode the true labels so they can be compared with the
        # per-class predictions.
        # NOTE(review): this assumes y_preds has the same shape as the encoded
        # labels (one column per class present in y_test) -- confirm against
        # LSTMRecModel.predict.
        encoder = OneHotEncoder(sparse_output=False)
        y_true = np.array(self.session_dataset.y_test)
        y_true_encoded = encoder.fit_transform(y_true.reshape(-1, 1))

        # Compute AUC-ROC using TensorFlow; evaluate the tensor only once.
        auc = tf.keras.metrics.AUC(curve='ROC')
        auc.update_state(y_true_encoded, y_preds)
        auc_value = auc.result().numpy()
        print('AUC-ROC:', auc_value)

        # ROC curve points. TensorFlow's metrics module exposes no roc_curve
        # function, so use scikit-learn. Both arrays are flattened, which
        # treats every (sample, class) pair as one binary decision, matching
        # how the AUC metric above aggregates by default.
        fpr, tpr, _ = roc_curve(
            y_true_encoded.ravel(),
            np.asarray(y_preds).ravel(),
        )

        # Draw on the same figure that is handed to the card. Creating a
        # second figure here would leave `fig` blank.
        fig, ax = plt.subplots()
        ax.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % auc_value)
        ax.plot([0, 1], [0, 1], 'k--')
        ax.set_xlim([0.0, 1.0])
        ax.set_ylim([0.0, 1.05])
        ax.set_xlabel('False Positive Rate')
        ax.set_ylabel('True Positive Rate')
        ax.set_title('Receiver Operating Characteristic Curve')
        ax.legend(loc="lower right")
        # No plt.show(): the step may run headless or remotely, and the figure
        # is delivered through the card instead.

        current.card.append(Image.from_matplotlib(fig))
        self.next(self.deploy_model)
savingoyal commented 3 months ago

@EdIzaguirre apologies for the delay here, but are you still running into this issue?