KlubJagiellonski / pola-backend

Pola will help you find Polish products. By taking Pola shopping with you, you discover products "with a soul" and support the Polish economy.
https://www.pola-app.pl
BSD 3-Clause "New" or "Revised" License

Incremental replication #3700

Open mik-laj opened 2 months ago

mik-laj commented 2 months ago

Hi,

At the moment, all tables are wiped on every replication run from PostgreSQL to BigQuery. We should consider whether it would be worth doing incremental replication and copying only the changed data, especially for the queries table. This probably requires looking at how much the current replication actually costs.
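
For reference, the difference between the current full refresh and an incremental run comes down to the load job's write disposition in BigQuery. A minimal sketch (the bucket, dataset and table names below are placeholders, not our real configuration):

from google.cloud import bigquery

client = bigquery.Client()

# Current behaviour: the destination table is wiped and fully rewritten on every run.
full_refresh = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# Incremental variant: only the newly exported rows are appended to the table.
incremental = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

client.load_table_from_uri(
    'gs://example-bucket/queries_delta.csv',  # placeholder URI
    'example_dataset.queries',                # placeholder table
    job_config=incremental,
).result()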

Best regards,

mik-laj commented 2 months ago

Out of curiosity, I asked ChatGPT and got this script:

import csv
import argparse
import psycopg2
from psycopg2.extras import RealDictCursor
from google.cloud import storage, bigquery

def get_last_replicated_id(bigquery_client, dataset_name, table_name, column_name):
    # Find the highest value of the tracking column that is already present in BigQuery.
    query = f"SELECT MAX({column_name}) AS last_id FROM `{dataset_name}.{table_name}`"
    query_job = bigquery_client.query(query)
    results = query_job.result()
    for row in results:
        return row.last_id if row.last_id is not None else 0

def export_to_csv(cursor, filename, table_name, column_name, last_replicated_id):
    # Export only the rows newer than the last replicated ID.
    # Assumes the PostgreSQL table has the same name as the BigQuery table.
    with open(filename, 'w', newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        cursor.execute(f"SELECT * FROM {table_name} WHERE {column_name} > %s", (last_replicated_id,))
        rows = cursor.fetchall()
        for row in rows:
            # RealDictCursor returns dict-like rows, so write the values, not the keys.
            csvwriter.writerow(row.values())
    return len(rows)

def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)

def load_csv_to_bigquery(dataset_name, table_name, uri):
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset(dataset_name)
    table_ref = dataset_ref.table(table_name)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.autodetect = True
    # Append the new rows instead of overwriting the table; this is what makes the load incremental.
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    load_job = bigquery_client.load_table_from_uri(uri, table_ref, job_config=job_config)
    load_job.result()

def main():
    parser = argparse.ArgumentParser(description='Data pipeline script from PostgreSQL to BigQuery.')
    parser.add_argument('--db_uri', type=str, required=True, help='Database URI for PostgreSQL connection.')
    parser.add_argument('--bucket_name', type=str, required=True, help='GCS bucket name.')
    parser.add_argument('--dataset_name', type=str, required=True, help='BigQuery dataset name.')
    parser.add_argument('--table_name', type=str, required=True, help='Table name (assumed identical in PostgreSQL and BigQuery).')
    parser.add_argument('--column_name', type=str, default='id', help='Monotonically increasing column used to track replication progress.')

    args = parser.parse_args()

    # Connect to PostgreSQL database
    conn = psycopg2.connect(args.db_uri, cursor_factory=RealDictCursor)
    cursor = conn.cursor()

    # Initialize BigQuery client
    bigquery_client = bigquery.Client()

    # Get last replicated ID
    last_replicated_id = get_last_replicated_id(bigquery_client, args.dataset_name, args.table_name, args.column_name)

    # Export only the new rows to CSV
    csv_file = 'output.csv'
    row_count = export_to_csv(cursor, csv_file, args.table_name, args.column_name, last_replicated_id)

    if row_count == 0:
        print("No new rows to replicate.")
    else:
        # Upload to GCS
        gcs_uri = f'gs://{args.bucket_name}/{csv_file}'
        upload_to_gcs(args.bucket_name, csv_file, csv_file)

        # Load to BigQuery
        load_csv_to_bigquery(args.dataset_name, args.table_name, gcs_uri)

    # Cleanup or log success
    print("Data pipeline execution completed successfully.")
    cursor.close()
    conn.close()

if __name__ == '__main__':
    main()
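
As for the cost question above, a rough starting point could be BigQuery's INFORMATION_SCHEMA jobs view, which shows how many bytes recent jobs processed and billed. This is only a sketch; the region qualifier is an assumption and has to match the location of our dataset:

from google.cloud import bigquery

client = bigquery.Client()

# The `region-eu` qualifier is a guess -- replace it with the region the dataset actually lives in.
query = """
SELECT
  job_type,
  COUNT(*) AS jobs,
  SUM(total_bytes_processed) / POW(10, 9) AS gb_processed,
  SUM(total_bytes_billed) / POW(10, 9) AS gb_billed
FROM `region-eu`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY job_type
ORDER BY gb_billed DESC
"""
for row in client.query(query).result():
    print(row.job_type, row.jobs, row.gb_processed, row.gb_billed)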