awslabs / aws-glue-libs

AWS Glue Libraries are additions and enhancements to Spark for ETL operations.

Failed to find data source: kinesis #154

Open DenisMurakhovskiy opened 1 year ago

DenisMurakhovskiy commented 1 year ago

I have an issue with Kinesis. I'm running AWS Glue version 3.0 jobs locally in the Docker container described in https://aws.amazon.com/blogs/big-data/develop-and-test-aws-glue-version-3-0-jobs-locally-using-a-docker-container/. For example:

dataframe_AmazonKinesis_node = glueContext.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options={
        "typeOfData": "kinesis",
        "streamARN": stream_arn,
        "classification": "json",
        "startingPosition": "earliest",
        "inferSchema": "true",
    },
    transformation_ctx="dataframe_AmazonKinesis_node",
)

or

dataframe_AmazonKinesis_node = glueContext.create_data_frame.from_catalog(
    database="kinesis-data-stream",
    table_name="table-name",
    additional_options={
        "streamName": "stream-name",
        "startingPosition": "earliest",
        "inferSchema": "true",
        "classification": "json",
    },
    transformation_ctx="dataframe_AmazonKinesis_node",
)

I get this error (Failed to find data source: kinesis):

An error was encountered:
An error occurred while calling o137.getDataFrame.
: java.lang.ClassNotFoundException: Failed to find data source: kinesis. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:689)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:209)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:195)
    at com.amazonaws.services.glue.StreamingDataSource.readDataFrame(DataSource.scala:1225)
    at com.amazonaws.services.glue.StreamingDataSource.readDataFrame$(DataSource.scala:1219)
    at com.amazonaws.services.glue.KinesisDataSource.readDataFrame(DataSource.scala:1501)
    at com.amazonaws.services.glue.KinesisDataSource.getDataFrame(DataSource.scala:1530)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: kinesis.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:663)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:663)
    at scala.util.Failure.orElse(Try.scala:224)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:663)
    ... 17 more

Any clue?
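For context, the two-level trace shows what Spark is doing: `DataSource.lookupDataSource` first checks the providers registered on the classpath under the short name `kinesis`, and when none is found it falls back to reflectively loading a class named `kinesis.DefaultSource`; both lookups fail because no Kinesis connector jar is present in the local container. A toy sketch of that fallback in plain Python (the function and names are illustrative, not Spark's actual code):

```python
import importlib


def lookup_data_source(name: str):
    """Toy version of Spark's DataSource.lookupDataSource fallback:
    if no provider is registered under the short name, try to load
    <name>.DefaultSource (here modeled as a Python import)."""
    registered = {}  # empty: no Kinesis connector jar on the classpath
    if name in registered:
        return registered[name]
    try:
        module = importlib.import_module(name)
        return getattr(module, "DefaultSource")
    except (ImportError, AttributeError) as exc:
        # Mirrors the ClassNotFoundException wrapped in the trace above.
        raise RuntimeError(f"Failed to find data source: {name}") from exc
```

Calling `lookup_data_source("kinesis")` with nothing registered raises the same "Failed to find data source" message seen in the stack trace, which is why the fix below is to put a connector on the classpath.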

kesor commented 1 year ago

Adding https://github.com/qubole/kinesis-sql as a dependency, as described in https://stackoverflow.com/questions/66153584/caused-by-java-lang-classnotfoundexception-kinesis-defaultsource, resolved this error in my test.

With Jupyter, as the blog post suggests, you can add the package using a notebook cell like so:

%%configure -f
{
    "conf": {
        "spark.jars.packages": "com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0"
    }
}
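One thing to watch with this coordinate: the connector must match the Scala and Spark versions of the Glue image. Glue 3.0 ships Spark 3.1 built against Scala 2.12, so the artifact's `_2.12` suffix and `_spark-3.0` version qualifier line up with it. A small sketch that pulls those pieces out of the Maven coordinate (the parsing here is just for illustration):

```python
# Coordinate from the %%configure cell above.
coordinate = "com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0"

# Maven coordinates are group:artifact:version.
group, artifact, version = coordinate.split(":")

# The Scala binary version is the suffix of the artifact id; it must
# match the Scala version the Glue image's Spark build uses (2.12).
scala_version = artifact.rsplit("_", 1)[1]

# The Spark line the connector targets is encoded in the version string.
spark_line = version.split("_spark-")[1]

print(group, scala_version, spark_line)
```

If you pick a coordinate with a different Scala suffix (e.g. `_2.11`), the jar will resolve but fail at runtime with binary-incompatibility errors, so checking these two pieces first saves a debugging round-trip.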
DenisMurakhovskiy commented 1 year ago

This is a good tip, I'll check it out! Thank you!