microsoft / sql-spark-connector

Apache Spark Connector for SQL Server and Azure SQL
Apache License 2.0
274 stars · 116 forks

DATETIME support? #167

Closed andrew-mashkovsky closed 2 years ago

andrew-mashkovsky commented 2 years ago

Is it planned to add support for DATETIME columns when saving a DataFrame? Saving a DataFrame with a timestamp column to a table with a datetime column fails with: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 6.0 failed 4 times, most recent failure: Lost task 1.3 in stage 6.0 (TID 42) (10.0.13.70 executor 1): com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.

luxu1-ms commented 2 years ago

The SQL Server datetime data type only allows 3 digits of fractional seconds, while a Spark dataframe timestamp may carry more. Please try using datetime2 in SQL instead of datetime. We do have a fix submitted upstream that has not yet been merged into Spark: https://github.com/apache/spark/pull/32655
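To make the precision mismatch concrete, here is a plain-Python sketch (the timestamp value is illustrative): SQL Server's datetime keeps at most 3 fractional-second digits, while Python and Spark timestamps carry microseconds (6 digits), so either the extra digits must be truncated or the target column widened to datetime2:

```python
from datetime import datetime

# A timestamp with full microsecond precision (6 fractional digits),
# as produced by datetime.now() or a Spark TimestampType value.
ts = datetime(2023, 5, 1, 12, 30, 45, 123456)

# SQL Server's datetime stores at most 3 fractional digits, so the
# trailing microseconds must go; truncating to whole milliseconds:
ms_only = ts.replace(microsecond=(ts.microsecond // 1000) * 1000)
print(ms_only.isoformat())  # 2023-05-01T12:30:45.123000
```

A datetime2(7) column, by contrast, stores up to 7 fractional digits and accepts the original value unchanged.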

luxu1-ms commented 2 years ago

Closing inactive issues.

mo-zed commented 1 year ago

Hi @luxu1-ms, I am experiencing a similar issue, and even when using the datetime2 type I get the same error.

luxu1-ms commented 1 year ago

@mo-zed Please try creating the SQL table with a datetime2 column and appending to that table.

mo-zed commented 1 year ago

@luxu1-ms I had already done that, per my previous comment, and it didn't work. I dropped all the milliseconds from the timestamp column in Databricks and was then able to push to a datetime column in Azure SQL. Your suggestion of using datetime2 does not work, I'm afraid.
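For reference, the workaround of dropping sub-second precision can be applied on the Python side before the value ever reaches lit() (a minimal stdlib sketch; in Spark SQL the same effect could be achieved with date_trunc on the column):

```python
from datetime import datetime

# Zero out the microseconds so the value fits a plain SQL Server
# datetime column without any fractional-second conflict.
ingestion_time = datetime.now().replace(microsecond=0)
print(ingestion_time.microsecond)  # 0
```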

luxu1-ms commented 1 year ago

@mo-zed Could you share the repro?

mo-zed commented 1 year ago

Unfortunately I cannot share the repo

```python
from datetime import datetime
from pyspark.sql.functions import lit
import adal

ingestion_time = datetime.now()

# df is some dataframe
df = df.withColumn("Read_Time", lit(ingestion_time))

driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"  # I have tried with and without driver
url = f"jdbc:sqlserver://{url};database={database};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30"

context = adal.AuthenticationContext(f"https://login.windows.net/{tenant_id}")
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("driver", driver) \
    .option("url", url) \
    .option("enableServicePrincipalAuth", "true") \
    .option("dbtable", table_name) \
    .option("accessToken", access_token) \
    .mode("append") \
    .save()
```

luxu1-ms commented 1 year ago

I tested on DBR 11.3 LTS and it works.

```python
from datetime import datetime
from pyspark.sql.functions import lit

ingestion_time = datetime.now()

df_datetime2 = spark \
    .createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "Col2"]) \
    .withColumn('datetime2', lit(ingestion_time))
df_datetime2.show(truncate=False)

dbtable = "datetime2"

try:
    df_datetime2.write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .mode("append") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .save()
except ValueError as error:
    print("MSSQL Connector write failed", error)

print("MSSQL Connector write(append) succeeded")
```

```sql
CREATE TABLE datetime2 ( Col1 nvarchar(4), Col2 bigint, datetime2 DATETIME2(7) NOT NULL )
```