GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
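A minimal usage sketch of both directions, assuming the documented format("bigquery") API; the table and bucket names below are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("example").getOrCreate()

// Read a BigQuery table into a DataFrame.
val df = spark.read.format("bigquery")
  .option("table", "myproject.mydataset.mytable") // placeholder
  .load()

// Write a DataFrame back to BigQuery; indirect writes stage data in GCS.
df.write.format("bigquery")
  .option("temporaryGcsBucket", "my-staging-bucket") // placeholder
  .save("myproject.mydataset.output_table")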
Apache License 2.0

INVALID_ARGUMENT: request failed: Row filter for table #165

Closed: kumgaurav closed this issue 4 years ago.

kumgaurav commented 4 years ago

I did a simple select using spark.read.bigquery and it works fine, but the moment I join with another table it breaks with an error saying the filter is invalid. Below is the code snippet:

val objkdf = spark.read.bigquery(tableNameObjk).select("obknr", "matnr", "sernr")
val zorvbakdf = vbakdf.filter("auart = 'ZOR'")
val zor_p01_df = zorvbakdf.join(vbfa_g_df, zorvbakdf("vbeln") === vbfa_g_df("vbelv"), "left_outer")
log.info("zor_p01_df -> " + zor_p01_df.count())

20/04/29 18:45:58 ERROR com.paloaltonetworks.sap.CreateSapTableForLookupV1$: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: request failed: Row filter for table itd-aia-datalake:ecpclnt900._sbc_d5276f0927cd4289ae93360bb75d0899 is invalid. Filter is '(auart = 'ZOR' AND auart IS NOT NULL)'
com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.InvalidArgumentException: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: request failed: Row filter for table itd-aia-datalake:ecpclnt900._sbc_d5276f0927cd4289ae93360bb75d0899 is invalid. Filter is '(auart = 'ZOR' AND auart IS NOT NULL)'

davidrabinowitz commented 4 years ago

Hi @kumgaurav ,

I don't see the definition of vbakdf, but I assume that either that table or zorvbakdf is a view. It seems that the view has been materialized and cached without the auart field. Can you please run the equivalent SELECT ... FROM the_view WHERE (auart = 'ZOR' AND auart IS NOT NULL) in the BigQuery console, just to make sure it is valid? Also, can you please share a more accurate code snippet? Thanks!

kumgaurav commented 4 years ago

Sorry, that was a typo. You are correct, I am reading a view, but the column is included in my select. Here is the correct snippet:

val tableNameVbak = env + "." + sapSchema + "." + "vbak"
val tableNameVbfa = env + "." + sapSchema + "." + "vbfa"
val vbakdf = spark.read.bigquery(tableNameVbak).select("vbeln", "auart")
val vbfadf = spark.read.bigquery(tableNameVbfa).select("vbelv", "vbeln", "vbtyp_n")
val vbfa_g_df = vbfadf.filter("vbtyp_n = 'G'")
val zorvbakdf = vbakdf.filter("auart = 'ZOR'")
log.info("zorvbakdf -> "+zorvbakdf.count())
val zor_p01_df = zorvbakdf.join(vbfa_g_df, zorvbakdf("vbeln") === vbfa_g_df("vbelv"), "left_outer")
//zor_p01_df.show(10)
log.info("zor_p01_df -> "+zor_p01_df.count())

Below is the output from running the query directly in BQ:

SELECT vbeln, auart FROM `###.ecpclnt900.vbak` vbak WHERE (auart = 'ZOR' AND auart IS NOT NULL) LIMIT 1;

Row | vbeln | auart
-- | -- | --
1 | 0010000000 | ZOR
davidrabinowitz commented 4 years ago

Sorry for the nitpicking, but I assume that the actual DataFrames are read using spark.read.format("bigquery").option("table", ...).load().select(...).
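For reference, a minimal sketch of that generic form; the table reference below is a placeholder, not from the thread:

val vbakdf = spark.read
  .format("bigquery")
  .option("table", "myproject.mydataset.vbak") // placeholder table
  .load()
  .select("vbeln", "auart")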

Also, can you add the logs of the application? The connector emits INFO logs about the data read from BigQuery, and DEBUG logs with the queries that create the materialized view tables.
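For reference, one way to surface those DEBUG logs from application code, assuming the connector logs under its com.google.cloud.spark.bigquery package (log4j 1.x, as bundled with Spark 2.x):

import org.apache.log4j.{Level, Logger}

// Raise the connector's log level so the view-materialization queries
// appear in the driver logs (package name is an assumption).
Logger.getLogger("com.google.cloud.spark.bigquery").setLevel(Level.DEBUG)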

kumgaurav commented 4 years ago

The above code is used for reading the actual DataFrame. I initialize the Spark conf like below, as per the documentation:

val sparkConf = new SparkConf()
  .setAppName("SapLookup")
  .set("credentialsFile", conf.credentialsStreamFilePath)
  .set("viewsEnabled", "true")

This allows me to read a view as a table:

val vbakdf = spark.read.bigquery(tableNameVbak).select("vbeln", "auart")
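For context, a minimal sketch of the full setup this implies, assuming the connector's implicit import for the bigquery shorthand; the session wiring and table reference are filled in here, not quoted from the thread:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
// The connector's implicits add the spark.read.bigquery(...) shorthand.
import com.google.cloud.spark.bigquery._

val sparkConf = new SparkConf()
  .setAppName("SapLookup")
  .set("viewsEnabled", "true") // needed so views can be read like tables

val spark = SparkSession.builder().config(sparkConf).getOrCreate()

// Placeholder reference; the thread builds it from env and sapSchema.
val vbakdf = spark.read.bigquery("env.sapschema.vbak").select("vbeln", "auart")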

Should I try the way you mentioned above?

davidrabinowitz commented 4 years ago

Sorry, I guess I missed that. Your version is fine :-)

Can you please add the logs?

KhileshChauhan commented 2 years ago

Hi David, I was experimenting with a data analysis task on my personal PC and faced a similar issue. Reading from a view works fine, but reading from a view and applying a filter gives this error. Is there any workaround or resolution for it?
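One possible workaround, assuming a connector version that supports the query option (which requires viewsEnabled plus a materialization dataset): push the filter into SQL so BigQuery materializes the already-filtered result, instead of Spark filtering the cached view. The dataset and view names below are placeholders.

val filtered = spark.read
  .format("bigquery")
  .option("viewsEnabled", "true")
  .option("materializationDataset", "my_scratch_dataset") // placeholder
  .option("query",
    "SELECT vbeln, auart FROM `myproject.mydataset.vbak` WHERE auart = 'ZOR'") // placeholder view
  .load()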