move-coop / parsons

A python library of connectors for the progressive community.
https://www.parsonsproject.org/

[Bug] Parsons' BigQuery Upsert is Broken #992

Closed Jason94 closed 7 months ago

Jason94 commented 7 months ago

The upsert functionality in the Parsons BigQuery connector seems to be broken.

Detailed Description

Exception has occurred: TypeError       (note: full exception trace is shown but execution is paused at: _run_module_as_main)
Client.load_table_from_uri() got an unexpected keyword argument 'template_table'
  File "C:\...\parsons\google\google_bigquery.py", line 461, in copy_from_gcs
    load_job = self.client.load_table_from_uri(
  File "C:\...\parsons\google\google_bigquery.py", line 818, in copy
    self.copy_from_gcs(
  File "C:\...\parsons\google\google_bigquery.py", line 949, in upsert
    self.copy(
  File "C:\...\upsert_test.py", line 29, in main
    bq.upsert(data2, table_name, "a")  # This errors
  File "C:\...\upsert_test.py", line 35, in <module>
    main()
  File "C:\Python310\Lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "C:\Python310\Lib\runpy.py", line 196, in _run_module_as_main (Current frame)
    return _run_code(code, main_globals, None,
TypeError: Client.load_table_from_uri() got an unexpected keyword argument 'template_table'

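The cause seems to be that the Google BigQuery Python client's load_table_from_uri method does not accept a template_table argument. Even so, a template_table argument is passed into the parsons.BigQuery.copy() method by upsert and, per the traceback above, ends up in the load_table_from_uri call made by copy_from_gcs.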
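The failure mode can be reproduced without BigQuery at all. The sketch below is not Parsons code: StubClient, its simplified parameter list, the copy_from_gcs wrapper, the unsupported_kwargs helper, and the gs:// URI are all hypothetical stand-ins, assumed only to mirror the shape of the traceback (a wrapper forwarding an extra keyword to a client method that does not declare it).

```python
import inspect


class StubClient:
    """Hypothetical stand-in for the BigQuery client (simplified parameters)."""

    def load_table_from_uri(self, source_uris, destination, job_config=None):
        return f"load {source_uris} -> {destination}"


def copy_from_gcs(client, uri, table, **load_kwargs):
    # Hypothetical wrapper: forwards every extra keyword straight to the client.
    return client.load_table_from_uri(uri, table, **load_kwargs)


def unsupported_kwargs(func, kwargs):
    """Return the keyword names that func's signature would reject."""
    params = inspect.signature(func).parameters
    # A function that declares **kwargs accepts any keyword name.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return []
    return [name for name in kwargs if name not in params]


client = StubClient()
extra = {"template_table": "test.upsert_test_data"}

error_message = None
try:
    copy_from_gcs(client, "gs://example-bucket/data.csv", "test.staging", **extra)
except TypeError as exc:
    # Same kind of TypeError as in the report: unexpected keyword argument.
    error_message = str(exc)
    print(error_message)

rejected = unsupported_kwargs(client.load_table_from_uri, extra)
print(rejected)  # ['template_table']
```

Running unsupported_kwargs against the real client method in an installed environment would be one way to check which keywords the installed google-cloud-bigquery version accepts.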

To Reproduce

  1. Make sure you have a .env file with a JSON service account credential set under the GOOGLE_APPLICATION_CREDENTIALS environment variable. I also set a GCS_TEMP_BUCKET environment variable.
  2. Create a test dataset in your project (or change the dataset in the script below).
  3. Run this script:
import dotenv
from parsons import Table
from parsons.google.google_bigquery import GoogleBigQuery

dotenv.load_dotenv(override=True)

def main():
    bq = GoogleBigQuery()
    table_name = "test.upsert_test_data"

    data = Table([{"a": 1, "b": 2}, {"a": 3, "b": 4}])

    bq.copy(data, table_name, if_exists="drop")

    data2 = Table([{"a": 1, "b": 20}, {"a": 5, "b": 6}])

    bq.upsert(data2, table_name, "a")  # This errors

    print(bq.query(f"SELECT * FROM {table_name}"))

if __name__ == "__main__":
    main()

Your Environment

Priority

This is high priority for me, since we're migrating to BigQuery with TMC and have several scripts that rely on upsert functionality.
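Until the connector is fixed, one possible stopgap is to load the new rows into a staging table and run a BigQuery MERGE statement by hand. This is only a sketch and not how Parsons implements upsert: build_merge_sql and the staging table name are hypothetical, and it assumes plain column names that need no quoting.

```python
def build_merge_sql(target, staging, key, columns):
    """Build a BigQuery MERGE statement that upserts staging rows into target."""
    non_key = [c for c in columns if c != key]
    if not non_key:
        raise ValueError("need at least one non-key column to update")

    update_set = ", ".join(f"{c} = S.{c}" for c in non_key)
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"S.{c}" for c in columns)

    return (
        f"MERGE `{target}` T\n"
        f"USING `{staging}` S\n"
        f"ON T.{key} = S.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET {update_set}\n"
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )


sql = build_merge_sql(
    target="test.upsert_test_data",
    staging="test.upsert_test_data_staging",  # hypothetical staging table
    key="a",
    columns=["a", "b"],
)
print(sql)

# Assumed usage with the objects from the reproduction script:
#   bq.copy(data2, "test.upsert_test_data_staging", if_exists="drop")
#   bq.query(sql)
```

Rows whose key matches are updated in place and the rest are inserted, which is the behaviour the reproduction script expects from upsert.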