MattTriano / analytics_data_where_house

An analytics engineering sandbox focusing on real estate prices in Cook County, IL
https://docs.analytics-data-where-house.dev/
GNU Affero General Public License v3.0
9 stars 0 forks source link

Develop an interface to access data tables from an external notebook or other outside analysis environment #127

Open MattTriano opened 1 year ago

MattTriano commented 1 year ago

I'm not sure whether this belongs in this repo or elsewhere, but for development purposes it covers the core functionality. For anything other than development on a local machine, the interface should probably change to require the user to provide the connection string.

from pathlib import Path
from typing import List

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import inspect, text

class DWHInspector:
    """Browse and query the data warehouse from an outside analysis environment.

    The SQLAlchemy engine is built from the Airflow connection URI stored in an
    env file and rewritten so it can be used from the local machine (see
    ``create_engine_from_env_file``). Intended for local development; other
    settings should supply the connection string directly.
    """

    def __init__(self, env_path: Path):
        """Create the inspector and its engine.

        Args:
            env_path: Path to the env file that defines the DWH connection URI.

        Raises:
            FileNotFoundError: If ``env_path`` is not an existing file.
            KeyError: If the connection variable is not set in the file.
        """
        self.engine = self.create_engine_from_env_file(env_path)

    @staticmethod
    def _read_env_var(env_path: Path, var_name: str) -> str:
        """Return the first value assigned to ``var_name`` in a ``KEY=value`` file.

        Blank lines, lines starting with ``#`` and lines without ``=`` are
        skipped. Whitespace around keys and values is stripped, and one layer of
        matching surrounding quotes is removed from the value.

        Raises:
            FileNotFoundError: If ``env_path`` is not an existing file.
            KeyError: If ``var_name`` is not assigned in the file.
        """
        env_path = Path(env_path)
        if not env_path.is_file():
            raise FileNotFoundError(f"No env file found at {env_path}")
        with open(env_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                if key.strip() != var_name:
                    continue
                value = value.strip()
                # Env files often quote URIs; the quotes are not part of the URL.
                if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                    value = value[1:-1]
                return value
        raise KeyError(f"{var_name} is not set in {env_path}")

    @staticmethod
    def _to_host_conn_str(conn_str: str, container_host: str, local_host: str) -> str:
        """Rewrite an Airflow connection URI into a SQLAlchemy URL usable locally.

        Rewrites a ``postgres``/``postgres+driver`` scheme to ``postgresql``,
        then replaces the first occurrence of ``container_host`` with
        ``local_host``.
        """
        scheme, sep, rest = conn_str.partition("://")
        # SQLAlchemy (1.4+) only recognises the "postgresql" dialect name, while
        # Airflow URIs may use "postgres". Touch only the scheme so that an
        # already-correct "postgresql://" URL is left alone.
        if scheme == "postgres" or scheme.startswith("postgres+"):
            scheme = "postgresql" + scheme[len("postgres"):]
        return (scheme + sep + rest).replace(container_host, local_host, 1)

    def create_engine_from_env_file(
        self,
        env_path: Path,
        *,
        conn_var_name: str = "AIRFLOW_CONN_DWH_DB_CONN",
        container_host: str = "dwh_db:5432",
        local_host: str = "localhost:5431",
    ):
        """Build a SQLAlchemy engine from the connection URI in an env file.

        Args:
            env_path: Path to the env file.
            conn_var_name: Name of the variable holding the connection URI.
            container_host: ``host:port`` in the URI to replace. The default is
                presumably the in-network (docker-compose) address; confirm
                against the deployment config.
            local_host: ``host:port`` to substitute, i.e. where the DWH is
                assumed to be reachable from the local machine.

        Returns:
            The engine returned by ``sqlalchemy.create_engine``.

        Raises:
            FileNotFoundError: If ``env_path`` is not an existing file.
            KeyError: If ``conn_var_name`` is not set in the file.
        """
        conn_str = self._read_env_var(env_path, conn_var_name)
        conn_str = self._to_host_conn_str(conn_str, container_host, local_host)
        return create_engine(conn_str)

    def get_data_schema_names(self) -> List[str]:
        """Return the names of all schemas visible through the engine."""
        return inspect(self.engine).get_schema_names()

    def get_data_table_names_in_schema(self, schema_name: str) -> List[str]:
        """Return the names of the tables in ``schema_name``."""
        return inspect(self.engine).get_table_names(schema=schema_name)

    def print_schema_names(self) -> None:
        """Print the warehouse's schema names as a bulleted list."""
        print("Schemas in the Data Warehouse")
        for schema_name in self.get_data_schema_names():
            print(f"  - {schema_name}")

    def print_table_names_in_schema(self, schema_name: str) -> None:
        """Print the schema-qualified names of the tables in ``schema_name``."""
        print(f"Tables in the {schema_name} schema")
        for table_name in self.get_data_table_names_in_schema(schema_name=schema_name):
            print(f"  - {schema_name}.{table_name}")

    def execute_result_returning_query(self, query: str) -> pd.DataFrame:
        """Run ``query`` and return its result rows as a DataFrame.

        The query runs inside a transaction that is committed if it succeeds,
        so data-modifying statements with ``RETURNING`` also persist. ``query``
        is executed verbatim via ``text()``; do not build it from untrusted input.
        """
        # engine.begin() commits on successful exit and rolls back on error for
        # both legacy and 2.0-style engines. This replaces the check on the
        # private Engine._is_future attribute.
        with self.engine.begin() as conn:
            result = conn.execute(text(query))
            results_df = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
        return results_df

    def execute_structural_command(self, query: str) -> None:
        """Execute a statement that returns no rows (e.g. DDL) and commit it.

        ``query`` is executed verbatim via ``text()``; do not build it from
        untrusted input.
        """
        with self.engine.begin() as conn:
            conn.execute(text(query))