GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

`bigquery.tables.create` permission required when appending data to a BigQuery table #1277

Open · opened 3 months ago by smic-datalabs-jdcastro

smic-datalabs-jdcastro commented 3 months ago

I am trying to restrict the permissions of a service account so that it can only execute DML statements (e.g. INSERT, UPDATE, and DELETE queries) against a BigQuery table.

I created a custom IAM role derived from the BigQuery Data Editor predefined role and removed the permissions it does not need, including bigquery.tables.create.

I assigned this custom role to the service account, but on execution the job fails with: "Permission bigquery.tables.create denied on dataset..."

Here is the code snippet on how I append data to the table:

# Structured Streaming write that appends the micro-batches to the BigQuery table
save_df_stream = (
    df_stream.writeStream
        .outputMode("append")
        .format("bigquery")
        .options(**options_config)
        .trigger(availableNow=True)
        .start()
)

Does outputMode("append") really create the table before loading data into it?
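For reference, here is a minimal sketch of the same write pinned so it should never create the target table, assuming the connector's createDisposition write option (CREATE_NEVER) is honored for streaming writes in the installed version:

# Sketch only: same df_stream and options_config as above, plus createDisposition,
# which (assuming it is supported for writeStream in this connector version)
# makes the load job fail instead of creating the target table.
save_df_stream = (
    df_stream.writeStream
        .outputMode("append")
        .format("bigquery")
        .options(**options_config)
        .option("createDisposition", "CREATE_NEVER")
        .trigger(availableNow=True)
        .start()
)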

isha97 commented 3 months ago

Hi @smic-datalabs-jdcastro ,

Can you please share the options_config that you are using?

smic-datalabs-jdcastro commented 3 months ago

Hi @isha97,

Just the following connector options (actual values redacted):

{
  "partitionType": ...,
  "partitionField": ...,
  "temporaryGcsBucket": ...,
  "project": ...,
  "dataset": ...,
  "table": ...,
  "checkpointLocation": ...,
  "allowFieldAddition": True
}

vishalkarve15 commented 3 weeks ago

@smic-datalabs-jdcastro Can you please share how df_stream is created? It might give some insight into debugging this issue.

smic-datalabs-jdcastro commented 3 weeks ago

Hi @vishalkarve15, kindly see the code snippet below for reference:

stream_config = {
    "cloudFiles.format": file_format,
    "cloudFiles.validateOptions": "true",
    "cloudFiles.inferColumnTypes": "false",
    "cloudFiles.schemaEvolutionMode": "rescue",
    "cloudFiles.schemaLocation": "<path_to_schema>",
    "ignoreMissingFiles": "true",
    "ignoreLeadingWhiteSpace": "false",
    "ignoreTrailingWhiteSpace": "false",
    "readerCaseSensitive": "false"
}

from pyspark.sql.functions import lit

# Auto Loader (cloudFiles) source; ingestion metadata columns are added here
df_stream = (
    spark.readStream
         .format("cloudFiles")
         .options(**stream_config)
         .load("/mnt/gcs_bucket/path/to/object")
         .withColumn("data_ingest_timestamp", lit(ingestion_time).cast("timestamp"))
         .withColumn("raw_file_path", lit("<path_to_filename>"))
)

Thank you