PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
15.78k stars 1.54k forks source link

Prefect 2 + Flask: (sqlite3.OperationalError) table flow_run has no column named state_timestamp #7720

Open meystingray opened 1 year ago

meystingray commented 1 year ago

First check

Bug summary

Python 3.8.10 Prefect 2.6.8 Flask 2.0.2

I'm calling a Prefect 2 flow using Flask, and am consistently getting errors with the sqlite3 flow_run table such as:

table flow_run has no column named state_timestamp

Reproduction

`import os
import json
from flask import Flask, request
import logging
import traceback
from prefect.orion.schemas.states import StateType

app = Flask(__name__)

#################################################
#
#  Flask routes
#
#################################################
@app.route('/', methods=['POST'])
def default_route_main():

    force_exception = location_success = False
    try:
        # Run flow
        flow_state = my_flow(return_state=True)

        force_exception = flow_state.result()
        flow_success = flow_state.type == (StateType.COMPLETED)

Error

`Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/flask/app.py", line 2073, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.8/dist-packages/flask/app.py", line 1518, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.8/dist-packages/flask/app.py", line 1516, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.8/dist-packages/flask/app.py", line 1502, in dispatch_request
    return self.ensure_sync(self.view_functions)(**req.view_args)
  File "/main.py", line 134, in default_route_main
    onboardupdate_state = onboardupdate_flow(ou_event, atcore_event, prelog, return_state=True)
  File "/usr/local/lib/python3.8/dist-packages/prefect/flows.py", line 442, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine.py", line 157, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/usr/local/lib/python3.8/dist-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.8/dist-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.8/dist-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.8/dist-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine.py", line 214, in create_then_begin_flow_run
    flow_run = await client.create_flow_run(
  File "/usr/local/lib/python3.8/dist-packages/prefect/client/orion.py", line 449, in create_flow_run
    response = await self._client.post("/flow_runs/", json=flow_run_create_json)
  File "/usr/local/lib/python3.8/dist-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/usr/local/lib/python3.8/dist-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.8/dist-packages/prefect/client/base.py", line 160, in send
    await super().send(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.8/dist-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.8/dist-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.8/dist-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.8/dist-packages/httpx/_transports/asgi.py", line 152, in handle_async_request
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/usr/local/lib/python3.8/dist-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 706, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 443, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/usr/local/lib/python3.8/dist-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 706, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 66, in app
    response = await func(request)
  File "/usr/local/lib/python3.8/dist-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/routing.py", line 235, in app
    raw_response = await run_endpoint_function(
  File "/usr/local/lib/python3.8/dist-packages/fastapi/routing.py", line 161, in run_endpoint_function
    return await dependant.call(**values)
  File "/usr/local/lib/python3.8/dist-packages/prefect/orion/api/flow_runs.py", line 60, in create_flow_run
    model = await models.flow_runs.create_flow_run(
  File "/usr/local/lib/python3.8/dist-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/prefect/orion/models/flow_runs.py", line 71, in create_flow_run
    await session.flush()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/ext/asyncio/session.py", line 405, in flush
    await greenlet_spawn(self.sync_session.flush, objects=objects)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
    result = context.switch(value)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 3444, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 3584, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 3544, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    _emit_insert_statements(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
    result = connection._execute_20(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
    self._adapt_connection._handle_exception(error)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
    raise error
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/usr/local/lib/python3.8/dist-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/usr/local/lib/python3.8/dist-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/aiosqlite/core.py", line 129, in _execute
    return await future
  File "/usr/local/lib/python3.8/dist-packages/aiosqlite/core.py", line 102, in run
    result = function()
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) table flow_run has no column named state_timestamp`

Versions

Python 3.8.10
Prefect 2.6.8
Flask 2.0.2

Additional context

No response

meystingray commented 1 year ago

Just for context, I noticed this error previous, when the Flask app is initializing:

`Traceback (most recent call last): 
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context self.dialect.do_execute(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/default.py", line 736, in do_execute cursor.execute(statement, parameters) 
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute self._adapt_connection._handle_exception(error) 
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception raise error
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute self.await_(_cursor.execute(operation, parameters)) 
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only return current.driver.switch(awaitable) 
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn value = await result
File "/usr/local/lib/python3.8/dist-packages/aiosqlite/cursor.py", line 37, in execute await self._execute(self._cursor.execute, sql, parameters) 
File "/usr/local/lib/python3.8/dist-packages/aiosqlite/cursor.py", line 31, in _execute return await self._conn._execute(fn, *args, **kwargs) 
File "/usr/local/lib/python3.8/dist-packages/aiosqlite/core.py", line 129, in _execute return await future 
File "/usr/local/lib/python3.8/dist-packages/aiosqlite/core.py", line 102, in run result = function() sqlite3.OperationalError: no such table: flow_run`
meystingray commented 1 year ago

OperationalError Exception raised for errors that are related to the database’s operation and not necessarily under the control of the programmer, e.g. an unexpected disconnect occurs, the data source name is not found, a transaction could not be processed, a memory allocation error occurred during processing, etc.

This error is a DBAPI Error and originates from the database driver (DBAPI), not SQLAlchemy itself.

The OperationalError is the most common (but not the only) error class used by drivers in the context of the database connection being dropped, or not being able to connect to the database. For tips on how to deal with this, see the section Dealing with Disconnects.