projectglow / glow

An open-source toolkit for large-scale genomic analysis
https://projectglow.io
Apache License 2.0

PySpark and ndarray #360

Closed: EpiSlim closed this issue 8 months ago

EpiSlim commented 3 years ago

Hi all - I ran into a TypeError: not supported type: <class 'numpy.ndarray'> error when reproducing the examples in docs/source/tertiary/regression-tests.rst for binary phenotypes (see error stack below).

log_reg_df = glow.gwas.logistic_regression(
    genotypes,
    binary_phenotypes,
    covariates,
    correction='approx-firth',
    pvalue_threshold=0.05,
    values_column='gt'
  )

The error seems to be related to the reshape_for_gwas function, and more specifically to this line: https://github.com/projectglow/glow/blob/93851b96b267e8082996388e43d9f7f91a88d892/python/glow/wgr/wgr_functions.py#L209. I have PySpark 3.0.0, and the code path for older versions, values = transposed_df.to_numpy().tolist(), seems to work fine. Is this a bug, or am I missing something? Thanks.
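For reference, here is a minimal standalone sketch (not glow code; it assumes a plain local SparkSession where Arrow conversion is not enabled) that hits the same schema-inference failure when the cells are ndarrays, while plain Python lists, as produced by the older .tolist() path, go through fine:

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

pdf = pd.DataFrame({"label": ["Trait_1", "Trait_2"]})

# Each cell is a numpy.ndarray: without Arrow, Spark falls back to row-by-row
# schema inference and raises TypeError: not supported type: <class 'numpy.ndarray'>
pdf["values_array"] = list(np.eye(2))
# spark.createDataFrame(pdf)

# Each cell is a plain Python list: inference yields array<double> and succeeds
pdf["values_array"] = np.eye(2).tolist()
spark.createDataFrame(pdf).show()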

Error stack:

--------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
~/anaconda3/envs/glow/lib/python3.7/site-packages/pyspark/sql/types.py in _infer_type(obj)
   1040         try:
-> 1041             return _infer_schema(obj)
   1042         except TypeError:

~/anaconda3/envs/glow/lib/python3.7/site-packages/pyspark/sql/types.py in _infer_schema(row, names)
   1066     else:
-> 1067         raise TypeError("Can not infer schema for type: %s" % type(row))
   1068 

TypeError: Can not infer schema for type: <class 'numpy.ndarray'>

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-11-d9b7575cf60f> in <module>
      5     correction='approx-firth',
      6     pvalue_threshold=0.05,
----> 7     values_column='gt'
      8   )

~/anaconda3/envs/glow/lib/python3.7/site-packages/typeguard/__init__.py in wrapper(*args, **kwargs)
    838         memo = _CallMemo(python_func, _localns, args=args, kwargs=kwargs)
    839         check_argument_types(memo)
--> 840         retval = func(*args, **kwargs)
    841         check_return_type(retval, memo)
    842 

~/anaconda3/envs/glow/lib/python3.7/site-packages/glow/gwas/log_reg.py in logistic_regression(genotype_df, phenotype_df, covariate_df, offset_df, correction, pvalue_threshold, contigs, add_intercept, values_column, dt)
    125 
    126     state = _create_log_reg_state(spark, phenotype_df, offset_df, sql_type, C, correction,
--> 127                                   add_intercept, contigs)
    128 
    129     phenotype_names = phenotype_df.columns.to_series().astype('str')

~/anaconda3/envs/glow/lib/python3.7/site-packages/typeguard/__init__.py in wrapper(*args, **kwargs)
    838         memo = _CallMemo(python_func, _localns, args=args, kwargs=kwargs)
    839         check_argument_types(memo)
--> 840         retval = func(*args, **kwargs)
    841         check_return_type(retval, memo)
    842 

~/anaconda3/envs/glow/lib/python3.7/site-packages/glow/gwas/log_reg.py in _create_log_reg_state(spark, phenotype_df, offset_df, sql_type, C, correction, add_intercept, contigs)
    222     if offset_type == gwas_fx._OffsetType.LOCO_OFFSET and contigs is not None:
    223         offset_df = offset_df.loc[pd.IndexSlice[:, contigs], :]
--> 224     pivoted_phenotype_df = reshape_for_gwas(spark, phenotype_df)
    225     result_fields = [
    226         StructField('label', StringType()),

~/anaconda3/envs/glow/lib/python3.7/site-packages/glow/wgr/wgr_functions.py in reshape_for_gwas(spark, label_df)
    209         values = list(transposed_df.to_numpy())
    210     transposed_df['values_array'] = values
--> 211     return spark.createDataFrame(transposed_df[['values_array']].reset_index(), column_names)
    212 
    213 

~/anaconda3/envs/glow/lib/python3.7/site-packages/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    599             # Create a DataFrame from pandas DataFrame.
    600             return super(SparkSession, self).createDataFrame(
--> 601                 data, schema, samplingRatio, verifySchema)
    602         return self._create_dataframe(data, schema, samplingRatio, verifySchema)
    603 

~/anaconda3/envs/glow/lib/python3.7/site-packages/pyspark/sql/pandas/conversion.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    298                     raise
    299         data = self._convert_from_pandas(data, schema, timezone)
--> 300         return self._create_dataframe(data, schema, samplingRatio, verifySchema)
    301 
    302     def _convert_from_pandas(self, pdf, schema, timezone):

~/anaconda3/envs/glow/lib/python3.7/site-packages/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
    625             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    626         else:
--> 627             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    628         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    629         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

~/anaconda3/envs/glow/lib/python3.7/site-packages/pyspark/sql/session.py in _createFromLocal(self, data, schema)
    446 
    447         if schema is None or isinstance(schema, (list, tuple)):
--> 448             struct = self._inferSchemaFromList(data, names=schema)
    449             converter = _create_converter(struct)
    450             data = map(converter, data)

~/anaconda3/envs/glow/lib/python3.7/site-packages/pyspark/sql/session.py in _inferSchemaFromList(self, data, names)
    378             warnings.warn("inferring schema from dict is deprecated,"
    379                           "please use pyspark.sql.Row instead")
--> 380         schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
    381         if _has_nulltype(schema):
    382             raise ValueError("Some of types cannot be determined after inferring")

~/anaconda3/envs/glow/lib/python3.7/site-packages/pyspark/sql/session.py in <genexpr>(.0)
    378             warnings.warn("inferring schema from dict is deprecated,"
    379                           "please use pyspark.sql.Row instead")
--> 380         schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
    381         if _has_nulltype(schema):
    382             raise ValueError("Some of types cannot be determined after inferring")

~/anaconda3/envs/glow/lib/python3.7/site-packages/pyspark/sql/types.py in _infer_schema(row, names)
   1067         raise TypeError("Can not infer schema for type: %s" % type(row))
   1068 
-> 1069     fields = [StructField(k, _infer_type(v), True) for k, v in items]
   1070     return StructType(fields)
   1071 

~/anaconda3/envs/glow/lib/python3.7/site-packages/pyspark/sql/types.py in <listcomp>(.0)
   1067         raise TypeError("Can not infer schema for type: %s" % type(row))
   1068 
-> 1069     fields = [StructField(k, _infer_type(v), True) for k, v in items]
   1070     return StructType(fields)
   1071 

~/anaconda3/envs/glow/lib/python3.7/site-packages/pyspark/sql/types.py in _infer_type(obj)
   1041             return _infer_schema(obj)
   1042         except TypeError:
-> 1043             raise TypeError("not supported type: %s" % type(obj))
   1044 
   1045 

TypeError: not supported type: <class 'numpy.ndarray'>

kianfar77 commented 3 years ago

Hi @EpiSlim, are you running the whole doctest code in the same sequence, using the same data (both the invisible and visible code blocks in docs/source/tertiary/regression-tests.rst)?

TomohiroUchida commented 3 years ago

Hi, I faced a similar issue too.

I tried glow/docs/source/_static/notebooks/tertiary/gwas-binary.html on both Databricks Runtime 7.6 and my local cluster (I tested it with spark-3.0.0, spark-3.0.1, and spark-3.0.2).

I could run the notebook in the Databricks environment, but I couldn't run it on my local cluster.

The code below fails on my cluster (see the error stack below).

results = glow.gwas.logistic_regression(
   variant_df.where(fx.col('contigName') == '21'),
   phenotype_df
)

Error stack:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/usr/local/spark-3.0.2/python/pyspark/sql/types.py in _infer_type(obj)
   1040         try:
-> 1041             return _infer_schema(obj)
   1042         except TypeError:

/usr/local/spark-3.0.2/python/pyspark/sql/types.py in _infer_schema(row, names)
   1066     else:
-> 1067         raise TypeError("Can not infer schema for type: %s" % type(row))
   1068 

TypeError: Can not infer schema for type: <class 'numpy.ndarray'>

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-24-e6f2f92fc33d> in <module>
      1 results = glow.gwas.logistic_regression(
      2    variant_df.where(fx.col('contigName') == '21'),
----> 3    phenotype_df
      4 )

/usr/local/miniconda3/envs/spark-3.0.2/lib/python3.7/site-packages/typeguard/__init__.py in wrapper(*args, **kwargs)
    838         memo = _CallMemo(python_func, _localns, args=args, kwargs=kwargs)
    839         check_argument_types(memo)
--> 840         retval = func(*args, **kwargs)
    841         check_return_type(retval, memo)
    842 

/usr/local/miniconda3/envs/spark-3.0.2/lib/python3.7/site-packages/glow/gwas/log_reg.py in logistic_regression(genotype_df, phenotype_df, covariate_df, offset_df, correction, pvalue_threshold, contigs, add_intercept, values_column, dt)
    125 
    126     state = _create_log_reg_state(spark, phenotype_df, offset_df, sql_type, C, correction,
--> 127                                   add_intercept, contigs)
    128 
    129     phenotype_names = phenotype_df.columns.to_series().astype('str')

/usr/local/miniconda3/envs/spark-3.0.2/lib/python3.7/site-packages/glow/gwas/log_reg.py in _create_log_reg_state(spark, phenotype_df, offset_df, sql_type, C, correction, add_intercept, contigs)
    222     if offset_type == gwas_fx._OffsetType.LOCO_OFFSET and contigs is not None:
    223         offset_df = offset_df.loc[pd.IndexSlice[:, contigs], :]
--> 224     pivoted_phenotype_df = reshape_for_gwas(spark, phenotype_df)
    225     result_fields = [
    226         StructField('label', StringType()),

/usr/local/miniconda3/envs/spark-3.0.2/lib/python3.7/site-packages/glow/wgr/wgr_functions.py in reshape_for_gwas(spark, label_df)
    209         values = list(transposed_df.to_numpy())
    210     transposed_df['values_array'] = values
--> 211     return spark.createDataFrame(transposed_df[['values_array']].reset_index(), column_names)
    212 
    213 

/usr/local/spark-3.0.2/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    602             # Create a DataFrame from pandas DataFrame.
    603             return super(SparkSession, self).createDataFrame(
--> 604                 data, schema, samplingRatio, verifySchema)
    605         return self._create_dataframe(data, schema, samplingRatio, verifySchema)
    606 

/usr/local/spark-3.0.2/python/pyspark/sql/pandas/conversion.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    298                     raise
    299         data = self._convert_from_pandas(data, schema, timezone)
--> 300         return self._create_dataframe(data, schema, samplingRatio, verifySchema)
    301 
    302     def _convert_from_pandas(self, pdf, schema, timezone):

/usr/local/spark-3.0.2/python/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
    628             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    629         else:
--> 630             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    631         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    632         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/usr/local/spark-3.0.2/python/pyspark/sql/session.py in _createFromLocal(self, data, schema)
    449 
    450         if schema is None or isinstance(schema, (list, tuple)):
--> 451             struct = self._inferSchemaFromList(data, names=schema)
    452             converter = _create_converter(struct)
    453             data = map(converter, data)

/usr/local/spark-3.0.2/python/pyspark/sql/session.py in _inferSchemaFromList(self, data, names)
    381             warnings.warn("inferring schema from dict is deprecated,"
    382                           "please use pyspark.sql.Row instead")
--> 383         schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
    384         if _has_nulltype(schema):
    385             raise ValueError("Some of types cannot be determined after inferring")

/usr/local/spark-3.0.2/python/pyspark/sql/session.py in <genexpr>(.0)
    381             warnings.warn("inferring schema from dict is deprecated,"
    382                           "please use pyspark.sql.Row instead")
--> 383         schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
    384         if _has_nulltype(schema):
    385             raise ValueError("Some of types cannot be determined after inferring")

/usr/local/spark-3.0.2/python/pyspark/sql/types.py in _infer_schema(row, names)
   1067         raise TypeError("Can not infer schema for type: %s" % type(row))
   1068 
-> 1069     fields = [StructField(k, _infer_type(v), True) for k, v in items]
   1070     return StructType(fields)
   1071 

/usr/local/spark-3.0.2/python/pyspark/sql/types.py in <listcomp>(.0)
   1067         raise TypeError("Can not infer schema for type: %s" % type(row))
   1068 
-> 1069     fields = [StructField(k, _infer_type(v), True) for k, v in items]
   1070     return StructType(fields)
   1071 

/usr/local/spark-3.0.2/python/pyspark/sql/types.py in _infer_type(obj)
   1041             return _infer_schema(obj)
   1042         except TypeError:
-> 1043             raise TypeError("not supported type: %s" % type(obj))
   1044 
   1045 

TypeError: not supported type: <class 'numpy.ndarray'>

Is this a bug?

Thanks.

TomohiroUchida commented 3 years ago


Hi!

I can now run logistic regression by passing the following configuration when starting pyspark:

--conf spark.sql.execution.arrow.pyspark.enabled=true
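For completeness, a minimal sketch of setting the same option from inside a session instead of on the command line (the builder here is just illustrative):

from pyspark.sql import SparkSession

# Enable Arrow-based pandas <-> Spark conversion; same effect as the --conf flag above
spark = (SparkSession.builder
         .config("spark.sql.execution.arrow.pyspark.enabled", "true")
         .getOrCreate())

# or, on an already running session:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")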

Thank you!