alteryx / featuretools

An open source Python library for automated feature engineering
https://www.featuretools.com
BSD 3-Clause "New" or "Revised" License

"0 instance ids provided" errors while running feature generation on Spark (Bug?) #1796

Open dnmca opened 2 years ago

dnmca commented 2 years ago

"0 instance ids provided" errors while running feature generation on Spark


Bug/Feature Request Description

I'm trying to generate a set of features using parallel execution on a Spark cluster (Google Cloud Dataproc). Basically, my workflow looks like this:

import featuretools as ft
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

df = spark.read.format("jdbc").options(...).load()  # read raw data (connection options elided)

def build_entity_set(df):
    # both entity_from_dataframe and normalize_entity return the EntitySet,
    # so the chained result can be returned directly
    return ft.EntitySet('foo').entity_from_dataframe(
        entity_id='foo',
        index='foo_id',
        time_index='foo_created',
        secondary_time_index={'foo_created': ['is_foo']},
        dataframe=df,
        variable_types={...}
    ).normalize_entity(
        'foo',
        'bar',
        'bar_id',
        additional_variables=[...],
        make_time_index='bar_created'
    )

mock_entity_set = build_entity_set(df.limit(10).toPandas())

mock_feature_matrix, features = ft.dfs(
    entityset=mock_entity_set,
    agg_primitives=[...],  # set of library and custom primitives
    trans_primitives=['time_since', 'is_weekend', 'weekday'],
    drop_exact=[
        'bar_id', ...  # and some other id columns
    ],
    cutoff_time=mock_entity_set['foo'].df[['foo_id', 'foo_created', 'is_foo']],
    target_entity='foo',
    max_depth=10,
)

# Derive the Spark schema for the UDF output from the mock feature matrix,
# replacing DecimalType fields with LongType
schema = StructType([
    StructField(field.name, LongType(), True)
    if type(field.dataType) is DecimalType
    else field
    for field in spark.createDataFrame(mock_feature_matrix).schema.fields
])

def generate_features(pdf):
    # rebuild an entity set from each pandas group and compute the
    # feature definitions produced by dfs above
    return ft.calculate_feature_matrix(
        features=features,
        entityset=build_entity_set(pdf)
    ).reset_index(drop=True)

dataset = (
    df
    .groupby('bar_id')
    .applyInPandas(generate_features, schema)
)

dataset \
    .write \
    .mode("overwrite") \
    .format("parquet") \
    .option("path", OUTPUT_PATH) \
    .save()
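
In case it helps, a mitigation I've been considering (an untested sketch, not part of the job above; generate_features_safe is a name I made up) is wrapping the UDF so a group that trips the assertion is logged and skipped instead of failing the stage four times:

import sys
import pandas as pd

# column names the UDF must produce, taken from the derived schema
feature_columns = [f.name for f in schema.fields]

def generate_features_safe(pdf):
    try:
        return generate_features(pdf)
    except AssertionError as err:
        # log the offending group on the executor and emit an empty frame
        # with the expected columns so the task can still succeed
        print(f"skipping bar_id={pdf['bar_id'].iloc[0]}: {err}", file=sys.stderr)
        return pd.DataFrame(columns=feature_columns)

This would let the job finish and collect every failing group from the executor logs for offline inspection, though it obviously doesn't explain the assertion itself.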

So every now and then, when I launch feature generation for a large number of instances in the entity set (7M+), I see Spark tasks being lost for the following reason:

21/11/25 16:37:25 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 14.2 in stage 102.0 (TID 1161) (foo-pyspark-w-11.us-central1-a.c.my-foo-project.internal executor 65): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/48bbb37c-f3bc-474b-8cdc-fc44d84c7d2d/create_foo_dataset.py", line 609, in generate_features
  File "/opt/conda/miniconda3/envs/foo-env/lib/python3.9/site-packages/featuretools/computational_backends/calculate_feature_matrix.py", line 288, in calculate_feature_matrix
    feature_matrix = calculate_chunk(cutoff_time=cutoff_time_to_pass,
  File "/opt/conda/miniconda3/envs/foo-env/lib/python3.9/site-packages/featuretools/computational_backends/calculate_feature_matrix.py", line 363, in calculate_chunk
    _feature_matrix = calculator.run(ids,
  File "/opt/conda/miniconda3/envs/foo-env/lib/python3.9/site-packages/featuretools/computational_backends/feature_set_calculator.py", line 99, in run
    assert len(instance_ids) > 0, "0 instance ids provided"
AssertionError: 0 instance ids provided

The pandas DataFrame input to generate_features() that triggers this error always has length 1 and does not seem to be special in any way (it has all the necessary columns and contains no NaNs).

After 4 consecutive task losses with the same input, the Spark job is killed.
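
(For what it's worth, the 4 lines up with Spark's default spark.task.maxFailures=4. Raising it when building the session, as sketched below, only postpones the job failure here, since the same single-row group fails on every retry:)

# not a fix: a bigger retry budget just delays the inevitable for a
# deterministic failure; shown only to document the 4-attempt behavior
spark = (
    SparkSession.builder
    .config("spark.task.maxFailures", "8")
    .getOrCreate()
)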

I wasn't able to reproduce this error with the same input DataFrame (of size 1) in a local Spark runtime, so it doesn't seem to be a data issue.
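
A minimal version of that check, calling the UDF body directly on the captured group and bypassing Spark entirely (failing_pdf stands for the single-row group from the failed task; the name is just for illustration):

failing_pdf = ...  # the exact single-row pandas DataFrame from the failed task

# re-run the UDF body directly to check whether the assertion also fires locally
local_result = ft.calculate_feature_matrix(
    features=features,
    entityset=build_entity_set(failing_pdf)
).reset_index(drop=True)
print(local_result)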

Expected Output

I would expect the featuretools library to behave the same way locally and on a Spark cluster.

Output of featuretools.show_info() (Dataproc)

Featuretools version: 0.26.2
Featuretools installation directory: /opt/conda/miniconda3/envs/foo-env/lib/python3.9/site-packages/featuretools

SYSTEM INFO
-----------
python: 3.9.5.final.0
python-bits: 64
OS: Linux
OS-release: 5.10.0-0.bpo.8-amd64
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: C.UTF-8
LOCALE: en_US.UTF-8

INSTALLED VERSIONS
------------------
numpy: 1.21.0
pandas: 1.2.4
tqdm: 4.62.3
PyYAML: 6.0
cloudpickle: 2.0.0
dask: 2021.11.2
distributed: 2021.11.2
psutil: 5.8.0
pip: 21.3.1
setuptools: 59.2.0

Output of featuretools.show_info() (local)

Featuretools version: 0.26.2
Featuretools installation directory: /home/user/miniconda3/envs/foo-env/lib/python3.7/site-packages/featuretools

SYSTEM INFO
-----------
python: 3.7.12.final.0
python-bits: 64
OS: Linux
OS-release: 5.11.0-40-generic
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8

INSTALLED VERSIONS
------------------
numpy: 1.19.5
pandas: 1.3.1
tqdm: 4.62.3
PyYAML: 6.0
cloudpickle: 2.0.0
dask: 2021.11.2
distributed: 2021.11.2
psutil: 5.8.0
pip: 21.2.2
setuptools: 59.2.0

gsheni commented 2 years ago

@dnmca thanks for raising the issue. We will look into this.