datajoint / datajoint-python

Relational data pipelines for the science lab
https://datajoint.com/docs
GNU Lesser General Public License v2.1

Internal Error (1205, 'Lock wait timeout exceeded; try restarting transaction') #409

Closed: Visdoom closed this issue 6 years ago

Visdoom commented 6 years ago

Hey there, I am starting some classification jobs in parallel, and from time to time a job gets aborted with the following message:

Populating key  {'ds_id': 1, 'part_id': 3, 'statistic_id': 5, 'reduction_id': 0, 'classifier_id': 1}                               
Pairwise classification of  ('CBC1', 'CBC2')                                                                                       
Pairwise classification of  ('CBC1', 'CBC3A')                                                                                      
Traceback (most recent call last):                                                                                                 
  File "<string>", line 1, in <module>                                                                                             
  File "/gpfs01/berens/user/slaturnus/.local/lib/python3.5/site-packages/datajoint/autopopulate.py", line 141, in populate         
    make(dict(key))                                                                                                                
  File "/gpfs01/berens/user/slaturnus/Projects/BC_morphologies/git/sophie/schemata/classification.py", line 172, in _make_tuples   
    self.Pairwise().make_tuples(comb, X, y, copy.copy(m), copy.copy(key))                                                          
  File "/gpfs01/berens/user/slaturnus/Projects/BC_morphologies/git/sophie/schemata/classification.py", line 113, in make_tuples    
    self.insert1(d_)                                                                                                               
  File "/gpfs01/berens/user/slaturnus/.local/lib/python3.5/site-packages/datajoint/base_relation.py", line 137, in insert1         
    self.insert((row,), **kwargs)                                                                                                  
  File "/gpfs01/berens/user/slaturnus/.local/lib/python3.5/site-packages/datajoint/base_relation.py", line 280, in insert          
    itertools.chain.from_iterable((v for v in r['values'] if v is not None) for r in rows)))                                       
  File "/gpfs01/berens/user/slaturnus/.local/lib/python3.5/site-packages/datajoint/connection.py", line 129, in query              
    cur.execute(query, args)                                                                                                       
  File "/usr/local/lib/python3.5/dist-packages/pymysql/cursors.py", line 166, in execute                                           
    result = self._query(query)                                                                                                    
  File "/usr/local/lib/python3.5/dist-packages/pymysql/cursors.py", line 322, in _query                                            
    conn.query(q)                                                                                                                  
  File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 856, in query                                         
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)                                                           
  File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 1057, in _read_query_result                           
    result.read()                                                                                                                  
  File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 1340, in read                                         
    first_packet = self.connection._read_packet()                                                                                  
  File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 1014, in _read_packet                                 
    packet.check_error()                                                                                                           
  File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 393, in check_error                                   
    err.raise_mysql_exception(self._data)                                                                                          
  File "/usr/local/lib/python3.5/dist-packages/pymysql/err.py", line 107, in raise_mysql_exception                                 
    raise errorclass(errno, errval)                                                                                                
pymysql.err.InternalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')  

When I restart the job, everything works fine. I guess there is too much concurrent reading and writing on one of the tables, but is there a way to circumvent the problem so that the jobs wait instead of aborting?
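Something like the following retry wrapper is what I have in mind (just a sketch; populate_with_retry and its parameters are my own names, not DataJoint API):

import time
import pymysql

def populate_with_retry(table, max_retries=5, wait_seconds=30, **populate_kwargs):
    """Sketch: re-run populate() whenever MySQL reports a lock wait timeout (errno 1205)."""
    for attempt in range(max_retries):
        try:
            table.populate(**populate_kwargs)
            return
        except pymysql.err.InternalError as err:
            if err.args[0] != 1205:   # only retry on lock wait timeouts
                raise
            time.sleep(wait_seconds)  # give competing transactions time to finish
    raise RuntimeError('populate still hit lock timeouts after retrying')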

dimitri-yatsenko commented 6 years ago

Often this has to do with the order of the primary key attributes in the tables that are selected from. Do you mind sharing the schema?

Visdoom commented 6 years ago

In terms of code or in terms of the ERD?

eywalker commented 6 years ago

Code would be most helpful for this.

Visdoom commented 6 years ago

It's a bit messy, but essentially a classification is defined over a data set. Each data set contains several classes, so I perform a pairwise classification for each pair of classes (in Classification().Pairwise()), and each pairwise classification performs 5-10 cross-validated runs to estimate the classification performance:

# Imports assumed by this excerpt (schema and FeatureMap are defined elsewhere in the module):
import copy
import itertools
import json
import pickle

import datajoint as dj
import numpy as np
from glmnet import LogitNet
from imblearn.over_sampling import SMOTE
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.neighbors import KNeighborsClassifier

@schema
class Classifier(dj.Lookup):
    definition = """
    classifier_id: serial # 
    ---
    classifier: varchar(100) # type of the classifier used. E.g. kNN = k-Nearest Neighbor or Logit = Logistic Regression
    params: varchar(200) # json dumped dictionary of classifier parameters. Make sure the names fit with function description
    ___
    UNIQUE INDEX(classifier, params)
    """

@schema
class ClassificationExperiment(dj.Computed):
    definition = """
    -> FeatureMap
    -> Classifier
    ---
    """

    class Pairwise(dj.Part):
        definition = """
        -> ClassificationExperiment
        group_a : varchar(25)   # type of first class
        group_b : varchar(25)   # type of second class
        ---
        model  : longblob          # pickled model
        """

        def make_tuples(self, comb, X, y, m, key):

            d_ = copy.copy(key)

            d_['group_a'] = comb[0]
            d_['group_b'] = comb[1]

            if X.shape[1] > 5:
                pca = PCA(copy=True, whiten=False, n_components=5)
            else:
                pca = None

            sm = SMOTE(k_neighbors=3, random_state=42)
            X, y = sm.fit_sample(X, y)

            n1, n2 = np.unique(y, return_counts=True)[1]

            if n2 < 10 or n1 < 10:
                kf = StratifiedKFold(n_splits=5, shuffle=True)
            else:
                kf = StratifiedKFold(n_splits=10, shuffle=True)

            runs = []
            for train_ix, test_ix in kf.split(X, y):

                r_ = copy.copy(d_)

                X_train = X[train_ix]
                y_train = y[train_ix]
                X_test = X[test_ix]
                y_test = y[test_ix]

                if pca:
                    X_train = pca.fit_transform(X_train)
                    X_test = pca.transform(X_test)

                try:
                    m.fit(X_train, y_train)

                    if isinstance(m, LogitNet):

                        coef = list(m.coef_[0])
                        coef.insert(0, m.intercept_)
                        r_['w'] = np.array(coef)

                    y_hat = np.array(m.predict(X_test)).reshape(-1)

                    r_['avg_training_score'] = m.score(X_train, y_train)
                    r_['avg_test_score'] = np.mean(y_hat == y_test)
                    r_['test_indices'] = test_ix
                    r_['training_indices'] = train_ix
                    r_['y_hat'] = (y_hat == comb[1]).astype(int)
                    runs.append(r_)
                except ValueError:
                    continue

            d_['model'] = pickle.dumps(m)

            # insert key
            self.insert1(d_)
            ClassificationCrossValidationRun().insert(runs)

    def _make_tuples(self, key):

        print('Populating key ', key)
        z = copy.copy(key)
        classifier = (Classifier() & z).fetch1('classifier')
        classifier_params = json.loads((Classifier() & z).fetch1('params'))

        # create classifier
        if classifier == 'kNN':
            m = KNeighborsClassifier(**classifier_params)
        elif classifier == 'Logit':
            m = LogitNet(**classifier_params)
        elif classifier == 'randomForest':
            m = RandomForestClassifier(**classifier_params)
        else:
            raise NotImplementedError('There is no classifier implemented for {0}'.format(classifier))

        # get data to base classification on
        df = FeatureMap().get_as_dataframe(z)
        y = df.columns.get_level_values('type')
        combinations = list(itertools.combinations(np.unique(y), 2))

        self.insert([z])
        # for each pairwise combination
        for comb in combinations:
            print('Pairwise classification of ', comb)

            X = df[list(comb)].replace(np.inf, np.nan).fillna(0).values.T
            y = df[list(comb)].columns.get_level_values('type')

            self.Pairwise().make_tuples(comb, X, y, copy.copy(m), copy.copy(key))

@schema
class ClassificationCrossValidationRun(dj.Lookup):
    definition = """
    run_id: serial
    -> ClassificationExperiment.Pairwise
    ---
    training_indices: blob   # indices of training data
    test_indices: blob       # indices of test data
    y_hat: blob              # predicted labels of test data after training
    avg_training_score: float   # average score of training set
    avg_test_score: float       # average score of test set
    w=NULL: blob              # if applicable, trained coefficient vector in this run
    """

dimitri-yatsenko commented 6 years ago

The problem arises from the inserts into ClassificationCrossValidationRun, whose primary key does not start with the same attributes as that of ClassificationExperiment.Pairwise. Let me explain in more detail in the forthcoming post.

dimitri-yatsenko commented 6 years ago

You can fix the problem by making the primary key of ClassificationCrossValidationRun begin with -> ClassificationExperiment.Pairwise and making run_id the last attribute in the primary key.

True, you will no longer be able to make it auto-incremented; you will need to generate run_id in the code. And since ClassificationCrossValidationRun is populated by ClassificationExperiment, DataJoint best practice is to make it a part table of ClassificationExperiment.
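Concretely, the definition would become something like this (a sketch; the attribute list is copied from your post, with run_id now assigned in your code rather than by MySQL):

@schema
class ClassificationCrossValidationRun(dj.Lookup):
    definition = """
    -> ClassificationExperiment.Pairwise
    run_id: smallint            # generated in code, no longer auto-incremented
    ---
    training_indices: blob      # indices of training data
    test_indices: blob          # indices of test data
    y_hat: blob                 # predicted labels of test data after training
    avg_training_score: float   # average score of training set
    avg_test_score: float       # average score of test set
    w=NULL: blob                # if applicable, trained coefficient vector in this run
    """

In Pairwise.make_tuples the runs then get numbered explicitly, e.g.:

    # replaces: for train_ix, test_ix in kf.split(X, y):
    for run_id, (train_ix, test_ix) in enumerate(kf.split(X, y)):
        r_ = dict(d_, run_id=run_id)
        ...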

dimitri-yatsenko commented 6 years ago

It may also be caused by the auto-increment lock. There were actually two problems here that could cause these timeouts: the ordering of the primary key attributes, and the table-level AUTO-INC lock that InnoDB takes for auto_increment columns. See https://dev.mysql.com/doc/refman/5.7/en/innodb-locking.html

Here is a design that fits your use case and will not cause lock timeouts.


@schema 
class ClassificationExperiment(dj.Computed):
    definition = ...

    class Pairwise(dj.Part):
        definition = ...

    class ValidationRun(dj.Part):
        definition = """
        -> master
        run_id: smallint 
        ---
        training_indices: blob   # indices of training data
        ...
        """

Visdoom commented 6 years ago

Hey there, thanks for the explanation! My problem with the part table is that it syntactically would need to be a part table of the ClassificationExperiment().Pairwise() table, but I vaguely remember that DataJoint would not let me define a part table of a part table. I think I will now make it a part table of ClassificationExperiment and enforce the same keys as in Pairwise() plus the run_id, as sketched below.
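I.e. something like this (just a sketch):

    class ValidationRun(dj.Part):
        definition = """
        -> ClassificationExperiment
        group_a : varchar(25)   # same key attributes as in Pairwise
        group_b : varchar(25)
        run_id  : smallint
        ---
        training_indices: blob   # indices of training data
        ...
        """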

Visdoom commented 6 years ago

As a note: It would be really grand if one were able to drop part tables without having to drop the master.

dimitri-yatsenko commented 6 years ago

The part table is a different issue from the locking problem. Once you restructure the primary keys and foreign keys and replace auto_increment, the locking will no longer occur.

True, part tables cannot have part tables. However, part tables of the same master can depend on each other. Your design could be:

@schema 
class ClassificationExperiment(dj.Computed):
    definition = ...

    class Pairwise(dj.Part):
        definition = ...

    class ValidationRun(dj.Part):
        definition = """
        -> ClassificationExperiment.Pairwise
        run_id: smallint 
        ---
        training_indices: blob   # indices of training data
        ...
        """
dimitri-yatsenko commented 6 years ago

If you don't mind, I will close this issue. This is something we need to cover in the documentation.