snorkel-team / snorkel

A system for quickly generating training data with weak supervision
https://snorkel.org
Apache License 2.0

SparkLFApplier to return either RDD or df column #1544

Closed S-C-H closed 3 years ago

S-C-H commented 4 years ago

I find it strange that the SparkLFApplier and other associated functions return NumPy arrays instead of Spark RDDs or Columns.

I'm unsure how a NumPy array is then supposed to be recombined with the RDD or DataFrame, given that these are distributed objects.

The nicest solution would be to let the user write something like: df.withColumn(SparkLFApplier(*cols))

https://github.com/snorkel-team/snorkel/blob/master/snorkel/labeling/apply/spark.py
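
For reference, the interface in the module linked above currently looks roughly like this (a sketch; `lfs` and `df` are placeholders for the labeling functions and DataFrame):

```python
# Sketch of the current SparkLFApplier behavior: apply() runs the LFs over the
# RDD but returns the label matrix as a plain numpy array on the driver,
# rather than an RDD or a DataFrame column. `lfs` and `df` are placeholders.
from snorkel.labeling.apply.spark import SparkLFApplier

applier = SparkLFApplier(lfs)
L = applier.apply(df.rdd)   # numpy.ndarray of shape (n_rows, n_lfs)
```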

Edit: I've successfully recombined this (I think), but I'm finding scalability quite challenging - this is where I feel a tighter integration between Snorkel and Spark would be helpful.


```python
n_partitions = split_rdd.getNumPartitions()
L = applier.apply(split_rdd)              # numpy label matrix collected on the driver
L_p = sc.parallelize(L, n_partitions)     # push it back out to the cluster

# Score each partition with the broadcast LabelModel: one (label, max proba) pair per row.
def score_partition(rows):
    L_part = np.vstack(list(rows))
    labels = label_model_bc.value.predict(L_part)
    probas = label_model_bc.value.predict_proba(L_part).max(axis=1)
    return zip(labels, probas)

weak_labels = L_p.mapPartitions(score_partition)

# Zip the predictions back onto the original rows and rebuild a DataFrame.
split_data[i] = (
    split_rdd
    .zip(weak_labels)
    .map(lambda tup: Row(**tup[0].asDict(), WEAK_LABEL=int(tup[1][0]), WEAK_LABEL_PROBABILITY=float(tup[1][1])))
    .toDF(schema)
)
```
S-C-H commented 4 years ago

Updated.

My approach to labeling 100M+ rows has been:

  1. Create a sampled L matrix from the labeling functions (~1M rows).
  2. Fit the label model on the sample.
  3. Broadcast the fitted model.
  4. Chunk the dataframe (splits of ~2M rows), then for each chunk:
    1. Create the L matrix for the chunk.
    2. Apply the broadcast model to get predictions and probabilities.
    3. Re-zip the results with the RDD into a dataframe.

This is quite slow even on a decently sized cluster. I'm thinking a Pandas UDF might be a better approach here, or alternatively creating the L matrix on the entire dataset and then predicting in chunks. I'll try that next.
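
A minimal sketch of steps 1-3 above, assuming `lfs` (the labeling functions), `df`, `sc`, and a `CARDINALITY` constant already exist; the 1% sample fraction is just a stand-in for the ~1M-row sample:

```python
# Hedged sketch of the sample -> fit -> broadcast recipe; names like lfs, df,
# sc and CARDINALITY are assumptions from this thread, not a fixed API.
from snorkel.labeling.apply.spark import SparkLFApplier
from snorkel.labeling.model import LabelModel

applier = SparkLFApplier(lfs)

# 1. Build the L matrix on a sample only, so the collected numpy array stays small.
sample_rdd = df.rdd.sample(withReplacement=False, fraction=0.01, seed=42)
L_sample = applier.apply(sample_rdd)

# 2. Fit the label model on the sampled L matrix.
label_model = LabelModel(cardinality=CARDINALITY, verbose=True)
label_model.fit(L_sample, n_epochs=500, seed=42)

# 3. Broadcast the fitted model so executors can call predict/predict_proba
#    locally when the ~2M-row chunks are scored (as in the snippet above).
label_model_bc = sc.broadcast(label_model)
```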

S-C-H commented 4 years ago

Update:

Tried:

  1. A Pandas UDF: not particularly intuitive given how these work. Each labeling function's output would need to be split into its own column in the DataFrame.
  2. Creating the L matrix on the entire dataset and then writing it out: the problem is that the broadcast size of this array is large; a sparse representation would be useful here (see the sketch below).
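
On the sparse-array point, one possible workaround (an assumption on my part, not something Snorkel ships): the L matrix encodes abstains as -1, so shifting every entry by +1 makes abstains zero, which scipy.sparse can store compactly as long as most LF outputs are abstains.

```python
# Hedged sketch: store the L matrix sparsely by shifting labels so that
# abstain (-1) maps to 0; undo the shift before handing it to the LabelModel.
import numpy as np
from scipy import sparse

def l_to_sparse(L: np.ndarray) -> sparse.csr_matrix:
    return sparse.csr_matrix(L + 1)        # abstain -> 0, label k -> k + 1

def l_to_dense(L_sp: sparse.csr_matrix) -> np.ndarray:
    return np.asarray(L_sp.todense()) - 1  # restore the original encoding
```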
jraiskin commented 4 years ago

I second @S-C-H: returning a NumPy array is strange and could pose issues when working with Spark. My DataFrame is a sample of ~2.5M rows, and I currently feed that L matrix as an np.array directly to the fit method. This works at this scale, but I assume it's only a matter of data size or LF cardinality before this setup breaks. I'll follow for any updates on this issue.
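
To put a rough number on "a matter of scale": the dense L matrix that gets collected to the driver grows linearly in both row count and LF count. The figures below are illustrative, not from this thread.

```python
# Back-of-envelope for the driver-side memory of a dense L matrix
# (illustrative numbers only: 2.5M rows, 50 labeling functions, int64 labels).
n_rows, n_lfs = 2_500_000, 50
bytes_per_label = 8  # numpy int64
print(f"{n_rows * n_lfs * bytes_per_label / 1e9:.1f} GB")  # -> 1.0 GB
```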

jraiskin commented 4 years ago

Update: Success, kind of! After a lot of trial and error, I reached a working solution. tl;dr: I used mapPartitions to predict locally on each partition, using the PandasLFApplier interface. Several other things I tried did not work.

All of those attempts ran into either closure or pickling issues. It seems that pickle couldn't handle the lf decorator syntax, and replacing pickle with dill introduced pyspark-specific issues (dill tried, and failed, to write the Spark context to file).

Here's the version that did work. The idea was to define the lfs object inside a closure so it would be available on the executors; I then build a pd.DataFrame in each executor and proceed with the PandasLFApplier API. Note: I had issues converting between Pandas and PySpark types, since int is nullable in PySpark but not in native Python / Pandas. Here I replace null values with -1 (which I know is not present in my data), and later in the classifier pipeline the imputer treats the -1 values as missing.

```python
# Imports assumed by this snippet; `sc`, `spark`, `label_model`, the lf*
# labeling functions, and df_train come from the surrounding session.
import numpy as np
import pandas as pd
from pyspark.sql import types as T
from snorkel.labeling import PandasLFApplier

_model_bc = sc.broadcast(label_model)


def get_proba_labels(df, spark):

    model = _model_bc.value

    lfs = [lf1, lf2, ...]
    dtypes_list = df.dtypes

    def predict_proba_iterator(rows, dtypes_list):
        # Build a local pandas DataFrame from this partition's rows.
        pd_df = pd.DataFrame([row.asDict() for row in rows])
        for col, dtype in dtypes_list:
            if dtype == 'int':
                pd_df[col] = pd_df[col].fillna(-1).astype(int)  # int is nullable in Spark but not in native Python

        pd_df = predict_proba_pd_df(pd_df)

        for _, pd_row in pd_df.iterrows():
            yield T.Row(**pd_row.to_dict())

    def predict_proba_pd_df(df):
        # Apply the LFs locally and score the resulting L matrix with the model.
        applier = PandasLFApplier(lfs)

        l_matrix = applier.apply(df)
        df['label_proba'] = model.predict_proba(l_matrix).tolist()
        df['label_argmax'] = np.vectorize(np.argmax)(df['label_proba'])

        return df

    output_schema = T.StructType(
        df.schema.fields + [
            T.StructField('label_proba', T.ArrayType(T.FloatType())),
            T.StructField('label_argmax', T.IntegerType())
        ]
    )
    rdd = df.rdd.mapPartitions(lambda x: predict_proba_iterator(x, dtypes_list))
    res_df = spark.createDataFrame(
        data=rdd,
        schema=output_schema
    ).select(df.columns + [
        'label_proba',
        'label_argmax'
    ])
    return res_df


df_train_labeled = get_proba_labels(
    df_train,
    spark=spark
)
df_train_labeled.cache()
```
github-actions[bot] commented 4 years ago

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 7 days.

jraiskin commented 4 years ago

keep

github-actions[bot] commented 3 years ago

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 7 days.