flyteorg / flyte


[BUG] BigQuery structured dataset shouldn't `append` data when creating #5241

Open austin362667 opened 5 months ago

austin362667 commented 5 months ago

Describe the bug

While creating a BigQuery-backed StructuredDataset as follows:

import pandas as pd
from flytekit.types.structured import StructuredDataset

# Nested records to write to BigQuery
data = [
    {
        'company': 'XYZ pvt ltd',
        'location': 'London',
        'info': {
            'president': 'Rakesh Kapoor',
            'contacts': {
                'email': 'contact@xyz.com',
                'tel': '9876543210'
            }
        }
    },
    {
        'company': 'ABC pvt ltd',
        'location': 'USA',
        'info': {
            'president': 'Kapoor Rakesh',
            'contacts': {
                'email': 'contact@abc.com',
                'tel': '0123456789'
            }
        }
    }
]
# max_level=0 flattens only the top level, keeping the nested 'info' dict intact
df = pd.json_normalize(data, max_level=0)
bq_sd = StructuredDataset(
    dataframe=df,
    uri="bq://flyte-austin362667-gcp:dataset.nested_type",
)

It appends data every time rather than overwriting it.
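A quick way to confirm the duplication after each run is to count the rows directly (a minimal sketch; it assumes the project, dataset, and table from the URI above and default application credentials):

from google.cloud import bigquery

# With the append behavior, this count grows by 2 on every run
# instead of staying at 2.
client = bigquery.Client(project="flyte-austin362667-gcp")
rows = client.query(
    "SELECT COUNT(*) AS n FROM `flyte-austin362667-gcp.dataset.nested_type`"
).result()
print(next(iter(rows)).n)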

Expected behavior

In the above example, the table should always contain only the two rows below:

| Company | Location | President | Email | Telephone |
| --- | --- | --- | --- | --- |
| XYZ pvt ltd | London | Rakesh Kapoor | contact@xyz.com | 9876543210 |
| ABC pvt ltd | USA | Kapoor Rakesh | contact@abc.com | 0123456789 |

Additional context to reproduce

No response

Screenshots

However, we get 8 rows after creating the same structured dataset 4 times (screenshot: BigQuery preview showing the duplicated rows).

Are you sure this issue hasn't been raised already?

Have you read the Code of Conduct?

dosubot[bot] commented 5 months ago

To address your data being appended instead of overwritten in BigQuery when using `StructuredDataset` in Flyte, you need to ensure that the BigQuery job configuration overwrites the existing data. This behavior is controlled by the `WriteDisposition` setting in the BigQuery job configuration, which should be set to `WRITE_TRUNCATE` to overwrite existing table data.

Here's a brief overview of how you might adjust your configuration or code to achieve the desired behavior:

  1. Adjust BigQuery Job Configuration: Ensure that any BigQuery job handling data operations with `StructuredDataset` has its `WriteDisposition` parameter set to `WRITE_TRUNCATE`. This instructs BigQuery to overwrite the existing data in the table.

  2. Code Example for Overwriting Data: If you're directly interacting with BigQuery through its API or client libraries, here's how you can configure a job to overwrite data:

    from google.cloud import bigquery
    
    # Construct a BigQuery client object.
    client = bigquery.Client()
    
    # Configure the query job to write its results to a destination table,
    # overwriting whatever is already there (write_disposition only takes
    # effect when a destination table is set)
    job_config = bigquery.QueryJobConfig(
        destination="your-project.your_dataset.your_table",
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    
    # Example query
    query = "SELECT * FROM your_source_table"
    
    # Start the query, passing in the extra configuration.
    query_job = client.query(query, job_config=job_config)
    
    # Wait for the job to complete
    query_job.result()
    
    print("Data overwritten successfully.")
  3. Flyte Configuration: If you're working within the Flyte framework, ensure that the task or workflow that triggers the BigQuery operation is correctly configured to use `WRITE_TRUNCATE`. This might involve adjusting the task's plugin configuration or the way you define the BigQuery operation within your Flyte tasks (see the sketch below).
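For reference, here's a minimal sketch of what such a fix could look like in flytekit's BigQuery structured-dataset encoder. The helper name `_write_to_bq` and the URI parsing are illustrative assumptions, not flytekit's confirmed internals:

    from google.cloud import bigquery

    def _write_to_bq(structured_dataset):
        # Hypothetical encoder helper: turn "bq://project:dataset.table"
        # into the "project.dataset.table" form the client expects.
        table_id = structured_dataset.uri.split("://")[1].replace(":", ".")
        client = bigquery.Client()
        # Load jobs default to WRITE_APPEND for existing tables;
        # WRITE_TRUNCATE replaces the table contents instead.
        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        )
        client.load_table_from_dataframe(
            structured_dataset.dataframe, table_id, job_config=job_config
        ).result()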

By setting the `WriteDisposition` to `WRITE_TRUNCATE`, you instruct BigQuery to overwrite the existing data, which should resolve the issue of data being appended instead of replaced.


arbaobao commented 2 months ago

take