databrickslabs / tempo

API for manipulating time series on top of Apache Spark: lagged time values, rolling statistics (mean, avg, sum, count, etc), AS OF joins, downsampling, and interpolation
https://pypi.org/project/dbl-tempo
Other
309 stars 53 forks source link

data type mismatch exception from withRangeStats function #286

Open padamshrestha opened 1 year ago

padamshrestha commented 1 year ago

I'm getting data type mismatch from withRangeStats while using the sample dataset.

python version: 3.10.6 Spark/PySpark version: 3.3.1 delta lake version: 2.2.0 tempo version: commit from 10/24/2022 0350dffb5d6d2a6d7d138036738e07ed3411a2a2

Usage: moving_avg = trades_tsdf.withRangeStats("trade_pr", rangeBackWindowSecs=600).df

Error: { "name": "AnalysisException", "message": "cannot resolve '(PARTITION BY spark_catalog.tempo.trades.date, spark_catalog.tempo.trades.symbol ORDER BY spark_catalog.tempo.trades.event_ts ASC NULLS FIRST RANGE BETWEEN -600L FOLLOWING AND CURRENT ROW)' due to data type mismatch: The data type 'timestamp' used in the order specification does not match the data type 'bigint' which is used in the range frame.;\n'Project [symbol#70, event_ts#71, trade_dt#72, trade_pr#73, trade_qt#74, date#75, avg(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS mean_trade_pr#4778, count(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS count_trade_pr#4780, min(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS min_trade_pr#4782, max(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS max_trade_pr#4784, sum(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS sum_trade_pr#4786, stddev_samp(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS stddev_trade_pr#4796]\n+- SubqueryAlias spark_catalog.tempo.trades\n +- Relation tempo.trades[symbol#70,event_ts#71,trade_dt#72,trade_pr#73,trade_qt#74,date#75] parquet\n", "stack": "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m\n\u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last)\nCell \u001b[0;32mIn[25], line 2\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[39m# DBTITLE 1,Simple Moving Average with Tempo\u001b[39;00m\n\u001b[0;32m----> 2\u001b[0m moving_avg \u001b[39m=\u001b[39m trades_tsdf\u001b[39m.\u001b[39;49mwithRangeStats(colsToSummarize\u001b[39m=\u001b[39;49m[\u001b[39m\"\u001b[39;49m\u001b[39mtrade_pr\u001b[39;49m\u001b[39m\"\u001b[39;49m],rangeBackWindowSecs\u001b[39m=\u001b[39;49m\u001b[39m600\u001b[39;49m)\u001b[39m.\u001b[39mdf\n\u001b[1;32m 3\u001b[0m \u001b[39m# output = moving_avg.select('symbol', 'event_ts', 'trade_pr', 'mean_trade_pr', 'stddev_trade_pr', 'sum_trade_pr', 'min_tradepr')\u001b[39;00m\n\nFile \u001b[0;32m/opt/workspace/notebooks/src/dbl-tempo/python/tempo/tsdf.py:1070\u001b[0m, in \u001b[0;36mTSDF.withRangeStats\u001b[0;34m(self, type, colsToSummarize, rangeBackWindowSecs)\u001b[0m\n\u001b[1;32m 1063\u001b[0m selectedCols\u001b[39m.\u001b[39mappend(f\u001b[39m.\u001b[39mstddev(metric)\u001b[39m.\u001b[39mover(w)\u001b[39m.\u001b[39malias(\u001b[39m\"\u001b[39m\u001b[39mstddev\u001b[39m\u001b[39m\"\u001b[39m \u001b[39m+\u001b[39m metric))\n\u001b[1;32m 1064\u001b[0m derivedCols\u001b[39m.\u001b[39mappend(\n\u001b[1;32m 1065\u001b[0m (\n\u001b[1;32m 1066\u001b[0m (f\u001b[39m.\u001b[39mcol(metric) \u001b[39m-\u001b[39m f\u001b[39m.\u001b[39mcol(\u001b[39m\"\u001b[39m\u001b[39mmean\u001b[39m\u001b[39m\"\u001b[39m \u001b[39m+\u001b[39m metric))\n\u001b[1;32m 1067\u001b[0m \u001b[39m/\u001b[39m f\u001b[39m.\u001b[39mcol(\u001b[39m\"\u001b[39m\u001b[39mstddev\u001b[39m\u001b[39m\"\u001b[39m \u001b[39m+\u001b[39m metric)\n\u001b[1;32m 1068\u001b[0m )\u001b[39m.\u001b[39malias(\u001b[39m\"\u001b[39m\u001b[39mzscore_\u001b[39m\u001b[39m\"\u001b[39m \u001b[39m+\u001b[39m metric)\n\u001b[1;32m 1069\u001b[0m )\n\u001b[0;32m-> 1070\u001b[0m selected_df \u001b[39m=\u001b[39m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mdf\u001b[39m.\u001b[39;49mselect(\u001b[39m\u001b[39;49mselectedCols)\n\u001b[1;32m 1071\u001b[0m summary_df \u001b[39m=\u001b[39m selected_df\u001b[39m.\u001b[39mselect(\u001b[39m\u001b[39mselected_df\u001b[39m.\u001b[39mcolumns, \u001b[39m\u001b[39mderivedCols)\u001b[39m.\u001b[39mdrop(\n\u001b[1;32m 1072\u001b[0m \u001b[39m\"\u001b[39m\u001b[39mdouble_ts\u001b[39m\u001b[39m\"\u001b[39m\n\u001b[1;32m 1073\u001b[0m )\n\u001b[1;32m 1075\u001b[0m \u001b[39mreturn\u001b[39;00m TSDF(summary_df, \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mts_col, \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mpartitionCols)\n\nFile \u001b[0;32m/usr/local/lib/python3.10/dist-packages/pyspark/sql/dataframe.py:2023\u001b[0m, in \u001b[0;36mDataFrame.select\u001b[0;34m(self, cols)\u001b[0m\n\u001b[1;32m 2002\u001b[0m \u001b[39mdef\u001b[39;00m \u001b[39mselect\u001b[39m(\u001b[39mself\u001b[39m, \u001b[39m\u001b[39mcols: \u001b[39m\"\u001b[39m\u001b[39mColumnOrName\u001b[39m\u001b[39m\"\u001b[39m) \u001b[39m-\u001b[39m\u001b[39m>\u001b[39m \u001b[39m\"\u001b[39m\u001b[39mDataFrame\u001b[39m\u001b[39m\"\u001b[39m: \u001b[39m# type: ignore[misc]\u001b[39;00m\n\u001b[1;32m 2003\u001b[0m \u001b[39m\"\"\"Projects a set of expressions and returns a new :class:DataFrame.\u001b[39;00m\n\u001b[1;32m 2004\u001b[0m \n\u001b[1;32m 2005\u001b[0m \u001b[39m .. versionadded:: 1.3.0\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 2021\u001b[0m \u001b[39m [Row(name='Alice', age=12), Row(name='Bob', age=15)]\u001b[39;00m\n\u001b[1;32m 2022\u001b[0m \u001b[39m \"\"\"\u001b[39;00m\n\u001b[0;32m-> 2023\u001b[0m jdf \u001b[39m=\u001b[39m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49m_jdf\u001b[39m.\u001b[39;49mselect(\u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49m_jcols(\u001b[39m\u001b[39;49mcols))\n\u001b[1;32m 2024\u001b[0m \u001b[39mreturn\u001b[39;00m DataFrame(jdf, \u001b[39mself\u001b[39m\u001b[39m.\u001b[39msparkSession)\n\nFile \u001b[0;32m/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py:1321\u001b[0m, in \u001b[0;36mJavaMember.call\u001b[0;34m(self, args)\u001b[0m\n\u001b[1;32m 1315\u001b[0m command \u001b[39m=\u001b[39m proto\u001b[39m.\u001b[39mCALL_COMMAND_NAME \u001b[39m+\u001b[39m\\n\u001b[1;32m 1316\u001b[0m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mcommand_header \u001b[39m+\u001b[39m\\n\u001b[1;32m 1317\u001b[0m args_command \u001b[39m+\u001b[39m\\n\u001b[1;32m 1318\u001b[0m proto\u001b[39m.\u001b[39mEND_COMMAND_PART\n\u001b[1;32m 1320\u001b[0m answer \u001b[39m=\u001b[39m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mgateway_client\u001b[39m.\u001b[39msend_command(command)\n\u001b[0;32m-> 1321\u001b[0m return_value \u001b[39m=\u001b[39m get_return_value(\n\u001b[1;32m 1322\u001b[0m answer, \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mgateway_client, \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mtarget_id, \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mname)\n\u001b[1;32m 1324\u001b[0m \u001b[39mfor\u001b[39;00m temp_arg \u001b[39min\u001b[39;00m temp_args:\n\u001b[1;32m 1325\u001b[0m temp_arg\u001b[39m.\u001b[39m_detach()\n\nFile \u001b[0;32m/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py:196\u001b[0m, in \u001b[0;36mcapture_sql_exception..deco\u001b[0;34m(a, **kw)\u001b[0m\n\u001b[1;32m 192\u001b[0m converted \u001b[39m=\u001b[39m convert_exception(e\u001b[39m.\u001b[39mjava_exception)\n\u001b[1;32m 193\u001b[0m \u001b[39mif\u001b[39;00m \u001b[39mnot\u001b[39;00m \u001b[39misinstance\u001b[39m(converted, UnknownException):\n\u001b[1;32m 194\u001b[0m \u001b[39m# Hide where the exception came from that shows a non-Pythonic\u001b[39;00m\n\u001b[1;32m 195\u001b[0m \u001b[39m# JVM exception message.\u001b[39;00m\n\u001b[0;32m--> 196\u001b[0m \u001b[39mraise\u001b[39;00m converted \u001b[39mfrom\u001b[39;00m \u001b[39mNone\u001b[39m\n\u001b[1;32m 197\u001b[0m \u001b[39melse\u001b[39;00m:\n\u001b[1;32m 198\u001b[0m \u001b[39mraise\u001b[39;00m\n\n\u001b[0;31mAnalysisException\u001b[0m: cannot resolve '(PARTITION BY spark_catalog.tempo.trades.date, spark_catalog.tempo.trades.symbol ORDER BY spark_catalog.tempo.trades.event_ts ASC NULLS FIRST RANGE BETWEEN -600L FOLLOWING AND CURRENT ROW)' due to data type mismatch: The data type 'timestamp' used in the order specification does not match the data type 'bigint' which is used in the range frame.;\n'Project [symbol#70, event_ts#71, trade_dt#72, trade_pr#73, trade_qt#74, date#75, avg(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS mean_trade_pr#4778, count(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS count_trade_pr#4780, min(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS min_trade_pr#4782, max(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS max_trade_pr#4784, sum(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS sum_trade_pr#4786, stddev_samp(trade_pr#73) windowspecdefinition(date#75, symbol#70, event_ts#71 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -600, currentrow$())) AS stddev_trade_pr#4796]\n+- SubqueryAlias spark_catalog.tempo.trades\n +- Relation tempo.trades[symbol#70,event_ts#71,trade_dt#72,trade_pr#73,trade_qt#74,date#75] parquet\n" }

rportilla-databricks commented 1 year ago

Hi @padamshrestha, thanks for the question. I've tested this on Spark 3.3.0 with the latest version of tempo and the commands below work fine (note that the trades_df and quotes_df are created by enforcing the timestamp data type for event_ts)

Can you paste the exact code you run before the withRangeStats method? Also, which Spark platform do you run this on?

padamshrestha commented 1 year ago

Hi @rportilla-databricks, thanks for the quick response. Here is the code, it's actually from example from the link you provided:

from tempo import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

trade_schema = StructType([
    StructField("symbol", StringType()),
    StructField("event_ts", TimestampType()),
    StructField("trade_dt", StringType()),
    StructField("trade_pr", DoubleType()),
    StructField("trade_qt", IntegerType()),
    StructField("date", TimestampType())
])

spark.read.format("csv").schema(trade_schema).option("header", "true").option("delimiter", ",").load("ASOF_Trades.csv").withColumn("trade_qt", lit(100)).withColumn("date", col("event_ts").cast("date")).write.mode('overwrite').option("overwriteSchema", "true").saveAsTable('tempo.trades')
trades_df = spark.table("tempo.trades")

trades_tsdf = TSDF(trades_df, partition_cols = ['date', 'symbol'], ts_col = 'event_ts')

moving_avg = trades_tsdf.withRangeStats("trade_pr", rangeBackWindowSecs=600).df

I have spark-3.3.1-bin-hadoop3.tgz on ubuntu container running them as local cluster with driver as client mode. And the host is macOS Ventura 13.0.1 (22A400)