epoch8 / airflow-exporter

Airflow plugin to export dag and task based metrics to Prometheus.
Other
240 stars 75 forks source link

Sqlalchemy error with Airflow 2.0.1 #103

Open Jeoffreybauvin opened 3 years ago

Jeoffreybauvin commented 3 years ago

I just installed airflow-prometheus on my Airflow setup with Kubernetes :

airflow@airflow-uat-test2-web-6db768d5d5-rwjzd:/opt/airflow$ airflow version
2.0.1
airflow@airflow-uat-test2-web-6db768d5d5-rwjzd:/opt/airflow$ pip freeze | grep airflow
airflow-exporter==1.5.1
apache-airflow @ file:///opt/airflow
apache-airflow-providers-apache-cassandra==1.0.1
apache-airflow-providers-apache-hdfs==1.0.1

I'm using Mysql 5.7 (Percona server).

When I hit /admin/metrics/ :

Something bad has happened.
Please consider letting us know by creating a bug report using GitHub.

Python version: 3.8.10
Airflow version: 2.0.1
Node: airflow-uat-test2-web-6db768d5d5-rwjzd
-------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
_mysql_exceptions.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(PARTITION BY dag_run.dag_id ORDER BY dag_run.execution_date DESC) AS `row_numbe' at line 2")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow_exporter/prometheus_exporter.py", line 306, in list
    return Response(generate_latest(), mimetype='text')
  File "/home/airflow/.local/lib/python3.8/site-packages/prometheus_client/exposition.py", line 106, in generate_latest
    for metric in registry.collect():
  File "/home/airflow/.local/lib/python3.8/site-packages/prometheus_client/registry.py", line 82, in collect
    for metric in collector.collect():
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow_exporter/prometheus_exporter.py", line 229, in collect
    last_dagrun_info = get_last_dagrun_info()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow_exporter/prometheus_exporter.py", line 72, in get_last_dagrun_info
    sql_res = Session.query(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3373, in all
    return list(self)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
    return self._execute_and_instances(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.8/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.ProgrammingError: (_mysql_exceptions.ProgrammingError) (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(PARTITION BY dag_run.dag_id ORDER BY dag_run.execution_date DESC) AS `row_numbe' at line 2")
[SQL: SELECT anon_1.dag_id AS anon_1_dag_id, anon_1.state AS anon_1_state, anon_1.`row_number` AS anon_1_row_number, dag.owners AS dag_owners 
FROM (SELECT dag_run.dag_id AS dag_id, dag_run.state AS state, row_number() OVER (PARTITION BY dag_run.dag_id ORDER BY dag_run.execution_date DESC) AS `row_number` 
FROM dag_run) AS anon_1 INNER JOIN dag ON dag.dag_id = anon_1.dag_id 
WHERE anon_1.`row_number` = %s]
[parameters: (1,)]
(Background on this error at: http://sqlalche.me/e/13/f405)
elephantum commented 3 years ago

I see, MySQL 5.7 does not support window functions.

I think it is possible to rewrite SQL without them.

Jeoffreybauvin commented 3 years ago

I can confirm it's working with Mysql 8.

Have you any idea how to fix this query ?

brandon-fryslie commented 2 years ago

I'm also running into this. Any progress on a fix?

elephantum commented 2 years ago

@brandon-fryslie @Jeoffreybauvin can you please check if the fix here works for you? https://github.com/epoch8/airflow-exporter/pull/111

ibeltek commented 2 years ago

I have the same problem

$ pip freeze | grep airflow airflow-exporter==1.5.3 apache-airflow==2.2.4

Ooops! Something bad has happened.

Python version: 3.6.15 Airflow version: 2.2.4 Node: airflow-2-web-7d7bb76754-js5sv

Traceback (most recent call last): File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context cursor, statement, parameters, context File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute cursor.execute(statement, parameters) File "/home/airflow/.local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 206, in execute res = self._query(query) File "/home/airflow/.local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 319, in _query db.query(q) File "/home/airflow/.local/lib/python3.6/site-packages/MySQLdb/connections.py", line 254, in query _mysql.connection.query(self, query) MySQLdb._exceptions.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(PARTITION BY dag_run.dag_id ORDER BY dag_run.execution_date DESC) AS `row_numbe' at line 2")

The above exception was the direct cause of the following exception:

Traceback (most recent call last): File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app response = self.full_dispatch_request() File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1952, in full_dispatch_request rv = self.handle_user_exception(e) File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1821, in handle_user_exception reraise(exc_type, exc_value, tb) File "/home/airflow/.local/lib/python3.6/site-packages/flask/_compat.py", line 39, in reraise raise value File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request rv = self.dispatch_request() File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request return self.view_functionsrule.endpoint File "/home/airflow/.local/lib/python3.6/site-packages/airflow_exporter/prometheus_exporter.py", line 333, in list return Response(generate_latest(), mimetype='text') File "/home/airflow/.local/lib/python3.6/site-packages/prometheus_client/exposition.py", line 171, in generate_latest for metric in registry.collect(): File "/home/airflow/.local/lib/python3.6/site-packages/prometheus_client/registry.py", line 83, in collect yield from collector.collect() File "/home/airflow/.local/lib/python3.6/site-packages/airflow_exporter/prometheus_exporter.py", line 256, in collect last_dagrun_info = get_last_dagrun_info() File "/home/airflow/.local/lib/python3.6/site-packages/airflow_exporter/prometheus_exporter.py", line 85, in get_last_dagrun_info .join(SerializedDagModel, SerializedDagModel.dag_id == last_dagrun_query.c.dag_id) File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3373, in all return list(self) File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3535, in iter return self._execute_and_instances(context) File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances result = conn.execute(querycontext.statement, self._params) File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute return meth(self, multiparams, params) File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection return connection._execute_clauseelement(self, multiparams, params) File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement distilled_params, File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context e, statement, parameters, cursor, context File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception sqlalchemy_exception, with_traceback=excinfo[2], from=e File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_ raise exception File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context cursor, statement, parameters, context File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute cursor.execute(statement, parameters) File "/home/airflow/.local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 206, in execute res = self._query(query) File "/home/airflow/.local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 319, in _query db.query(q) File "/home/airflow/.local/lib/python3.6/site-packages/MySQLdb/connections.py", line 254, in query _mysql.connection.query(self, query) sqlalchemy.exc.ProgrammingError: (MySQLdb._exceptions.ProgrammingError) (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(PARTITION BY dag_run.dag_id ORDER BY dag_run.execution_date DESC) AS row_numbe' at line 2") [SQL: SELECT anon_1.dag_id AS anon_1_dag_id, anon_1.state AS anon_1_state, anon_1.row_numberAS anon_1_row_number, dag.owners AS dag_owners FROM (SELECT dag_run.dag_id AS dag_id, dag_run.state AS state, row_number() OVER (PARTITION BY dag_run.dag_id ORDER BY dag_run.execution_date DESC) ASrow_number FROM dag_run) AS anon_1 INNER JOIN dag ON dag.dag_id = anon_1.dag_id INNER JOIN serialized_dag ON serialized_dag.dag_id = anon_1.dag_id WHERE anon_1.row_number` = %s] [parameters: (1,)] (Background on this error at: http://sqlalche.me/e/13/f405)