databrickslabs / cicd-templates

Manage your Databricks deployments and CI with code.
Other
201 stars 100 forks source link

Is it possible to package Structured Streaming Applications via this CI/CD Framework #63

Closed rickyschools closed 3 years ago

rickyschools commented 3 years ago

Hello -

I've recently stumbled upon this framework and so far am really enjoying the workflow put forth by the team. Thank you!

I'm working on a PySpark application that relies upon structured streaming to get data out of Kafka, and I'm running into issues getting the pipeline to complete after running a deployment to our release cluster. I have versions of this code working in Databricks notebooks, so I'm trying to decipher what it is I'm missing in the context of this ci/cd framework..

The dbx deployments via Azure Pipelines and the sample jobs execute as expected - so I can rule out a deployment issue.

I'm aware of the limitation that databricks-connect has regarding structured streaming support, but am wondering if that's getting passed along to the job cluster I'm spinning up for the job I've crafted.

Any wisdom you could impart would be greatly appreciated. Below is the full stack-trace of the standard error from the spark job.

Thanks in advance,

RS


/databricks/python/lib/python3.7/site-packages/IPython/config.py:13: ShimWarning: The `IPython.config` package has been deprecated since IPython 4.0. You should import from traitlets.config instead.
  "You should import from traitlets.config instead.", ShimWarning)
/databricks/python/lib/python3.7/site-packages/IPython/nbconvert.py:13: ShimWarning: The `IPython.nbconvert` package has been deprecated since IPython 4.0. You should import from nbconvert instead.
  "You should import from nbconvert instead.", ShimWarning)
Thu Feb 25 00:55:55 2021 py4j imported
Thu Feb 25 00:55:55 2021 Python shell started with PID  2490  and guid  1c65e51ed47b4b95a033bef96013d3f8
Thu Feb 25 00:55:55 2021 Initialized gateway on port 46845
Thu Feb 25 00:55:56 2021 py4j imported
Thu Feb 25 00:55:56 2021 Python shell executor start
Dropped logging in PythonShell:

b'/local_disk0/tmp/1614214540907-0/PythonShell.py:1084: DeprecationWarning: The `use_readline` parameter is deprecated and ignored since IPython 6.0.\n  parent=self,\n'
/databricks/python/lib/python3.7/site-packages/IPython/config.py:13: ShimWarning: The `IPython.config` package has been deprecated since IPython 4.0. You should import from traitlets.config instead.
  "You should import from traitlets.config instead.", ShimWarning)
/databricks/python/lib/python3.7/site-packages/IPython/nbconvert.py:13: ShimWarning: The `IPython.nbconvert` package has been deprecated since IPython 4.0. You should import from nbconvert instead.
  "You should import from nbconvert instead.", ShimWarning)
Thu Feb 25 00:55:59 2021 py4j imported
Thu Feb 25 00:55:59 2021 Python shell started with PID  2514  and guid  5d2f02b085044e0e81d7961dfacf6ab7
Thu Feb 25 00:55:59 2021 Initialized gateway on port 36749
Thu Feb 25 00:56:00 2021 py4j imported
Thu Feb 25 00:56:00 2021 Python shell executor start
Dropped logging in PythonShell:

b'/local_disk0/tmp/1614214540907-0/PythonShell.py:1084: DeprecationWarning: The `use_readline` parameter is deprecated and ignored since IPython 6.0.\n  parent=self,\n'
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command--1> in <module>
     12 
     13 with open(filename, "rb") as f:
---> 14   exec(f.read())
     15 

<string> in <module>

<string> in launch(self)

<string> in run_topic_ingestion(self, topic, topic_idx)

/databricks/spark/python/pyspark/sql/streaming.py in load(self, path, format, schema, **options)
    418             return self._df(self._jreader.load(path))
    419         else:
--> 420             return self._df(self._jreader.load())
    421 
    422     @since(2.0)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    125     def deco(*a, **kw):
    126         try:
--> 127             return f(*a, **kw)
    128         except py4j.protocol.Py4JJavaError as e:
    129             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o399.load.
: java.lang.NullPointerException
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateGeneralOptions(KafkaSourceProvider.scala:243)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:331)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:242)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:122)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:122)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:221)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)```
renardeinside commented 3 years ago

Hi @rickyschools !

It's definitely possible to package Structured Streaming Apps since there are no limitations to do this.

Unfortunately, by the error you've provided, it's quite hard to grasp what's the source of it, but most probably it's somewhere between Spark and Kafka, and it's completely unrelated to cicd-templates. I think you need to check the logs under the error message (this error only shows the fact of the error, not exactly the source of it).

Could you please verify the following:

  1. Exactly this code (same parameters) works in the notebook?
  2. You've provided the Kafka-related dependencies from the interactive cluster to the job cluster?
rickyschools commented 3 years ago

@renardeinside

Thanks for your confirmation re: Structured Streaming.

I can confirm that environment was provisioned as expected from the interactive to job cluster. After digging a bit deeper in research and debugging, I found there was an invalid reference in the configuration I put forth. Apologies for the false alarm.

Thank you for your prompt response! I'm really excited to get this framework into my team's workflow as we start leverage Databricks more and more. I really appreciate what you and the team have put forth here.

Cheers,

RS