springml / spark-sftp

Spark connector for SFTP
Apache License 2.0

pem key is throwing error #79

Open ucrkarthik opened 3 years ago

ucrkarthik commented 3 years ago

I am able to use pysftp to upload a file to an SFTP location with a user ID and private key:

    with pysftp.Connection(host=job_args["sftp_host"], username=job_args["sftp_username"],
                           private_key=job_args["sftp_pem"], log=os.path.join(dir_name, 'logs/pysftp.log')) as sftp:
        sftp.put(f"{job_args['gzip_target_path']}part-00000-45e353c1-dce3-48a6-ba20-adc4c341a1b8-c000.csv.gz",
                 f"/TO_MRKTNG_TEST/BRWSNG_BHVR_{datetime.today().strftime('%Y%m%d')}.TXT.gz")  # upload file to public/ on remote
        sftp.close()

But I get an error when I try the same upload with spark-sftp:

    result_df.write.format("com.springml.spark.sftp"). \
        option("host", job_args["sftp_host"]). \
        option("username", job_args["sftp_username"]). \
        option("pem", job_args["sftp_pem"]). \
        option("fileType", "csv"). \
        option("delimiter", ","). \
        save(f"/TO_MRKTNG_TEST/BRWSNG_BHVR_{datetime.today().strftime('%Y%m%d')}.TXT.gz")

Here is the error:

answer = 'xro102', gateway_client = <py4j.java_gateway.GatewayClient object at 0x11793b710>, target_id = 'o101', name = 'save'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.

        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.

        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
                    raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
>                       format(target_id, ".", name), value)
E                   py4j.protocol.Py4JJavaError: An error occurred while calling o101.save.
E                   : com.jcraft.jsch.JSchException: invalid privatekey: [B@35c88b66
E                       at com.jcraft.jsch.KeyPair.load(KeyPair.java:664)
E                       at com.jcraft.jsch.KeyPair.load(KeyPair.java:561)
E                       at com.jcraft.jsch.IdentityFile.newInstance(IdentityFile.java:40)
E                       at com.jcraft.jsch.JSch.addIdentity(JSch.java:407)
E                       at com.jcraft.jsch.JSch.addIdentity(JSch.java:367)
E                       at com.springml.sftp.client.SFTPClient.createSFTPChannel(SFTPClient.java:266)
E                       at com.springml.sftp.client.SFTPClient.copyToFTP(SFTPClient.java:102)
E                       at com.springml.spark.sftp.DefaultSource.upload(DefaultSource.scala:160)
E                       at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:126)
E                       at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
E                       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
E                       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
E                       at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
E                       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
E                       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
E                       at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
E                       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E                       at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
E                       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
E                       at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
E                       at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
E                       at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
E                       at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
E                       at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
E                       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
E                       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
E                       at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
E                       at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
E                       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
E                       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
E                       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E                       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E                       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E                       at java.lang.reflect.Method.invoke(Method.java:498)
E                       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E                       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E                       at py4j.Gateway.invoke(Gateway.java:282)
E                       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E                       at py4j.commands.CallCommand.execute(CallCommand.java:79)
E                       at py4j.GatewayConnection.run(GatewayConnection.java:238)
E                       at java.lang.Thread.run(Thread.java:748)

../../../.virtualenvs/venv/regionsBank-aamFileExport-spark-etl/lib/python3.7/site-packages/py4j/protocol.py:328: Py4JJavaError
=============================================================================================== 1 failed, 1 deselected in 23.42 seconds ===============================================================================================
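One thing that may be worth checking: the failing frame is `com.jcraft.jsch.KeyPair.load`, and JSch historically only parses keys with classic PEM headers (e.g. `-----BEGIN RSA PRIVATE KEY-----`), while pysftp/paramiko also accepts the newer OpenSSH container format. A minimal sketch of that check (`looks_like_classic_pem` is a hypothetical helper, not part of spark-sftp or pysftp):

```python
# Hypothetical diagnostic: JSch (used by spark-sftp, per the stack
# trace) rejects keys in the newer OpenSSH container format with
# "invalid privatekey"; it wants a classic PEM header.
def looks_like_classic_pem(key_path):
    """Return True if the key file starts with a PEM header JSch can parse."""
    with open(key_path) as f:
        header = f.readline().strip()
    # Classic PEM: "-----BEGIN RSA PRIVATE KEY-----" etc.
    # Newer format: "-----BEGIN OPENSSH PRIVATE KEY-----" (unsupported).
    return header.startswith("-----BEGIN") and "OPENSSH" not in header
```

If this returns False for the key file, re-exporting it in classic PEM format (e.g. `ssh-keygen -p -m PEM -f <keyfile>`) might be worth trying before blaming the connector.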

Am I missing something?

Thanks