Azure / azure-cosmosdb-spark

Apache Spark Connector for Azure Cosmos DB
MIT License

Structured Streaming error: at java.io.DataInputStream.readUnsignedShort(DataInputStream.java:340) #143

Open dennyglee opened 6 years ago

dennyglee commented 6 years ago

When running the Structured Streams Demo, I get the following error:

scala> var streamData = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(sourceConfigMap).load()
17/12/07 23:20:02 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
java.io.EOFException
  at java.io.DataInputStream.readUnsignedShort(DataInputStream.java:340)
  at java.io.DataInputStream.readUTF(DataInputStream.java:589)
  at java.io.DataInputStream.readUTF(DataInputStream.java:564)
  at com.microsoft.azure.cosmosdb.spark.util.HdfsUtils.read(HdfsUtils.scala:58)
  at com.microsoft.azure.cosmosdb.spark.util.HdfsUtils.readChangeFeedToken(HdfsUtils.scala:111)
  at com.microsoft.azure.cosmosdb.spark.rdd.CosmosDBRDDIterator$.getCollectionTokens(CosmosDBRDDIterator.scala:85)
  at com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSource.schema(CosmosDBSource.scala:58)
  at com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSourceProvider.sourceSchema(CosmosDBSourceProvider.scala:51)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:199)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
  ... 72 elided
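
The top two frames of the trace can be reproduced without Cosmos DB or Spark. `DataInputStream.readUTF` first reads a two-byte length prefix through `readUnsignedShort`, which throws `EOFException` when fewer than two bytes are available. The sketch below assumes (the trace alone does not confirm it) that the checkpoint file being read was empty or truncated; the empty byte array is only a stand-in for such a file.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, EOFException}
import scala.util.{Failure, Try}

// Stand-in for an empty checkpoint file: a stream with zero bytes.
val emptyStream = new DataInputStream(new ByteArrayInputStream(Array.emptyByteArray))

// readUTF needs a two-byte length prefix, so the read fails inside readUnsignedShort.
val readAttempt = Try(emptyStream.readUTF())

// True only if the failure is the same exception type seen in the trace.
val failedWithEof = readAttempt match {
  case Failure(_: EOFException) => true
  case _                        => false
}

println(s"EOFException on empty input: $failedWithEof")
```
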
dennyglee commented 6 years ago

The issue is caused by corruption of the ChangeFeedCheckpointLocation. For example, my original configuration was:

val sourceConfigMap = Map(
  "Endpoint" -> "https://rainier.documents.azure.com:443/",
  "Masterkey" -> "[my master key]",
  "Database" -> "seahawks",
  "Collection" -> "tweets",
  "ConnectionMode" -> "Gateway",
  "ChangeFeedCheckpointLocation" -> "checkpointlocation", // original checkpoint location
  "changefeedqueryname" -> "Streaming Query from Cosmos DB Change Feed Internal Count"
)

and I changed it to use a new checkpoint location:

val sourceConfigMap = Map(
  "Endpoint" -> "https://rainier.documents.azure.com:443/",
  "Masterkey" -> "[my master key]",
  "Database" -> "seahawks",
  "Collection" -> "tweets",
  "ConnectionMode" -> "Gateway",
  "ChangeFeedCheckpointLocation" -> "checkpointlocation2", // new checkpoint location
  "changefeedqueryname" -> "Streaming Query from Cosmos DB Change Feed Internal Count"
)
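
Since only one key differs between the two maps, the change can also be expressed as an override of the existing configuration. This is a minimal sketch using plain Scala `Map` operations; the names `originalConfigMap` and `freshConfigMap` are illustrative, and the values are the placeholders from the configuration above.

```scala
// Original configuration, with the placeholder values used in this thread.
val originalConfigMap = Map(
  "Endpoint" -> "https://rainier.documents.azure.com:443/",
  "Masterkey" -> "[my master key]",
  "Database" -> "seahawks",
  "Collection" -> "tweets",
  "ConnectionMode" -> "Gateway",
  "ChangeFeedCheckpointLocation" -> "checkpointlocation",
  "changefeedqueryname" -> "Streaming Query from Cosmos DB Change Feed Internal Count"
)

// Adding an existing key to an immutable Map replaces its value,
// so every other setting is carried over unchanged.
val freshConfigMap = originalConfigMap + ("ChangeFeedCheckpointLocation" -> "checkpointlocation2")

println(freshConfigMap("ChangeFeedCheckpointLocation"))
```

Keeping the original map untouched makes it easy to compare the two checkpoint locations later.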
dennyglee commented 6 years ago

Note: we should fix this so that when a failure stems from the ChangeFeedCheckpointLocation, the error message is more descriptive than java.io.EOFException at java.io.DataInputStream.readUnsignedShort(DataInputStream.java:340).
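
One possible shape for such a fix is to catch the bare `EOFException` at the point where the token is read and rethrow it with the checkpoint location in the message. The sketch below is not the connector's code: `readTokenOrExplain` is a hypothetical helper, and the message wording is an assumption. It uses only `java.io` classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream,
  DataOutputStream, EOFException, IOException, InputStream}

// Hypothetical helper (not part of azure-cosmosdb-spark): reads one UTF token
// and rewraps a bare EOFException so the message names the checkpoint location.
def readTokenOrExplain(in: InputStream, checkpointLocation: String): String = {
  val data = new DataInputStream(in)
  try data.readUTF()
  catch {
    case e: EOFException =>
      throw new IOException(
        s"Could not read the change feed token from ChangeFeedCheckpointLocation " +
          s"'$checkpointLocation': the checkpoint data is empty or truncated. " +
          "Consider pointing ChangeFeedCheckpointLocation at a new location.", e)
  } finally data.close()
}

// A well-formed token round-trips as usual.
val tokenBytes = {
  val buffer = new ByteArrayOutputStream()
  val out = new DataOutputStream(buffer)
  out.writeUTF("token-1")
  out.flush()
  buffer.toByteArray
}
val token = readTokenOrExplain(new ByteArrayInputStream(tokenBytes), "checkpointlocation")

// An empty stream now produces a message that mentions the checkpoint location.
val errorMessage =
  try { readTokenOrExplain(new ByteArrayInputStream(Array.emptyByteArray), "checkpointlocation"); "" }
  catch { case e: IOException => e.getMessage }

println(errorMessage)
```

The original `EOFException` is kept as the cause, so the low-level frames remain available in the stack trace.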