def connect_to_postgres(
self,
host: str = None,
dbname: str = None,
user: str = None,
password: str = None,
port: int = None,
):
"""
Connect to postgres using the psycopg2 connector. This is just a helper function to set [`vn.run_sql`][vanna.base.base.VannaBase.run_sql]
**Example:**
```python
vn.connect_to_postgres(
host="myhost",
dbname="mydatabase",
user="myuser",
password="mypassword",
port=5432
)
Args:
host (str): The postgres host.
dbname (str): The postgres database name.
user (str): The postgres user.
password (str): The postgres password.
port (int): The postgres Port.
"""
try:
import psycopg2
import psycopg2.extras
except ImportError:
raise DependencyError(
"You need to install required dependencies to execute this method,"
" run command: \npip install vanna[postgres]"
)
if not host:
host = os.getenv("HOST")
if not host:
raise ImproperlyConfigured("Please set your postgres host")
if not dbname:
dbname = os.getenv("DATABASE")
if not dbname:
raise ImproperlyConfigured("Please set your postgres database")
if not user:
user = os.getenv("PG_USER")
if not user:
raise ImproperlyConfigured("Please set your postgres user")
if not password:
password = os.getenv("PASSWORD")
if not password:
raise ImproperlyConfigured("Please set your postgres password")
if not port:
port = os.getenv("PORT")
if not port:
raise ImproperlyConfigured("Please set your postgres port")
conn = None
try:
conn = psycopg2.connect(
host=host,
dbname=dbname,
user=user,
password=password,
port=port,
)
except psycopg2.Error as e:
raise ValidationError(e)
def run_sql_postgres(sql: str) -> Union[pd.DataFrame, None]:
if conn:
try:
cs = conn.cursor()
cs.execute(sql)
results = cs.fetchall()
# Create a pandas dataframe from the results
df = pd.DataFrame(
results, columns=[desc[0] for desc in cs.description]
)
return df
except psycopg2.Error as e:
conn.rollback()
raise ValidationError(e)
except Exception as e:
conn.rollback()
raise e
self.dialect = "PostgreSQL"
self.run_sql_is_set = True
self.run_sql = run_sql_postgres