GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

Reading a view results in an empty data frame #1266

Open tom-s-powell opened 1 month ago

tom-s-powell commented 1 month ago

I'm running into a strange situation reading a view from BigQuery using the DIRECT method. The view in question contains a Boolean column which I'm attempting to filter on. Running a count on the DataFrame returns the correct number of rows, but when collecting the results the DataFrame appears empty.

The filter is being pushed down and there are streams returned from the BigQuery storage API.

INFO  [2024-07-23T09:09:57.690200Z] com.google.cloud.bigquery.connector.common.BigQueryClient: DestinationTable is GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=<redacted>, projectId=<redacted>, tableId=<redacted>}}
INFO  [2024-07-23T09:09:57.701162Z] com.google.cloud.bigquery.connector.common.BigQueryClient: running query SELECT * FROM `<redacted>.hub.txn_sales_order_header` FOR SYSTEM_TIME AS OF TIMESTAMP_MILLIS(1721692800000)
INFO  [2024-07-23T09:10:05.171100Z] com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: |Querying table <redacted>.<redacted>.<redacted> created from "SELECT * FROM `<redacted>.hub.txn…", parameters sent from Spark:|requiredColumns=[],|filter=[(`boolean_flag` = true)]
INFO  [2024-07-23T09:10:06.522624Z] com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Used optimized BQ count(*) path. Count: 7906080
INFO  [2024-07-23T09:10:18.972066Z] com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: |Querying table <redacted>.<redacted>.<redacted> created from "SELECT * FROM `<redacted>.hub.txn…", parameters sent from Spark:|requiredColumns=[<redacted>],|filter=[(`boolean_flag` = true)]
INFO  [2024-07-23T09:10:19.210747Z] com.google.cloud.bigquery.connector.common.BigQueryClientFactory: Channel pool size set to 2
INFO  [2024-07-23T09:10:19.483697Z] com.google.cloud.bigquery.connector.common.ReadSessionCreator: |creation a read session for table {}, parameters: |selectedFields=[<redacted>],|filter=[(`boolean_flag` = true)]
INFO  [2024-07-23T09:10:20.732685Z] com.google.cloud.bigquery.connector.common.ReadSessionCreator: Read session:{"readSessionName":"projects/<redacted>/locations/us/sessions/<redacted>","readSessionCreationStartTime":"2024-07-23T09:10:18.972392754Z","readSessionCreationEndTime":"2024-07-23T09:10:20.719572186Z","readSessionPrepDuration":527,"readSessionCreationDuration":1220,"readSessionDuration":1747}
INFO  [2024-07-23T09:10:20.732949Z] com.google.cloud.bigquery.connector.common.ReadSessionCreator: Received 50 partitions from the BigQuery Storage API for session projects/<redacted>/locations/us/sessions/<redacted>. Notice that the number of streams in actual may be lower than the requested number, depending on the amount parallelism that is reasonable for the table and the maximum amount of parallelism allowed by the system.
INFO  [2024-07-23T09:10:20.735739Z] com.google.cloud.spark.bigquery.direct.BigQueryRDDFactory: Created read session for table '<redacted>.<redacted>.<redacted>': projects/<redacted>/locations/us/sessions/<redacted>

As I say, you can see in the logs above that the count returns the correct number of rows. It hits this code path, and as I understand it, this should be getting the row count from the TableInfo of the materialized table (since actualTable is passed in) rather than running a count query on the view.
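For illustration, the decision described above can be sketched roughly as follows. This is a hypothetical Python helper, not the connector's actual Java code; TableMeta and run_count_query are illustrative stand-ins for TableInfo and a fallback count query.

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class TableMeta:
    """Stand-in for the row count carried on a table's metadata (cf. TableInfo)."""
    num_rows: int

def count_rows(materialized: Optional[TableMeta],
               run_count_query: Callable[[], int]) -> int:
    """Prefer the metadata row count of the materialized table when one is
    available (the optimized path); otherwise fall back to a count query."""
    if materialized is not None:
        return materialized.num_rows
    return run_count_query()
```

Under this sketch, the optimized path never touches the streams at all, which would explain how the count can be correct while every stream still returns 0 rows.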

However, I can see the metrics reported for each stream and they are all returning 0 rows and 0 bytes.

INFO  [2024-07-23T09:10:23.825977Z] com.google.cloud.bigquery.connector.common.LoggingBigQueryStorageReadRowsTracer: ReadStream Metrics :{"Stream Name":"projects/<redacted>/locations/us/sessions/<redacted>/streams/GgJpYxoCamYoAg,projects/<redacted>/locations/us/sessions/<redacted>/streams/CAEaAmljGgJqZigC","Started":"2024-07-23T09:10:21.694042175Z","Ended":"2024-07-23T09:10:23.792793440Z","Parse Timings":"Not enough samples.","Time in Spark":"Not enough samples.","Time waiting for service":"Not enough samples.","Bytes/s":0,"Rows/s":0,"Bytes":0,"Rows":0,"I/O time in ms":0} (_requestId: 13cd4c76a4b9334a, mdc.taskName: task 1.0 in stage 4.0 (TID 5))

Any idea how to debug this further? I'll note the materialization project is different from the project containing the view. I'm unsure whether this would cause issues, though.

vishalkarve15 commented 1 month ago

Can you please share the connector jar being used and the Spark version?

vishalkarve15 commented 1 month ago

Also, it would help if you could share the steps to reproduce the issue.

tom-s-powell commented 1 month ago

Spark 3.4 and 0.39.1 of the connector. I'm struggling to find a reliable way of reproducing the error. I will try to get that.

isha97 commented 1 month ago

Thanks, @tom-s-powell. What do you mean by reading a view from BigQuery using the DIRECT method?

tom-s-powell commented 1 month ago

Ah, apologies, ignore the "DIRECT" method comment. This is reading a BigQuery view by simply using .load("<view-name>"), rather than a custom query.
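For context: with viewsEnabled set, the connector materializes the view behind the scenes, which is why the logs earlier in the thread show a SELECT * ... FOR SYSTEM_TIME AS OF query. A rough illustration of building a snapshot query of that shape (hypothetical helper, not the connector's code):

```python
def materialization_query(view: str, snapshot_millis: int) -> str:
    """Build a snapshot query of the shape seen in the connector logs
    (illustrative only; the real query is generated inside the connector)."""
    return (
        f"SELECT * FROM `{view}` "
        f"FOR SYSTEM_TIME AS OF TIMESTAMP_MILLIS({snapshot_millis})"
    )
```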

isha97 commented 1 month ago

Hi @tom-s-powell, sample code would really help to debug this issue. If you have a small test case with this behavior, please add it.

tom-s-powell commented 1 month ago

Sorry for the slow reply here. Unfortunately, I don't have access to the BigQuery account where I'm running into issues, so I can't provide many details on the table/view itself. I've shared a sample of the PySpark code that is failing below. I'll note this is not limited to views; I've seen the same behaviour with tables as well.

from pyspark.sql import functions as F

df = (
    spark_session.read
        .format("bigquery")
        .option("parentProject", "<project>")
        .option("viewsEnabled", "true")
        .option("materializationProject", "<materialization-project>")
        .option("materializationDataset", "<materialization-dataset>")
        .load("<project>.<dataset>.<view>")
)
df = df.filter(F.col("<boolean-column>"))

The strange thing is this seems to be non-deterministic. The first time I ran this it succeeded whereas the second time it returned 0 results. The query plans looked identical and they had the same number of Spark tasks.
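As a defensive guard while this is being debugged, one could compare the pushed-down count against what is actually collected and retry on a mismatch. A minimal sketch in plain Python, where count_fn and collect_fn stand in for df.count and df.collect (this is a hypothetical workaround, not a fix):

```python
def collect_with_check(count_fn, collect_fn, retries=2):
    """Collect results, retrying when the read comes back empty even though
    the count said there should be rows (the symptom described above)."""
    expected = count_fn()
    for _ in range(retries + 1):
        rows = collect_fn()
        if rows or expected == 0:
            return rows
    raise RuntimeError(f"read returned 0 rows but count() was {expected}")
```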

In the logs I see the same message of:

com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Used optimized BQ count(*) path

This returns a non-zero number of rows in both cases, so the materialized table seems like it would have rows, given that https://github.com/GoogleCloudDataproc/spark-bigquery-connector/blob/master/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.java#L194 just gets the number of rows off of TableInfo.

In the Spark job where no rows were returned I see the following (metrics show 0 rows/bytes):

INFO  [2024-08-15T19:14:14.163484Z] com.google.cloud.bigquery.connector.common.LoggingBigQueryStorageReadRowsTracer: ReadStream Metrics :{"Stream Name":"projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/GgJqZhoCaWcoAg,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAYaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CAwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CA0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CA4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CA8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCam
YaAmln/streams/CBIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBYaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CBwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CB0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CB4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CB8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCYaA
mpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CCwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CC0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CC4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CC8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CDAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDDRma0RaeHB2Wk9oZRoCamYaAmln/streams/CDEaAmpmGgJpZygC","Started":"2024-08-15T19:14:11.844322312Z","Ended":"2024-08-15T19:14:14.130096049Z","Parse Timings":"Not enough samples.","Time in Spark":"Not enough samples.","Time waiting for service":"Not enough samples.","Bytes/s":0,"Rows/s":0,"Bytes":0,"Rows":0,"I/O time in ms":0}  

In the Spark job where rows were returned I see the following (metrics show rows/bytes were consumed):

INFO  [2024-08-15T18:50:06.712734Z] com.google.cloud.bigquery.connector.common.LoggingBigQueryStorageReadRowsTracer: ReadStream Metrics :{"Stream Name":"projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/GgJqZhoCaWcoAg,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAYaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CAwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CA0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CA4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CA8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCam
YaAmln/streams/CBIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBYaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CBwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CB0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CB4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CB8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCEaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCIaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCMaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCQaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCUaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCYaA
mpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCcaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCgaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCkaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCoaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCsaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CCwaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CC0aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CC4aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CC8aAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CDAaAmpmGgJpZygC,projects/<project>/locations/us/sessions/CAISDHhoQ2ZhNXJDYTNmeBoCamYaAmln/streams/CDEaAmpmGgJpZygC","Started":"2024-08-15T18:49:52.258943954Z","Ended":"2024-08-15T18:50:06.678977966Z","Parse Timings":"Average: PT0.003503972S Samples: 278","Time in Spark":"Average: PT0.039637556S","Time waiting for service":"Average: PT0.000120159S Samples: 278","Bytes/s":1506461333,"Rows/s":164397,"Bytes":49713224,"Rows":160123,"I/O time in ms":33}