tekon92 / airflow-docker-etl

create your first mini etl using airflow.

pg_dwh not defined #1

Open designet opened 2 years ago

designet commented 2 years ago

Hi Rian,

I am trying to get your project running. Maybe you can help me with this one? I cannot find anywhere in the code where the pg_dwh connection is defined.

aee4fc0d1fcd
*** Reading local file: /opt/bitnami/airflow/logs/dag_final_project_de/run_dim_district/2022-05-15T13:21:57.416088+00:00/1.log
[2022-05-15 13:22:36,694] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: dag_final_project_de.run_dim_district manual__2022-05-15T13:21:57.416088+00:00 [queued]>
[2022-05-15 13:22:36,736] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: dag_final_project_de.run_dim_district manual__2022-05-15T13:21:57.416088+00:00 [queued]>
[2022-05-15 13:22:36,737] {taskinstance.py:1241} INFO - 
--------------------------------------------------------------------------------
[2022-05-15 13:22:36,737] {taskinstance.py:1242} INFO - Starting attempt 1 of 1
[2022-05-15 13:22:36,737] {taskinstance.py:1243} INFO - 
--------------------------------------------------------------------------------
[2022-05-15 13:22:36,762] {taskinstance.py:1262} INFO - Executing <Task(PostgresOperator): run_dim_district> on 2022-05-15 13:21:57.416088+00:00
[2022-05-15 13:22:36,771] {standard_task_runner.py:52} INFO - Started process 201 to run task
[2022-05-15 13:22:36,798] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'dag_final_project_de', 'run_dim_district', 'manual__2022-05-15T13:21:57.416088+00:00', '--job-id', '14', '--raw', '--subdir', 'DAGS_FOLDER/dag_final_project_de.py', '--cfg-path', '/tmp/tmpu88zkwdx', '--error-file', '/tmp/tmpv0xgb7x9']
[2022-05-15 13:22:36,799] {standard_task_runner.py:77} INFO - Job 14: Subtask run_dim_district
[2022-05-15 13:22:36,957] {logging_mixin.py:109} INFO - Running <TaskInstance: dag_final_project_de.run_dim_district manual__2022-05-15T13:21:57.416088+00:00 [running]> on host aee4fc0d1fcd
[2022-05-15 13:22:37,192] {taskinstance.py:1427} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=rian_pauzi
AIRFLOW_CTX_DAG_ID=dag_final_project_de
AIRFLOW_CTX_TASK_ID=run_dim_district
AIRFLOW_CTX_EXECUTION_DATE=2022-05-15T13:21:57.416088+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-05-15T13:21:57.416088+00:00
[2022-05-15 13:22:37,336] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/providers/postgres/operators/postgres.py", line 69, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 198, in run
    with closing(self.get_conn()) as conn:
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py", line 88, in get_conn
    conn = deepcopy(self.connection or self.get_connection(conn_id))
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/hooks/base.py", line 68, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/connection.py", line 410, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `pg_dwh` isn't defined
[2022-05-15 13:22:37,358] {taskinstance.py:1270} INFO - Marking task as FAILED. dag_id=dag_final_project_de, task_id=run_dim_district, execution_date=20220515T132157, start_date=20220515T132236, end_date=20220515T132237
[2022-05-15 13:22:37,415] {standard_task_runner.py:88} ERROR - Failed to execute job 14 for task run_dim_district
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
    ti._run_raw_task(
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/providers/postgres/operators/postgres.py", line 69, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 198, in run
    with closing(self.get_conn()) as conn:
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py", line 88, in get_conn
    conn = deepcopy(self.connection or self.get_connection(conn_id))
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/hooks/base.py", line 68, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/connection.py", line 410, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `pg_dwh` isn't defined
[2022-05-15 13:22:37,471] {local_task_job.py:154} INFO - Task exited with return code 1
[2022-05-15 13:22:37,752] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
tekon92 commented 2 years ago

hi @designet,

you need to create a new connection under Admin -> Connections -> add a new record there, and name it pg_dwh.
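
If the UI gives you trouble, a one-off Python snippet run inside the Airflow container can register the same connection. This is only a sketch: the host, port, and credentials below are example values, to be replaced with whatever your DWH Postgres container actually uses.

```python
# Sketch only: register the pg_dwh connection programmatically instead of via the UI.
# Run once inside the Airflow container; all values are examples.
from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="pg_dwh",
    conn_type="postgres",
    host="postgres-dwh",          # hypothetical service name or container IP of the DWH database
    schema="rainbow_database",    # database name
    login="unicorn_user",
    password="magical_password",
    port=5432,
)

session = settings.Session()
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()
```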

designet commented 2 years ago

I figured that out after I sent the message, but I'm having a lot of trouble getting the connection specified correctly in the UI.

I'm using the pg credentials (unicorn_user and magical_password)

Not sure how to specify the hostname, DB name, and schema name in the Connection UI. The UI doesn't give me a specific place to enter rainbow_database and the public schema.

For the host I've been trying the internal Docker network IP on 5432. It responds ok, but sometimes fails authentication.

Thoughts on what to specify in the UI? Thanks for helping.

tekon92 commented 2 years ago
[Screenshot: pg_dwh connection settings in the Airflow Connections UI]

It should look like that; just don't forget to fill in the password, and the database itself has to be created beforehand.

tekon92 commented 2 years ago

the port should be changed to something other than 5432; in my case I'm using 5433, because 5432 is already used by the Postgres database serving as the Airflow metastore.

Unless you want to run the metastore and the DWH in the same database, in which case 5432 is fine.
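
One way to sanity-check the host, port, and credentials before fighting the UI is a quick psycopg2 connection from inside the Airflow container (psycopg2 is already installed, since the Postgres provider depends on it). This is just a sketch with example values; note that when you connect by container IP or Docker service name, the container-internal port is normally 5432, and a 5433 mapping only matters when connecting from the host.

```python
# Sketch only: reachability/credentials check from inside the Airflow container.
# All values are examples -- substitute whatever your DWH Postgres actually uses.
import psycopg2

conn = psycopg2.connect(
    host="postgres-dwh",         # hypothetical service name or container IP of the DWH database
    port=5432,                   # container-internal port; a 5433 host mapping does not apply here
    dbname="rainbow_database",
    user="unicorn_user",
    password="magical_password",
)
print("connected to:", conn.get_dsn_parameters().get("host"))
conn.close()
```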

designet commented 2 years ago

Making progress. localhost didn't work. Here's what I see when I inspect the docker network:

[Screenshot: docker network inspect output]

So, I tried the 172.18.0.7 address and this works:

[Screenshot: connection to 172.18.0.7 working]

Next thing is we don't see the mysql stg_covid_data table. Is that another connection setup problem?

aee4fc0d1fcd
*** Reading local file: /opt/bitnami/airflow/logs/dag_final_project_de/run_dim_district/2022-05-15T15:07:30.831179+00:00/1.log
[2022-05-15 15:08:10,694] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: dag_final_project_de.run_dim_district manual__2022-05-15T15:07:30.831179+00:00 [queued]>
[2022-05-15 15:08:10,762] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: dag_final_project_de.run_dim_district manual__2022-05-15T15:07:30.831179+00:00 [queued]>
[2022-05-15 15:08:10,762] {taskinstance.py:1241} INFO - 
--------------------------------------------------------------------------------
[2022-05-15 15:08:10,762] {taskinstance.py:1242} INFO - Starting attempt 1 of 1
[2022-05-15 15:08:10,762] {taskinstance.py:1243} INFO - 
--------------------------------------------------------------------------------
[2022-05-15 15:08:10,880] {taskinstance.py:1262} INFO - Executing <Task(PostgresOperator): run_dim_district> on 2022-05-15 15:07:30.831179+00:00
[2022-05-15 15:08:10,938] {standard_task_runner.py:52} INFO - Started process 344 to run task
[2022-05-15 15:08:10,989] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'dag_final_project_de', 'run_dim_district', 'manual__2022-05-15T15:07:30.831179+00:00', '--job-id', '62', '--raw', '--subdir', 'DAGS_FOLDER/dag_final_project_de.py', '--cfg-path', '/tmp/tmpxhfd8oaw', '--error-file', '/tmp/tmpvc83x87o']
[2022-05-15 15:08:11,029] {standard_task_runner.py:77} INFO - Job 62: Subtask run_dim_district
[2022-05-15 15:08:11,284] {logging_mixin.py:109} INFO - Running <TaskInstance: dag_final_project_de.run_dim_district manual__2022-05-15T15:07:30.831179+00:00 [running]> on host aee4fc0d1fcd
[2022-05-15 15:08:11,538] {taskinstance.py:1427} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=rian_pauzi
AIRFLOW_CTX_DAG_ID=dag_final_project_de
AIRFLOW_CTX_TASK_ID=run_dim_district
AIRFLOW_CTX_EXECUTION_DATE=2022-05-15T15:07:30.831179+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-05-15T15:07:30.831179+00:00
[2022-05-15 15:08:11,574] {base.py:70} INFO - Using connection to: id: pg_dwh. Host: 172.18.0.7, Port: 5432, Schema: rainbow_database, Login: unicorn_user, Password: ***, extra: {}
[2022-05-15 15:08:11,581] {dbapi.py:225} INFO - Running statement: TRUNCATE TABLE dim_district;

INSERT INTO dim_district
select
    distinct
    kode_kab::int as district_id,
    kode_prov::int as province_id,
    nama_kab as district_name
from stg_covid_data;, parameters: None
[2022-05-15 15:08:11,589] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/providers/postgres/operators/postgres.py", line 69, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 205, in run
    self._run_command(cur, sql_statement, parameters)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 229, in _run_command
    cur.execute(sql_statement)
psycopg2.errors.UndefinedTable: relation "stg_covid_data" does not exist
LINE 9: from stg_covid_data;
             ^

[2022-05-15 15:08:11,627] {taskinstance.py:1270} INFO - Marking task as FAILED. dag_id=dag_final_project_de, task_id=run_dim_district, execution_date=20220515T150730, start_date=20220515T150810, end_date=20220515T150811
[2022-05-15 15:08:11,677] {standard_task_runner.py:88} ERROR - Failed to execute job 62 for task run_dim_district
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
    ti._run_raw_task(
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/providers/postgres/operators/postgres.py", line 69, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 205, in run
    self._run_command(cur, sql_statement, parameters)
  File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 229, in _run_command
    cur.execute(sql_statement)
psycopg2.errors.UndefinedTable: relation "stg_covid_data" does not exist
LINE 9: from stg_covid_data;
             ^

[2022-05-15 15:08:11,715] {local_task_job.py:154} INFO - Task exited with return code 1
[2022-05-15 15:08:11,830] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check

Again, thanks, you're helping me get through this . . .

Jim

designet commented 2 years ago

I see we also have another postgres IP:

[Screenshot: a second Postgres container IP in the docker network]

Not sure which one I should be talking to. The other one seems to be connecting, so I'll stick with that one.

designet commented 2 years ago

This is good news . . . data in mysql stg_covid_data.

[Screenshot: rows now present in the stg_covid_data table]
tekon92 commented 2 years ago

> I see we also have another postgres IP:
>
> Not sure which one I should be talking to. The other one seems to be connecting, so I'll stick with that one.

yes, for this project I'm using two Postgres instances:

  1. the built-in one for the Airflow metastore, which in my opinion we should not touch at all.
  2. another one for our dwh.
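
If in doubt about which database is which, you can print both connection strings from inside the Airflow container; this is just a generic sketch, nothing project-specific.

```python
# Sketch only: confirm the metastore and the DWH point at different databases.
from airflow.configuration import conf
from airflow.hooks.base import BaseHook

# Airflow's own metadata DB (may live under the [database] section in newer Airflow versions).
print("metastore:", conf.get("core", "sql_alchemy_conn"))
# The warehouse connection created above -- this is the one the DAG should use.
print("dwh:", BaseHook.get_connection("pg_dwh").get_uri())
```
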
tekon92 commented 2 years ago

for mysql, we need to create some tables beforehand.

sorry for the incomplete documentation; maybe you can open a pull request for the docs once yours is running fine.
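
In the meantime, here is a hedged sketch of the minimal staging table the failing query needs. It lists only the three columns that the dim_district INSERT references, so the project's real staging schema certainly contains more than this.

```python
# Sketch only: create a minimal stg_covid_data so run_dim_district can execute.
# The task name is hypothetical; the column list covers only what the failing query references.
from airflow.providers.postgres.operators.postgres import PostgresOperator

create_stg_covid_data = PostgresOperator(
    task_id="create_stg_covid_data",
    postgres_conn_id="pg_dwh",
    sql="""
        CREATE TABLE IF NOT EXISTS stg_covid_data (
            kode_kab  TEXT,
            kode_prov TEXT,
            nama_kab  TEXT
        );
    """,
)
```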

designet commented 2 years ago

We can use postgres for the staging tables, right? Was there a reason to add MySQL?

tekon92 commented 2 years ago

Yeah, of course you can use pg; I'm using MySQL just to replicate a real-world use case.
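
For example, a Postgres-only staging load could look roughly like the sketch below; this is not the project's actual loader, and the CSV path and pandas usage are assumptions.

```python
# Sketch only: stage a CSV straight into Postgres instead of MySQL.
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_stg_covid_data():
    df = pd.read_csv("/opt/airflow/data/covid_data.csv")  # hypothetical path
    engine = PostgresHook(postgres_conn_id="pg_dwh").get_sqlalchemy_engine()
    df.to_sql("stg_covid_data", engine, if_exists="replace", index=False)
```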
