PhilippGerstung / etl

Python ETL process that loads data from tankerkoenig into a DuckDB table

Setup python pipeline #1

Closed · The-Aniliner-Gerstung closed this issue 3 weeks ago

The-Aniliner-Gerstung commented 1 month ago

Check this script

import duckdb
import config
from loguru import logger

DAYS_TO_LOAD = 5  # number of most recent daily CSV files to load

# establish connection to DuckDB
connection = duckdb.connect(database=(config.DB_DIR / "prices_2.db").as_posix())

# create a cursor object (in DuckDB's Python API this is a duplicate connection)
cursor = connection.cursor()

# specify the table and view names and the CSV file path
table_name = "prices"
stations_table = "stations"
view_name = "gas_prices_view"
csv_file_path = config.PRICES_EXAMPLE_FILE.as_posix()  # note: not used below

logger.info(f"Creating table {table_name}")
# drop table if already exists
cursor.execute(f"DROP TABLE IF EXISTS {table_name};")
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS FROM read_csv({[p.as_posix() for p in config.ALL_PRICES_CSV_FILE_PATHS[-DAYS_TO_LOAD:]]});")
logger.success(f"Table {table_name} created successfully")

logger.info(f"Creating date index")
cursor.execute(f"CREATE INDEX date_idx ON {table_name} (\"date\");")
logger.success("Date index created successfully")

# Create index on column station_uuid
logger.info(f"Creating station_uuid index")
cursor.execute(f"CREATE INDEX station_uuid_idx ON {table_name} (\"station_uuid\");")
logger.success("station_uuid index created successfully")

logger.info("Creating stations table")
cursor.execute(f"""
CREATE OR REPLACE TABLE {stations_table} (
    uuid VARCHAR,
    name VARCHAR,
    brand VARCHAR,
    street VARCHAR,
    house_number VARCHAR,
    post_code VARCHAR,
    city VARCHAR,
    latitude DOUBLE,
    longitude DOUBLE,
    first_active TIMESTAMP WITH TIME ZONE,
    openingtimes_json JSON
);""")

cursor.execute(f"CREATE UNIQUE INDEX uuid_idx ON {stations_table} (\"uuid\");")

def load_and_merge_csv_files():
    """Upsert the last DAYS_TO_LOAD station CSV files into the stations table."""
    for file_path in config.ALL_STATIONS_CSV_FILE_PATHS[-DAYS_TO_LOAD:]:
        logger.info(f"Processing file: {file_path}")

        # Upsert via DuckDB's read_csv_auto; SELECT * relies on the CSV columns
        # matching the stations table definition in name and order.
        upsert_query = f"""
        INSERT INTO {stations_table}
        SELECT * FROM read_csv_auto('{file_path.as_posix()}')
        ON CONFLICT (uuid) DO UPDATE SET
            name = EXCLUDED.name,
            brand = EXCLUDED.brand,
            street = EXCLUDED.street,
            house_number = EXCLUDED.house_number,
            post_code = EXCLUDED.post_code,
            city = EXCLUDED.city,
            latitude = EXCLUDED.latitude,
            longitude = EXCLUDED.longitude,
            first_active = EXCLUDED.first_active,
            openingtimes_json = EXCLUDED.openingtimes_json;
        """
        cursor.execute(upsert_query)

# Create view with per-station price statistics by weekday and hour
logger.info(f"Creating view {view_name}")
cursor.execute(f"""
    CREATE OR REPLACE VIEW {view_name} AS
        SELECT 
        station_uuid,
        WEEKDAY(date) AS weekday,
        HOUR(date) AS hour_of_day,
        AVG(e5) AS avg_e5,
        MEDIAN(e5) AS median_e5,
        MIN(e5) AS min_e5,
        MAX(e5) AS max_e5,
        AVG(e10) AS avg_e10,
        MEDIAN(e10) AS median_e10,
        MIN(e10) AS min_e10,
        MAX(e10) AS max_e10,
        AVG(diesel) AS avg_diesel,
        MEDIAN(diesel) AS median_diesel,
        MIN(diesel) AS min_diesel,
        MAX(diesel) AS max_diesel
    FROM 
        {table_name}
    GROUP BY 
        station_uuid,
        WEEKDAY(date),
        HOUR(date);
""")
logger.success("View created successfully")

# upsert the station metadata CSVs
load_and_merge_csv_files()

# commit the changes (DuckDB auto-commits each statement unless a transaction was opened explicitly)
connection.commit()

# close the connection
connection.close()
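
For anyone reviewing this: a minimal sketch of the config module the script assumes. The attribute names come from the usages above; the concrete directories and glob patterns are my assumptions, not the real values:

# config.py (sketch; directories and file patterns are assumptions)
from pathlib import Path

DATA_DIR = Path("data")  # hypothetical data root
DB_DIR = Path("db")  # directory that will hold the DuckDB file
PRICES_EXAMPLE_FILE = DATA_DIR / "prices" / "example-prices.csv"  # hypothetical

# one CSV dump per day; sorted so that [-DAYS_TO_LOAD:] picks the most recent days
ALL_PRICES_CSV_FILE_PATHS = sorted((DATA_DIR / "prices").glob("**/*-prices.csv"))
ALL_STATIONS_CSV_FILE_PATHS = sorted((DATA_DIR / "stations").glob("**/*-stations.csv"))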
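
And a quick way to sanity-check the result once the script has run; the station uuid is a placeholder:

import duckdb

con = duckdb.connect("prices_2.db")  # the file this script writes (lives under config.DB_DIR)
rows = con.execute("""
    SELECT weekday, hour_of_day, avg_e5, median_diesel
    FROM gas_prices_view
    WHERE station_uuid = '<station-uuid>'  -- placeholder uuid
    ORDER BY weekday, hour_of_day;
""").fetchall()
for row in rows:
    print(row)
con.close()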