samelamin / spark-bigquery

Google BigQuery support for Spark, Structured Streaming, SQL, and DataFrames with easy Databricks integration.
Apache License 2.0
70 stars · 28 forks

java.io.IOException: Not found: Uris gs #26

Closed kurtmaile closed 7 years ago

kurtmaile commented 7 years ago

Hi Sam,

I've been running my streaming ETL jobs (non-watermark ones) in Databricks UAT for about 1.5 weeks.

For the most part everything is working fine; however, I do get the occasional IO error against the GCP bucket, which is odd.

java.io.IOException: Not found: Uris gs://sdv-analytics-spark-streaming/hadoop/tmp/spark-bigquery/spark-bigquery-1498172713848=666506084/*

I suspect this is caused by my config/job setup rather than the application/transformation logic.

I'm going to do some investigating today, but thought I'd raise it with you in case you had seen similar exceptions, or in case it's obvious the job setup is wrong.

One thing to point out: in a given Structured Streaming job, I have 1 streaming input DataFrame (reading input events off Kinesis), but I stream to 3 targets (doing some transformations in the middle) - 2 separate BigQuery tables and 1 S3 bucket sink - so 3 StreamingQuerys in total in a single job.

For the 2 BQ target tables, my BQ config uses a shared GCS folder within the job as the intermediate staging area, NOT individual ones.

spark.sqlContext.setBigQueryGcsBucket("sdv-analytics-spark-streaming")

I'm wondering whether having 2 BQ StreamingQuerys from the same single input source DataFrame in the same job is causing this IOException, and whether the GCS folder suffix ...

spark-bigquery-1498172713848=666506084

...is wrongly shared as a staging area (maybe it's derived from the input source)? Does one StreamingQuery clean it up while the other expects it to still be there? Or maybe I'm barking up the wrong tree!

Should each target StreamingQuery have its OWN GCS bucket rather than a shared one, because they are working off the same input stream?

e.g. spark.sqlContext.setBigQueryGcsBucket("sdv-analytics-spark-streaming-")

The Spark session, though, is unique per job and not shared, so it seems I would need to break the 1 job up into 3 to do this, given the shared context - 1 input to 1 sink instead of 1 input to 3?

It could also be something else entirely, e.g. intermittent network issues?

When I restart the job it picks up where it left off and doesn't error, so it's intermittent.

Does this make sense? Should I refactor to 1:1 input to BQ output?

Sorry it's a bit long-winded!

Thanks heaps!

Main part of the trace in logs below:

java.io.IOException: Not found: Uris gs://sdv-analytics-spark-streaming/hadoop/tmp/spark-bigquery/spark-bigquery-1498172713848=666506084/*
at com.google.cloud.hadoop.io.bigquery.BigQueryUtils.waitForJobCompletion(BigQueryUtils.java:95)
at com.samelamin.spark.bigquery.BigQueryClient.com$samelamin$spark$bigquery$BigQueryClient$$waitForJob(BigQueryClient.scala:143)
at com.samelamin.spark.bigquery.BigQueryClient.load(BigQueryClient.scala:110)
at com.samelamin.spark.bigquery.package$BigQueryDataFrame.saveAsBigQueryTable(package.scala:163)
at com.samelamin.spark.bigquery.streaming.BigQuerySink.addBatch(BigQuerySink.scala:27)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:658)
kurtmaile commented 7 years ago

Hi Sam,

I'm getting a few more of these java.io.IOException: Not found: Uris gs:// exceptions occurring. Do you think it's worth doing as described above and isolating the storage buckets?

I don't lose any data when the errors occur (of course); I just restart the job and it works. The IO errors appear intermittent at the moment, and it's quite hard to debug why it can't find the GCS staging bucket for the job. Any ideas?

I've had a handful of instances where I had duplicates in my Structured Streaming BQ sink table too - possibly when Spark replays from the Kinesis source due to an issue somewhere, and because BQ is not an idempotent sink I get duplicates appended. I'll be able to address this with the new watermarking feature once you introduce it, by setting drop_duplicates - though that still assumes no duplicates arrive after the watermark, which would really need support for the Structured Streaming 'update' mode. I see BQ does support SQL updates now, but those quotas look tiny!

Thanks mate

samelamin commented 7 years ago

Hey @kurtmaile, just finding time to look through these issues.

I should think that if you had separate buckets for both checkpointing and data transfer it would resolve the issue. You can also try setting a different key prefix in the bucket per job if you want everything in one master bucket, e.g. {bucket-name}/{job_name}
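For illustration, a minimal sketch of what that might look like (the bucket and job names here are placeholders, not your real config):

// Hypothetical per-job staging location: each notebook/job points the
// connector at its own prefix inside one master bucket, so no two
// StreamingQuerys share a spark-bigquery temp path.
val jobName = "customer-account-events"
spark.sqlContext.setBigQueryGcsBucket(s"sdv-analytics-spark-streaming/$jobName")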

Give it a try and give me a shout with the results. I'm having a look at the watermarking issue now.

kurtmaile commented 7 years ago

Hi Sam!

Thanks so much.

I've done as you mentioned - will see how testing goes today.

And awesome news on the watermarking! Legend, thanks.

Kurt

kurtmaile commented 7 years ago

Hi Sam,

On this issue - I know you are still recovering, so only when you get some time to look - I'm oddly still getting intermittent issues with the GCS staging bucket. I've split my jobs out so each is an isolated streaming notebook (but using a shared 'Library' - more on this later), each with its own GCS bucket and S3 checkpoint in the Spark session.

In Databricks I am using the 3.0 runtime (so Spark 2.2 RC with Databricks' extra bells and whistles). The cluster is set by default NOT to share the Spark session, so each job should be isolated and have its own session.

However... refer below:

[image: ioerror]

You can see that the GCS staging bucket reported in the ERROR is not the one configured for this notebook job in the cell above - rather, it's the one for one of the other jobs! These are highlighted in red. It's reporting on the inventory bucket, but in the cell above it's configured with the customeraccountevents bucket (which is the config for a separate job).

[image: screenshot 2017-07-07 15 01 03]

Now I absolutely don't think Databricks would get this session sharing wrong, so it's likely something I'm doing. But I have totally isolated the jobs, so I'm a little bit stumped - there seems to be no way it should be looking for that staging bucket.

One thing I can say: I try to keep the notebooks as light as possible, mostly just configuring the streaming job, and they use a reusable library I created for our solution to hide some details from users. Fairly standard pattern. The BQ streaming sink class below (used in the notebook you can see in the image) is as follows - pretty simple.

class BigQueryStreamingSink(spark: SparkSession = SparkSession.builder().getOrCreate(),
                            gcpKeyFilePath: String,
                            bqProjectId: String,
                            bqDataset: String,
                            bqBucket: String,
                            checkpoint: String,
                            bqRegion: String = "EU",
                            bqAllowSchemaUpdates: Boolean = true) extends Serializable {

  spark.sparkContext.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
  spark.sqlContext.setGcpJsonKeyFile(gcpKeyFilePath)
  spark.sqlContext.setGSProjectId(bqProjectId)
  spark.sqlContext.setBigQueryProjectId(bqProjectId)
  spark.sqlContext.setBigQueryGcsBucket(bqBucket)
  spark.sqlContext.setBigQueryDatasetLocation(bqRegion)

  if(bqAllowSchemaUpdates)
    spark.sqlContext.setAllowSchemaUpdates()

  def writeStreamToBigQueryTable(streamingDf: DataFrame,
                                 bqTable: String,
                                 outputMode: OutputMode = OutputMode.Append): StreamingQuery = {
    streamingDf
      .writeStream
      .format("com.samelamin.spark.bigquery")
      .option("checkpointLocation", checkpoint)
      .option("tableReferenceSink", s"$bqProjectId:$bqDataset.$bqTable")
      .outputMode(outputMode)
      .start()
  }
}

This is part of the reusable library I mentioned that gets loaded into Databricks after my builds and onto the cluster. But I can't see this being an issue - unless I've got this wrong, the instance of this is created in the context of the isolated job's JVM? Or am I missing a very obvious thing here?
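For context, a minimal sketch of how a notebook might instantiate this sink - the key path, project, dataset, bucket, checkpoint and inputStreamingDf below are placeholders, not the real job config:

// Hypothetical notebook-level usage of the class above; all values are placeholders.
val sink = new BigQueryStreamingSink(
  gcpKeyFilePath = "/dbfs/keys/gcp-service-account.json",
  bqProjectId    = "my-gcp-project",
  bqDataset      = "analytics",
  bqBucket       = "customeraccountevents-staging",
  checkpoint     = "s3a://my-checkpoints/customer-account-events"
)

// inputStreamingDf stands in for the Kinesis-sourced streaming DataFrame.
val query = sink.writeStreamToBigQueryTable(inputStreamingDf, bqTable = "customer_account_events")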

The job aborts at this point, retries again as it is set up to, and usually works on the first re-attempt, if not within a couple. I never lose data, but I do get duplicates as it retries the batch again from the last checkpoint. That itself isn't an issue - I need to accommodate them anyway, and I don't lose data. It does just pick back up on the next run and work... so it's intermittent and annoying, but not a show stopper.

Any of your expert help is appreciated - when you are feeling up to it, of course, no rush. Please heal and get better, mate.

Regards and best wishes Kurt

samelamin commented 7 years ago

Hi Kurt

I will try and replicate when I have time. Can you just confirm that you are using not only separate buckets but also different checkpoint and transactionLog locations?

Try separating everything to rule out any of these issues.

Regards Sam

kurtmaile commented 7 years ago

Hi Sam,

I can confirm this yes.

Although maybe I'm going to look stupid here (or my 1 hour of sleep per night is taking effect), what do you mean by 'transactionLog' location exactly?

Each streaming query most certainly has its own unique GCS bucket for staging data and its own S3 bucket/folder for the checkpoint.

Where is a transactionLog defined/configured though?

Hey, thanks for even replying - don't stress about the reply.

Cheers, K

kurtmaile commented 7 years ago

Note: sometimes it can take a good few hours for the error to occur...

samelamin commented 7 years ago

Hey @kurtmaile, you can change the transaction log path by adding an option for it on the write stream, similar to what you would do with the tableReferenceSink:

.option("transactionLog",pathToLog)

kurtmaile commented 7 years ago

Hey thanks Sam.

I've not set this (is this the WAL location?) - is that a problem, do you think?

I assume it is a local file system path?

Cheers Kurt

samelamin commented 7 years ago

It is, yes, but I think you can also point it to cloud storage.

Try separating the transaction log and see if it helps. I am skeptical it will fix the issue, but it is best to rule it out from the start.

kurtmaile commented 7 years ago

Hey,

I am going to implement this change today. Quick question: if I explicitly set the WAL to something different, will this cause any issues with the existing streaming jobs' WAL/metadata? I'm not doing anything stateful - just consuming off the Kinesis stream and writing to BQ. I'll stop my running streaming jobs, make the upgrade, and restart with the new WAL location.

Are there any restrictions on setting the WAL - i.e. does an existing directory need to be in place, or does it have to be at a root level? Naming conventions/extensions for the file? Does it need a 'file:' prefix or is that assumed?

Thinking '/(jobname)'

There's not a lot written about setting this - probably not very common, I guess. Sorry for the questions!

I think, as you say, this is unlikely to be it, but I'll do it anyway.

Kurt

kurtmaile commented 7 years ago

Hey,

I could reproduce this by 'very quickly' starting two jobs (subscribed to the same Kinesis stream) in quick succession via the Databricks jobs screen - very odd, almost a race condition/threading issue.

I'll keep investigating. The code in the shared library I load into Databricks is below, FYI, in case you notice anything squiffy.

My notebooks each instantiate an instance of BigQueryStreamingSink, passing in their SparkSession, so it seems thread-safe? (The Spark context is NOT shared, as mentioned.)

package com.xiatechs.analytics.datasource.google

import com.samelamin.spark.bigquery._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.streaming.Trigger

class BigQueryStreamingSink(spark: SparkSession = SparkSession.builder().getOrCreate(),
                            gcpKeyFilePath: String,
                            bqProjectId: String,
                            bqDataset: String,
                            bqBucket: String,
                            checkpoint: String,
                            bqRegion: String = "EU",
                            bqAllowSchemaUpdates: Boolean = true) extends Serializable {

  spark.sparkContext.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
  spark.sqlContext.setGcpJsonKeyFile(gcpKeyFilePath)
  spark.sqlContext.setGSProjectId(bqProjectId)
  spark.sqlContext.setBigQueryProjectId(bqProjectId)
  spark.sqlContext.setBigQueryGcsBucket(bqBucket)
  spark.sqlContext.setBigQueryDatasetLocation(bqRegion)

  if(bqAllowSchemaUpdates)
    spark.sqlContext.setAllowSchemaUpdates()

  def writeStreamToBigQueryTable(streamingDf: DataFrame,
                                 bqTable: String,
                                 outputMode: OutputMode = OutputMode.Append): StreamingQuery = {
    streamingDf
      .writeStream
      .format("com.samelamin.spark.bigquery")
      .option("checkpointLocation", checkpoint)
      .option("tableReferenceSink", s"$bqProjectId:$bqDataset.$bqTable")
     // .option("transactionLog","/")
      .trigger(Trigger.ProcessingTime("2 minutes"))
      .outputMode(outputMode)
      .start()
  }
}

Kurt

kurtmaile commented 7 years ago

Hi Sam,

Thanks heaps for fixing the timestamp issue so quickly. On this topic, I had it occur again, and interestingly the GCS bucket it seems to think it can't find is the one from the LAST job started, maybe indicating that isolation isn't happening in some way? It would be interesting to see if you can reproduce it yourself.

Cheers

samelamin commented 7 years ago

I couldn't reproduce it, but I am curious whether it only happens for Kinesis streams or can be caused by any other source.

I suggest streaming from local storage or S3 to rule out it being an issue on the Kinesis side.

Or stream into an S3 directory and write to parquet.
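For instance, a minimal isolation test might look like the sketch below (the schema, paths and bucket names are placeholders):

// Hypothetical isolation test: read the same events from S3 instead of
// Kinesis and write them straight to Parquet, bypassing BigQuery entirely.
val testDf = spark.readStream
  .schema(eventSchema)                      // placeholder schema for the events
  .json("s3a://my-raw-events/")             // placeholder source path
testDf.writeStream
  .format("parquet")
  .option("checkpointLocation", "s3a://my-checkpoints/isolation-test")
  .option("path", "s3a://my-parquet-output/isolation-test")
  .start()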

Also, as a last attempt, try running one query on Databricks and another locally on your machine to rule out any shared context on the Databricks side.

kurtmaile commented 7 years ago

Hey,

Thanks for replying - we went live last night so there is a slew of activity and events I'm processing. But I am seeing a lot more of these errors today, and during BQ ingest there are some 409s (conflicts?) I can see in the dashboard.

Databricks use Kinesis themselves extensively and they've GA'd the connector, so I doubt it is that - more likely something I am doing.

I am not losing any data, but I've had to restart the cluster twice to 'reset' the driver applications as a few jobs 'got stuck'. Spark seems to keep the integrity of the checkpointing, so I get the data (duplicates included, which my query in BQ filters out). But I've had to reset the cluster, which is of course not fun.

Do you think it is worth moving my BQ streaming query code out of the shared library directly into the notebook? It's the only thing I can think of at the moment that I might be doing wrong - somehow it's not isolated, yet each job has its own executor Java processes so it should be isolated.

Thanks for the tips, I'll keep investigating... argh, bad for my confidence! :(

K

samelamin commented 7 years ago

If you are already in production then it's certainly better to move it into its own notebook. The cluster will still be running, so you should be fine.

Manually restarting is definitely not fun, so I would do anything I can to keep the lights on with no manual intervention and try to isolate the issue. Also, if you convert the notebook to a job on Databricks, can't you set it to restart when it fails? I seem to recall you can.

Have a look at this if you haven't already: https://docs.databricks.com/spark/latest/structured-streaming/production.html

kurtmaile commented 7 years ago

Yes, the notebook is running as a job in Databricks with retries (thanks Databricks!), so a good amount of the time Databricks restarts the job automatically and it picks up again... but at the moment it seems to be failing every 20 minutes with this confused GCS bucket problem, then working on restart. So yes, it's still a notebook, but it's running as a streaming job, and that job uses a library I create and upload via the Libraries API - the notebooks are thin top-level orchestration. I'm thinking I could move the BQ-specific code up to the notebook level; I had tried to wrap it in a convenience method, a simple 2-liner to create a streaming sink...

Thanks for taking the time to reply!

Rebooting the cluster has only happened today - even though the retry worked, there was no data flowing through, so it got stuck somehow...

kurtmaile commented 7 years ago

...the shared library is where I set the spark.sqlContext.xxxx values for all the BQ Structured Streaming sink variables, including the GCS bucket, but the Spark session should still be isolated - that's the confusing part...

samelamin commented 7 years ago

Hmm, it is very difficult to debug without pairing on it. It is most likely a bug in the connector if the shared library isn't setting anything with regard to the bucket.

Are you not able to set the BQ-specific variables outside the shared library? It might duplicate your code, but at least it will decouple things and help narrow down the problem.

kurtmaile commented 7 years ago

Yeah, no problem - having someone as a sounding board (even if not pairing) is nice, so thank you. I need to be able to sort my own sh!t out, so I'll keep trying. My plan tonight is to move the BQ stuff up into the top-level notebook and see how it goes - feels like the best next step. Thanks a lot mate, you're a star, appreciate your support.

kurtmaile commented 7 years ago

Hi Sam,

Unfortunately this hasn't worked. I pulled all the BQ code up into the top-level notebooks so it isn't in the shared library at all, and I still get this wrong-bucket issue. Are you running multiple Structured Streaming jobs in Databricks in prod at your company? I'm at a loss as to why I am seeing this - my jobs are so simple...

I thought about trying to set the cluster Spark config spark.driver.allowMultipleContexts to true (it's false in Databricks), but that's the SparkContext anyway so I'm not sure it would make a difference - it's the SQL context where these things get set.

Kurt

I've logged a support ticket with Databricks, but they aren't that helpful as they have not 'certified' your library.

kurtmaile commented 7 years ago

Hi Sam,

Just a thought: is it in theory possible to move the storage bucket configuration out of the SQL context, and maybe define it as an option on the DataStreamWriter much like the other options?

e.g.

val orderEventsStreamingQuery = 
  orderEventDeltaStreamingOutputDF
    .writeStream
    .format("com.samelamin.spark.bigquery")
    .option("checkpointLocation", bqStreamingOrderEventQueryCheckpointS3Path)
    .option("tableReferenceSink", s"$bqProjectId:$bqSDVDataDataset.$bqOrderEventsTable")
    .option("stagingBucket", "xxxxxxxxxx") <- NEW
    .trigger(Trigger.ProcessingTime("2 minutes"))
    .start()

It would also mean I could have more than 1 sink per job - at the moment it's just 1 sink per job, and some of the jobs use the same source. Just a thought; racking my brain thinking of options.

samelamin commented 7 years ago

Hi @kurtmaile, that's an idea worth trying - I'll push a separate branch and give it a go.

In the meantime, can you stick in a stack trace? It might give me more information.

kurtmaile commented 7 years ago

Cool, that would be super, thanks - will do.

samelamin commented 7 years ago

Hey @kurtmaile, please try running your notebooks using the above branch and let me know. If this doesn't fix it then I'll add an override for the GCS path.

I think the problem is that we are storing the GCS bucket in the Hadoop config and that it gets overwritten.
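To illustrate the suspicion (the key name below is made up, not the connector's real one): if two notebooks end up talking to the same driver, the Hadoop Configuration is a single mutable object, so the last writer wins.

// Illustrative only - "example.staging.bucket" is a hypothetical key.
val conf = spark.sparkContext.hadoopConfiguration
conf.set("example.staging.bucket", "customeraccountevents-staging") // set by notebook A
conf.set("example.staging.bucket", "inventoryrefdata-staging")      // set later by notebook B
println(conf.get("example.staging.bucket")) // both jobs now see the inventory bucket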

I am assuming that in the screenshots you attached earlier, the first job was "customeraccountsevents" and it then got overwritten in another job by "inventoryrefdata" -

is that correct?

kurtmaile commented 7 years ago

Hey, yes that is correct - awesome, I will do that and report back! Legend, thanks so much.

kurtmaile commented 7 years ago

Morning Sam,

Thanks for trying this fix branch.

I tried this in UAT this morning and got the following error:

[image: screenshot 2017-07-22 08 41 33]
samelamin commented 7 years ago

Hey @kurtmaile, try the latest branch - I can see the tables being created. But because I can't replicate your error it might be a false positive.

If that is the case we will have to pass in an override every time we write

samelamin commented 7 years ago

Also @kurtmaile, I noticed that you are instantiating a new BigQueryStreamingSink per job. This might seem silly but it's best to rule it out: are you sure you are getting a new SparkSession? The default parameter of your sink is SparkSession.builder().getOrCreate(), which might be the reason you are sharing contexts.

Let's just rule it out: add the below code to your sink class, then instantiate it in both notebooks.

The first line should always print null; if it doesn't, then the values are being overwritten and that's where your problem is.

// Add to the BigQueryStreamingSink class body
println(spark.sparkContext.hadoopConfiguration.get("test")) // should print null on a fresh, isolated session
spark.sparkContext.hadoopConfiguration.set("test", bqBucket)
println(spark.sparkContext.hadoopConfiguration.get("test")) // should now print this job's bucket
kurtmaile commented 7 years ago

Morning Sam - thanks for this.

I had mentioned that I moved the whole of that code from the library up into the notebook, so I don't use that BigQueryStreamingSink class of mine any more - the top-level notebook has the code now.

Should I add the lines above into the notebook, then? I'll send you my notebook via email anyway.

Thanks

samelamin commented 7 years ago

Hey @kurtmaile, I set up the GCS bucket to only be written once - have a go, but this should stop any overwriting from happening.

Please build off the gcs_fix branch

kurtmaile commented 7 years ago

Hi Sam, I have just upgraded to the latest on the gcs_fix branch - I'll let it run and see what happens. Hopefully (fingers crossed) it fixes this pesky GCS bucket issue once and for all! Cheers for trying multiple options and helping to resolve it.

kurtmaile commented 7 years ago

so far so good :)

samelamin commented 7 years ago

@kurtmaile I added an override for the GCS bucket; you need to set it via these options:

.option("gcsBucket", bucket)
.option("transaction_log", path)
samelamin commented 7 years ago

Fixed on master 0.2.0