snowflakedb / spark-snowflake

Snowflake Data Source for Apache Spark.
http://www.snowflake.net
Apache License 2.0
217 stars 99 forks

Issue with Databricks Spark Streaming: Private key must be specified in Snowflake streaming #157

Open satendrakumar opened 5 years ago

satendrakumar commented 5 years ago

We are using Databricks Spark to load data into Snowflake. It works perfectly with batch jobs but fails with streaming. Here is the code:

val options = Map(
  "sfUrl" -> "********.snowflakecomputing.com",
  "sfUser" -> "*****",
  "sfPassword" -> "****",
  "sfDatabase" -> "TEST_DB",
  "sfSchema" -> "TEST_DOCUMENT",
  "sfWarehouse" -> "COMPUTE_WH"
)

val rawStream = spark.readStream.schema(schema).json(path)
rawStream.writeStream
  .format("snowflake")
  .options(options)
  .option("dbtable", "L_FEATURE_TEST")
  .option("checkpointLocation", checkpointRaw)
  .trigger(Trigger.Once())
  .start()

Error:

java.lang.IllegalArgumentException: requirement failed: Private key must be specified in Snowflake streaming
    at scala.Predef$.require(Predef.scala:224)
    at net.snowflake.spark.snowflake.SnowflakeSink.<init>(SnowflakeSink.scala:41)
    at net.snowflake.spark.snowflake.DefaultSource.createSink(DefaultSource.scala:137)
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:305)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:330)
    ... 16 elided (Databricks notebook REPL wrapper frames)

I'm not sure whether this is a bug. Is it possible to load streaming data using only a username and password?

andregoode commented 5 years ago

I am also having issues when using Spark Structured Streaming. I noticed the error @satendrakumar reported above, so I modified my code to supply a private key via the privateKey option. It returned the following error:

ERROR IllegalArgumentException: "A snowflake passsword or private key path must be provided with 'sfpassword or pem_private_key' parameter, e.g. 'password'"

When I also include the pem_private_key option, I get the following exception despite following the code examples in the Snowflake docs:

IllegalArgumentException: 'Input PEM private key is invalid'
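One likely cause of the "Input PEM private key is invalid" error (and what the workaround later in this thread does) is passing the full PEM file, header and footer lines included, when the connector expects only the base64 body. A minimal sketch of producing that form, assuming the `cryptography` package; the key here is a throwaway generated purely for illustration:

```python
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

# Throwaway key for illustration only; in practice, load your own key
# with serialization.load_pem_private_key().
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

pem = key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8")

# Keep only the base64 body: drop the BEGIN/END lines and all newlines.
pem_body = "".join(
    line for line in pem.splitlines() if "PRIVATE KEY" not in line
)
```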

rkesh-singh commented 5 years ago

The streaming mode does not currently support streaming data directly from Databricks or Qubole. However, the connector still works in non-streaming mode with both Qubole and Databricks.

andregoode commented 4 years ago

The streaming mode does not currently support streaming data directly from Databricks or Qubole. However, the connector still works in non-streaming mode with both Qubole and Databricks.

To be clear @rkesh-singh, you are currently using the Spark-Snowflake connector for batch writes? I am as well... but looking to use the structured streaming SnowflakeSink published here for streaming jobs. No documentation exists 😩

rkesh-singh commented 4 years ago

@andregoode Streaming support is still in preview. You can contact Snowflake for enabling in your account.

dwai1714 commented 4 years ago

@rkesh-singh Is there any update on this? Or is it still in preview mode?

Mr-Fantasy commented 2 years ago

I used the method below, and it worked.

I tried it with PySpark; it should also work with minor modifications in Spark Scala.

Pre-requisites: The public key must be added to the user in Snowflake.

Additional Libraries imported:

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

Used the following code to obtain the decrypted private key without header and trailer:

# Read the encrypted private key from disk.
with open(private_key_path, "r") as private_key_obj:
    private_key = private_key_obj.read()

key = bytes(private_key, 'utf-8')

# Decrypt the key with its passphrase.
p_key = serialization.load_pem_private_key(
    key, password=passphrase.encode(), backend=default_backend()
)

# Re-serialize as unencrypted PKCS8 and strip the PEM header and trailer.
pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
).replace(b"-----BEGIN PRIVATE KEY-----\n", b"") \
 .replace(b"\n-----END PRIVATE KEY-----", b"") \
 .decode("utf-8")

In the options map, added 'pem_private_key' with the value of pkb.
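For clarity, a sketch of what the resulting options map might look like; all values below are placeholders, and pkb stands for the header-free private key string built above:

```python
# Placeholder values; pkb is the stripped private key string from the
# previous snippet.
pkb = "<header-free private key string>"

options = {
    "sfUrl": "********.snowflakecomputing.com",
    "sfUser": "*****",
    "sfDatabase": "TEST_DB",
    "sfSchema": "TEST_DOCUMENT",
    "sfWarehouse": "COMPUTE_WH",
    "pem_private_key": pkb,
}
```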

Added some additional parameters to writeStream():

rawstream.writeStream\
    .outputMode("append")\
    .option("checkpointLocation", <checkpoint location>)\
    .option("dbtable",<target table name>)\
    .options(**options)\
    .option("streaming_stage", <temp stage name>)\
    .format("snowflake")\
    .start().awaitTermination()
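For the pre-requisite above, registering the public key means running `ALTER USER <user> SET RSA_PUBLIC_KEY='<body>'` in Snowflake, where the body is the base64 public key without PEM markers. A sketch of deriving it from the private key, again assuming the `cryptography` package; the key is a throwaway generated for illustration:

```python
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

# Throwaway key for illustration; load your real private key in practice.
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

public_pem = private_key.public_key().public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("utf-8")

# ALTER USER ... SET RSA_PUBLIC_KEY expects only the base64 body.
public_key_body = "".join(
    line for line in public_pem.splitlines() if "PUBLIC KEY" not in line
)
```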

liu1324871885 commented 1 year ago

Is streaming read still unsupported?