evidentlyai / evidently

Evidently is an open-source ML and LLM observability framework. Evaluate, test, and monitor any AI-powered system or data pipeline. From tabular data to Gen AI. 100+ metrics.
https://www.evidentlyai.com/evidently-oss
Apache License 2.0
5.4k stars · 597 forks

Report rendering fails while using `SparkEngine` if column_mapping.datetime is not `None` #861

Closed · NazyS closed this issue 11 months ago

NazyS commented 12 months ago

Environment

evidently==0.4.9

How to reproduce

Run the Spark example notebook with the following modifications:

def spark_data_generation(n_rows=3000, n_columns=5):
    return spark.range(n_rows).select(
        sf.col("id").cast("timestamp").alias("timestamp"),            # add timestamp col
        *[sf.rand().alias(str(i)) for i in range(n_columns)]
    )

...

column_mapping = ColumnMapping()
column_mapping.datetime = 'timestamp'                               # add datetime index col by mapping

spark_drift_report = Report(metrics=[
    DatasetDriftMetric(),
    DataDriftTable()
])

spark_drift_report.run(
    reference_data=reference,
    current_data=current,
    column_mapping=column_mapping,
    engine=SparkEngine,
)
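
For reference, running this outside the example notebook needs roughly the following setup (a sketch; the `SparkEngine` import path is my assumption based on the `evidently.spark` package layout):

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

from evidently import ColumnMapping
from evidently.metrics import DataDriftTable, DatasetDriftMetric
from evidently.report import Report
from evidently.spark.engine import SparkEngine   # assumed import path

spark = SparkSession.builder.getOrCreate()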

The report executes successfully, but it cannot be rendered:

spark_drift_report.show(mode='inline')

raises the following error:

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-731577770872362>, line 1
----> 1 spark_drift_report.show(mode='inline')

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-905dc614-3cfb-4e6f-838e-f0c689eaba24/lib/python3.10/site-packages/evidently/suite/base_suite.py:160, in Display.show(self, mode)
    159 def show(self, mode="auto"):
--> 160     dashboard_id, dashboard_info, graphs = self._build_dashboard_info()
    161     template_params = TemplateParams(
    162         dashboard_id=dashboard_id,
    163         dashboard_info=dashboard_info,
    164         additional_graphs=graphs,
    165     )
    166     # pylint: disable=import-outside-toplevel

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-905dc614-3cfb-4e6f-838e-f0c689eaba24/lib/python3.10/site-packages/evidently/report/report.py:204, in Report._build_dashboard_info(self)
    202 # set the color scheme from the report for each render
    203 renderer.color_options = color_options
--> 204 html_info = renderer.render_html(test)
    206 for info_item in html_info:
    207     for additional_graph in info_item.get_additional_graphs():

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-905dc614-3cfb-4e6f-838e-f0c689eaba24/lib/python3.10/site-packages/evidently/metrics/data_drift/dataset_drift_metric.py:111, in DataDriftMetricsRenderer.render_html(self, obj)
    110 def render_html(self, obj: DatasetDriftMetric) -> List[BaseWidgetInfo]:
--> 111     result = obj.get_result()
    113     if result.dataset_drift:
    114         drift_detected = "detected"

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-905dc614-3cfb-4e6f-838e-f0c689eaba24/lib/python3.10/site-packages/evidently/base_metric.py:223, in Metric.get_result(self)
    221 result = self._context.metric_results.get(self, None)
    222 if isinstance(result, ErrorResult):
--> 223     raise result.exception
    224 if result is None:
    225     raise ValueError(f"No result found for metric {self} of type {type(self).__name__}")

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-905dc614-3cfb-4e6f-838e-f0c689eaba24/lib/python3.10/site-packages/evidently/calculation_engine/engine.py:42, in Engine.execute_metrics(self, context, data)
     40 logging.debug(f"Executing {type(calculation)}...")
     41 try:
---> 42     calculations[metric] = calculation.calculate(context, converted_data)
     43 except BaseException as ex:
     44     calculations[metric] = ErrorResult(exception=ex)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-905dc614-3cfb-4e6f-838e-f0c689eaba24/lib/python3.10/site-packages/evidently/spark/metrics/data_drift.py:99, in SparkDatasetDriftMetric.calculate(self, context, data)
     96 if data.reference_data is None:
     97     raise ValueError("Reference dataset should be present")
---> 99 result = get_drift_for_columns(
    100     current_data=data.current_data,
    101     reference_data=data.reference_data,
    102     data_drift_options=self.metric.drift_options,
    103     data_definition=data.data_definition,
    104     column_mapping=data.column_mapping,
    105     columns=self.metric.columns,
    106 )
    107 return DatasetDriftMetricResults(
    108     drift_share=self.metric.drift_share,
    109     number_of_columns=result.number_of_columns,
   (...)
    112     dataset_drift=result.dataset_drift,
    113 )

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-905dc614-3cfb-4e6f-838e-f0c689eaba24/lib/python3.10/site-packages/evidently/spark/calculations/data_drift.py:224, in get_drift_for_columns(current_data, reference_data, data_definition, column_mapping, data_drift_options, drift_share_threshold, columns)
    221 drift_by_columns: Dict[str, ColumnDataDriftMetrics] = {}
    223 for column_name in columns:
--> 224     drift_by_columns[column_name] = get_one_column_drift(
    225         current_feature_data=current_data,
    226         reference_feature_data=reference_data,
    227         datetime_column=datetime_column_name,
    228         column=ColumnName.from_any(column_name),
    229         options=data_drift_options,
    230         data_definition=data_definition,
    231         column_type=data_definition.get_column(column_name).column_type,
    232     )
    234 dataset_drift = get_dataset_drift(drift_by_columns, drift_share_threshold)
    235 return DatasetDriftMetrics(
    236     number_of_columns=len(columns),
    237     number_of_drifted_columns=dataset_drift.number_of_drifted_columns,
   (...)
    251     ),
    252 )

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-905dc614-3cfb-4e6f-838e-f0c689eaba24/lib/python3.10/site-packages/evidently/spark/calculations/data_drift.py:117, in get_one_column_drift(current_feature_data, reference_feature_data, datetime_column, column, options, data_definition, column_type)
    114 reference_small_distribution = get_histogram(reference_column, column.name, current_nbinsx, density=True)
    115 current_scatter = {}
--> 117 df, prefix = prepare_df_for_time_index_plot(
    118     current_column,
    119     column.name,
    120     datetime_column,
    121 )
    122 current_scatter["current (mean)"] = df
    123 if prefix is None:

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-905dc614-3cfb-4e6f-838e-f0c689eaba24/lib/python3.10/site-packages/evidently/spark/visualizations.py:40, in prepare_df_for_time_index_plot(df, column_name, datetime_name)
     35 else:
     36     period_col = sf.date_format(date_col, pattern).alias(PERIOD_COL)
     37 plot_df = (
     38     df.select(column_name, period_col)
     39     .groupby(period_col)
---> 40     .agg(sf.mean(column_name).alias("mean"), sf.stddev_pop(column_name).alias("std"))
     41 )
     42 if pattern == "week":
     43     split = sf.split(PERIOD_COL, "-")

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/group.py:188, in GroupedData.agg(self, *exprs)
    186     assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
    187     exprs = cast(Tuple[Column, ...], exprs)
--> 188     jdf = self._jgd.agg(exprs[0]._jc, _to_seq(self.session._sc, [c._jc for c in exprs[1:]]))
    189 return DataFrame(jdf, self.session)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:194, in capture_sql_exception.<locals>.deco(*a, **kw)
    190 converted = convert_exception(e.java_exception)
    191 if not isinstance(converted, UnknownException):
    192     # Hide where the exception came from that shows a non-Pythonic
    193     # JVM exception message.
--> 194     raise converted from None
    195 else:
    196     raise

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `timestamp` cannot be resolved. Did you mean one of the following? [`per`, `3`].;
'Aggregate [date_format('timestamp, y, Some(Etc/UTC))], [date_format('timestamp, y, Some(Etc/UTC)) AS per#61457, avg(3#61161) AS mean#61463, stddev_pop(3#61161) AS std#61473]
+- Project [3#61161, date_format(timestamp#47659, y, Some(Etc/UTC)) AS per#61457]
   +- Filter atleastnnonnulls(1, 3#61161)
      +- Project [timestamp#47659, CASE WHEN (0#47660 = -Infinity) THEN cast(NaN as double) WHEN (0#47660 = Infinity) THEN cast(NaN as double) ELSE 0#47660 END AS 0#61158, CASE WHEN (1#47661 = -Infinity) THEN cast(NaN as double) WHEN (1#47661 = Infinity) THEN cast(NaN as double) ELSE 1#47661 END AS 1#61159, CASE WHEN (2#47662 = -Infinity) THEN cast(NaN as double) WHEN (2#47662 = Infinity) THEN cast(NaN as double) ELSE 2#47662 END AS 2#61160, CASE WHEN (3#47663 = -Infinity) THEN cast(NaN as double) WHEN (3#47663 = Infinity) THEN cast(NaN as double) ELSE 3#47663 END AS 3#61161, CASE WHEN (4#47664 = -Infinity) THEN cast(NaN as double) WHEN (4#47664 = Infinity) THEN cast(NaN as double) ELSE 4#47664 END AS 4#61162]
         +- Project [cast(id#47657L as timestamp) AS timestamp#47659, rand(6431941843633845424) AS 0#47660, rand(3495374954888763826) AS 1#47661, rand(4423528527585211611) AS 2#47662, rand(8462703941357656504) AS 3#47663, rand(-7913456532449328831) AS 4#47664]
            +- Range (0, 3000, step=1, splits=Some(8))

Proposed Fix

It looks like a simple fix is possible. Remove the unnecessary intermediate `select`, which drops the datetime column that the `period_col` expression still references in the subsequent `groupby`: https://github.com/evidentlyai/evidently/blob/2fb9dbe2cdfc619498d1804b7e52e9c95dd8d0e2/src/evidently/spark/visualizations.py#L37-L41

plot_df = (
    df.groupby(period_col)
    .agg(sf.mean(column_name).alias("mean"), sf.stddev_pop(column_name).alias("std"))
)

and also add a conversion to a pandas DataFrame: https://github.com/evidentlyai/evidently/blob/2fb9dbe2cdfc619498d1804b7e52e9c95dd8d0e2/src/evidently/spark/visualizations.py#L42-L47

if pattern == "week":
    ...
else:
    plot_df = plot_df.toPandas()
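
For intuition, here is a minimal sketch of the failure mode the first change addresses (assuming a live SparkSession named `spark`; column names are illustrative). After the intermediate select, the frame no longer contains the datetime column, so the `groupby` cannot re-resolve the `date_format` expression, which is exactly the `UNRESOLVED_COLUMN` error above:

from pyspark.sql import functions as sf

df = spark.range(10).select(
    sf.col("id").cast("timestamp").alias("ts"),
    sf.rand().alias("value"),
)
period_col = sf.date_format(sf.col("ts"), "y").alias("per")

# Fails: the select drops `ts`, so `period_col` cannot be resolved afterwards.
# df.select("value", period_col).groupby(period_col).agg(sf.mean("value"))

# Works: aggregate on the original frame, as in the proposed fix.
df.groupby(period_col).agg(sf.mean("value").alias("mean")).show()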

PS

It also looks like this line https://github.com/evidentlyai/evidently/blob/2fb9dbe2cdfc619498d1804b7e52e9c95dd8d0e2/src/evidently/spark/visualizations.py#L47 introduces duplicate columns named `PERIOD_COL`, which might be undesirable (see the sketch below).
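
A generic illustration of that pitfall (not the actual evidently code at that line; assumes a live `spark` session):

from pyspark.sql import functions as sf

# Re-adding an expression aliased to an existing column name yields two
# columns called "per", so later references to "per" become ambiguous.
d = spark.createDataFrame([("2020-01",)], ["per"])
d2 = d.select("*", sf.split("per", "-").getItem(0).alias("per"))
print(d2.columns)  # ['per', 'per']
# d2.select("per")  # raises AnalysisException: reference `per` is ambiguous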

mike0sv commented 11 months ago

Hey @NazyS! Thanks for the catch! It should be fixed in https://github.com/evidentlyai/evidently/pull/865 - can you confirm that it works for you?