PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

3.x Task Runs went missing from 3.0.0rc18 to 3.0.0rc19 #15153

Open tothandor opened 2 months ago

tothandor commented 2 months ago

Bug summary

Up until 3.0.0rc18, task run completions were registered correctly with a local Prefect server, but with 3.0.0rc19 and 3.0.0rc20 the list of task runs remains empty even after all tasks have completed successfully. So nothing is listed on a dashboard URL like this: http://localhost:4200/runs/flow-run/a161f610-630f-43a5-8d22-93845c4659a2?tab=Task+Runs

I am still investigating the issue, but downgrading solved it for me.

The flow that I use loads an sqlalchemy-connector block in the first task and passes the engine to downstream tasks, which use it for issuing queries. Maybe this is not a viable method, so I am planning to check this with a simplified flow with dummy tasks.
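(For comparison, the plain function-based shape of that pattern, as a sketch only; the block name dbconn and the query are illustrative, taken from the code shared later in this thread:)

import pandas as pd

from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector

@task
def dbconnect():
    # load the stored connector block and return an engine
    return SqlAlchemyConnector.load("dbconn").get_engine()

@task
def q1(db):
    return pd.read_sql("select * from cities limit 10", db)

@flow
def dbflow():
    db = dbconnect()
    return q1(db)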

Version info (prefect version output)

Version:             3.0.0rc18
API version:         0.8.4
Python version:      3.9.19
Git commit:          99428dea
Built:               Thu, Aug 15, 2024 1:17 PM
OS/Arch:             linux/x86_64
Profile:             ephemeral
Server type:         server
Pydantic version:    2.8.2
Integrations:
  prefect-sqlalchemy: 0.5.0rc2

Additional context

I get a lot of errors like this from the worker:

12:45:07.687 | ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 1 pending items.
cicdw commented 2 months ago

Hi @tothandor, thanks for the bug report! Could you confirm two things for me:

Thank you!

tothandor commented 2 months ago

Hi @cicdw, yes, I do start the server with prefect server start, except that I add the --hostname $HOSTNAME option so the server is accessible on the local network. The server version is 3.0.0rc20, according to http://{prefecthost}:4200/settings.

I was trying to reproduce the missing task runs with a simplified flow, but all I could achieve was an empty "run graph". (screenshot: prefect3 0 0rc20_empty_run_graph)

This is the flow code so far.

# dbflow/dbflow_class.py
import logging

import pandas as pd
import sqlalchemy as sqla

from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector

log = logging.getLogger()

class DBFlow:
    @task
    def dbconnect(self):
        log.info("Connecting to database")
        self.db = SqlAlchemyConnector.load("dbconn").get_engine()

    @staticmethod
    @task
    def q1(db):
        log.info(f"QUERY 1")
        df = pd.read_sql('select * from cities limit 10', db)
        return df

    @task
    def q2(self):
        log.info(f"QUERY 2")
        df = pd.read_sql('select * from forecast limit 10', self.db)
        return df

    @task
    def l2(self, df: pd.DataFrame):
        log.info(f"LOADING TO DATABASE")
        df.to_sql('forecast_test', self.db, schema='test', if_exists='replace')

    @flow(name='DBFlow')
    def run(self=None):
        self.dbconnect()

        self.cities_ = self.q1(self.db)
        self.forecast_ = self.q2()
        self.l2(self.forecast_)

dbflow = DBFlow()
dbflow_flow = dbflow.run
# prefect.yaml
name: prefect3
prefect-version: 3.0.0rc20

# build section allows you to manage and build docker images
build:

# push section allows you to manage if and how this project is uploaded to remote locations
push:

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /home/andor.toth/work/prefect3

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: dbflow
  version:
  tags: []
  concurrency_limit: 1
  description:
  entrypoint: dbflow.dbflow_class:dbflow_flow
  parameters: {}
  work_pool:
    name: default
    work_queue_name:
    job_variables: {}
  enforce_parameter_schema: true
  schedules:
  - cron: '*/5 * * * *'
    timezone: Europe/Budapest
    day_or: true
    active: true
    max_active_runs:
    catchup: false
tothandor commented 2 months ago

I have retried the upgrade from 3.0.0rc18 to 3.0.0rc20 with particular attention to every step, including prefect server database upgrade and changes in pip packages, but the result is the same: task runs are not registered somehow, although the flow exits cleanly.


Maybe it's not important, but I get a lot of these errors, both in rc18 and rc20: ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 1 pending items.

And in most cases when a flow fails, I get this stack trace.

...
  File "/srv/prefect/venv/lib64/python3.9/site-packages/prefect/utilities/engine.py", line 736, in emit_task_run_state_change_event
    return emit_event(
  File "/srv/prefect/venv/lib64/python3.9/site-packages/prefect/events/utilities.py", line 84, in emit_event
    worker_instance.send(event_obj)
  File "/srv/prefect/venv/lib64/python3.9/site-packages/prefect/_internal/concurrency/services.py", line 108, in send
    raise RuntimeError("Cannot put items in a stopped service instance.")

If I revert back to rc18, then everything is fine again. I have also tried resetting the database, but it makes no difference.

tothandor commented 2 months ago

I have tried some intricate dummy flows, but all I could achieve was an incomplete task run graph. The task runs themselves appeared correctly in the list, though.

# dbflow/forkmerge.py
import logging

from prefect import flow, task
# from prefect_sqlalchemy import SqlAlchemyConnector

log = logging.getLogger()

class ForkMergeFlow:
    @task
    def dbconnect(self):
        log.info("Connecting to database")
        self.db = lambda: 'dummyresource'
        # self.db = SqlAlchemyConnector.load("dbconn").get_engine()

    @staticmethod
    @task
    def q1(db):
        log.info(f"QUERY 1")
        return [1,2,3,4]

    @task
    def q2(self):
        log.info(f"QUERY 2")
        return list('abcd')

    @classmethod
    @task
    def q3(cls, db):
        log.info(f"QUERY 3")
        return dict(c=3, d=4, e=5, f=6)

    @task
    def p11(self, data): # result #1 - processor #1
        log.info('PROCESS 1 RESULT 1')
        return sorted(data)

    @task
    def p21(self, data): # result #2 - processor #1
        log.info('PROCESS 1 RESULT 2')
        return list(enumerate(data))

    @task
    def p22(self, data): # result #2 - processor #2
        log.info('PROCESS 2 RESULT 2')
        return list(reversed(data))

    @task
    def m12(self, r1, r2): # merge process result #1 & #2
        log.info('MERGE RESULT 1 & 2')
        return list(zip(r1, r2))

    @task
    def l1(self, data):
        log.info('LOAD 1')
        print(data)

    @task
    def l2(self, data):
        log.info(f"LOAD 2")
        print(data)

    @task
    def l22(self, data):
        log.info(f"LOAD 2 PROCESSED RESULT 2")
        print(data)

    @task
    def lm12(self, data):
        log.info(f"LOADING MERGED DATA")
        print(data)

    @flow(name='ForkMerge')
    def run(self=None):
        self.dbconnect()

        # raw results
        self.r1_ = self.q1(self.db)
        self.l1(self.r1_)

        self.r2_ = self.q2()
        self.l2(self.r2_)

        self.r2_ = self.q2() # once more
        self.rp21_ = self.p21(self.r2_)
        self.rp22_ = self.p22(self.r2_)
        self.l22(self.rp22_)

        self.r3_ = self.q3(self.db)

        # processed results
        self.rp11_ = self.p11(self.r1_)
        self.l1(self.rp11_)

        # merged results
        self.rm12_ = self.m12(self.rp11_, self.rp21_)

        # load results
        self.lm12(self.rm12_)

fmflow = ForkMergeFlow()
fmflow_run = fmflow.run

The entrypoint for the deployment is dbflow.forkmerge:fmflow_run

jakekaplan commented 2 months ago

@tothandor thanks for this! A few more questions to help diagnose:

3.0.0rc19 changed how task run data is sent to the server. It's now sent via websocket along with the rest of the event data. Is there anything in your setup that would block a ws connection from being created?

tothandor commented 2 months ago

Hi @jakekaplan ,

I have added PREFECT_DEBUG_MODE=True to the configuration (prefect config set PREFECT_DEBUG_MODE=True) and restarted the server and the worker. Both became very verbose.

The 'EventsWorker' failed error is gone, and I get this stack trace instead.

10:49:14.506 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - Running call get(timeout=1.9999814350157976) in thread 'APILogWorkerThread'
10:49:14.506 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get' RUNNING, runtime=0.00> entered
10:49:16.507 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - <WatcherThreadCancelScope, name='get' COMPLETED, runtime=2.00> exited
10:49:16.507 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - Encountered exception in call get(<dropped>)
Traceback (most recent call last):
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/queue.py", line 179, in get
    raise Empty
_queue.Empty

The server shows some sqlite3 exceptions:

sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) near ",": syntax error
[SQL: WITH intervals AS
(
                -- recursive CTE to mimic the behavior of `generate_series`,
                -- which is only available as a compiled extension
                WITH RECURSIVE intervals(interval_start, interval_end, counter) AS (
                    VALUES(
                        strftime('%Y-%m-%d %H:%M:%f000', :start_time),
                        strftime('%Y-%m-%d %H:%M:%f000', :start_time, :interval),
                        1
                        )

                    UNION ALL

                    SELECT interval_end, strftime('%Y-%m-%d %H:%M:%f000', interval_end, :interval), counter + 1
                    FROM intervals
                    -- subtract interval because recursive where clauses are effectively evaluated on a t-1 lag
                    WHERE
                        interval_start < strftime('%Y-%m-%d %H:%M:%f000', :end_time, :negative_interval)
                        -- don't compute more than 500 intervals
                        AND counter < 500
                )
 SELECT counts.interval_start, counts.interval_end, coalesce(json_group_array(json(counts.state_agg)) FILTER (WHERE counts.state_agg IS NOT NULL), '[]') AS states
FROM (SELECT intervals.interval_start AS interval_start, intervals.interval_end AS interval_end, CASE WHEN (count(runs.id) = :count_1) THEN NULL ELSE json_object(:json_object_1, runs.state_type, :json_object_2, runs.state_name, :json_object_3, count(runs.id), :json_objec
t_4, sum(max(:max_1, CAST(STRFTIME('%s', runs.estimated_run_time) AS INTEGER))), :json_object_5, sum(max(:max_2, CAST(STRFTIME('%s', runs.estimated_start_time_delta) AS INTEGER)))) END AS state_agg
FROM intervals LEFT OUTER JOIN (SELECT task_run.id AS id, task_run.expected_start_time AS expected_start_time, (SELECT CASE WHEN (task_run.state_type = :state_type_1) THEN strftime(:strftime_1, (julianday(task_run.total_run_time) + julianday(strftime(:strftime_2, (:julia
nday_1 + julianday(strftime('%Y-%m-%d %H:%M:%f000', 'now'))) - julianday(task_run.state_timestamp)))) - :param_1) ELSE task_run.total_run_time END AS anon_1) AS estimated_run_time, CASE WHEN (task_run.start_time > task_run.expected_start_time) THEN strftime(:strftime_3,
(:julianday_2 + julianday(task_run.start_time)) - julianday(task_run.expected_start_time)) WHEN (task_run.start_time IS NULL AND (task_run.state_type NOT IN (:state_type_2_1, :state_type_2_2, :state_type_2_3, :state_type_2_4)) AND task_run.expected_start_time < strftime(
'%Y-%m-%d %H:%M:%f000', 'now')) THEN strftime(:strftime_4, (:julianday_3 + julianday(strftime('%Y-%m-%d %H:%M:%f000', 'now'))) - julianday(task_run.expected_start_time)) ELSE :param_2 END AS estimated_start_time_delta, task_run.state_type AS state_type, task_run.state_na
me AS state_name
FROM task_run
WHERE task_run.start_time <= :start_time_1 AND task_run.start_time >= :start_time_2 AND (EXISTS (SELECT flow_run.flow_id, flow_run.deployment_id, flow_run.work_queue_name, flow_run.flow_version, flow_run.deployment_version, flow_run.parameters, flow_run.idempotency_key,
flow_run.context, flow_run.empirical_policy, flow_run.tags, flow_run.created_by, flow_run.infrastructure_pid, flow_run.job_variables, flow_run.infrastructure_document_id, flow_run.parent_task_run_id, flow_run.auto_scheduled, flow_run.state_id, flow_run.work_queue_id, flo
w_run.name, flow_run.state_type, flow_run.state_name, flow_run.state_timestamp, flow_run.run_count, flow_run.expected_start_time, flow_run.next_scheduled_start_time, flow_run.start_time, flow_run.end_time, flow_run.total_run_time, flow_run.id, flow_run.created, flow_run.
updated
FROM flow_run
WHERE flow_run.id = task_run.flow_run_id))) AS runs ON runs.expected_start_time >= intervals.interval_start AND runs.expected_start_time < intervals.interval_end GROUP BY intervals.interval_start, intervals.interval_end, runs.state_type, runs.state_name) AS counts GROUP
BY counts.interval_start, counts.interval_end ORDER BY counts.interval_start
 LIMIT :param_3 OFFSET :param_4]
[parameters: {'count_1': 0, 'json_object_1': 'state_type', 'json_object_2': 'state_name', 'json_object_3': 'count_runs', 'json_object_4': 'sum_estimated_run_time', 'max_1': 0, 'json_object_5': 'sum_estimated_lateness', 'max_2': 0, 'start_time': '2024-09-02 08:48:35+00:00
', 'interval': '+4320.0 seconds', 'end_time': '2024-09-03 08:48:35+00:00', 'negative_interval': '-4320.0 seconds', 'state_type_1': 'RUNNING', 'strftime_1': '%Y-%m-%d %H:%M:%f000', 'strftime_2': '%Y-%m-%d %H:%M:%f000', 'julianday_1': 2440587.5, 'param_1': 2440587.5, 'strf
time_3': '%Y-%m-%d %H:%M:%f000', 'julianday_2': 2440587.5, 'strftime_4': '%Y-%m-%d %H:%M:%f000', 'julianday_3': 2440587.5, 'param_2': '1970-01-01 00:00:00.000000', 'start_time_1': '2024-09-03 08:48:35.000000', 'start_time_2': '2024-09-02 08:48:35.000000', 'param_3': 500,
 'param_4': 0, 'state_type_2_1': 'COMPLETED', 'state_type_2_2': 'CRASHED', 'state_type_2_3': 'CANCELLED', 'state_type_2_4': 'FAILED'}]
(Background on this error at: https://sqlalche.me/e/20/e3q8)

And there's another one regarding database locking.

...
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/prefect/server/api/concurrency_limits_v2.py", line 256, in bulk_decrement_active_slots                                                                                                    [831/1962]
    await models.concurrency_limits_v2.bulk_decrement_active_slots(
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/prefect/server/database/dependencies.py", line 168, in async_wrapper
    return await func(db, *args, **kwargs)  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/prefect/server/models/concurrency_limits_v2.py", line 296, in bulk_decrement_active_slots
    result = await session.execute(query)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
    result = await greenlet_spawn(
             ^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn
    result = context.throw(*sys.exc_info())
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/orm/session.py", line 2362, in execute
    return self._execute_internal(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/orm/session.py", line 2247, in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/orm/bulk_persistence.py", line 1649, in orm_execute_statement
    return super().orm_execute_statement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
    result = conn.execute(
             ^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
    return meth(
           ^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    return self._exec_single_context(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
    self._handle_dbapi_exception(
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/engine/base.py", line 2355, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
    cursor.execute(statement, parameters)
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 147, in execute
    self._adapt_connection._handle_exception(error)
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 298, in _handle_exception
    raise error
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 129, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
            ^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/aiosqlite/cursor.py", line 48, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/aiosqlite/cursor.py", line 40, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/aiosqlite/core.py", line 132, in _execute
    return await future
           ^^^^^^^^^^^^
  File "/home/andor.toth/work/prefect3/venv/lib64/python3.12/site-packages/aiosqlite/core.py", line 115, in run
    result = function()
             ^^^^^^^^^^
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: UPDATE concurrency_limit_v2 SET active_slots=CASE WHEN (CASE WHEN (concurrency_limit_v2.active_slots - floor(concurrency_limit_v2.slot_decay_per_second * (julianday(:julianday_1) - julianday(concurrency_limit_v2.updated)) * :param_1) < :param_2) THEN :param_3 ELSE
concurrency_limit_v2.active_slots - floor(concurrency_limit_v2.slot_decay_per_second * (julianday(:julianday_1) - julianday(concurrency_limit_v2.updated)) * :param_1) END - :param_4 < :param_5) THEN :param_6 ELSE CASE WHEN (concurrency_limit_v2.active_slots - floor(concu
rrency_limit_v2.slot_decay_per_second * (julianday(:julianday_2) - julianday(concurrency_limit_v2.updated)) * :param_7) < :param_8) THEN :param_9 ELSE concurrency_limit_v2.active_slots - floor(concurrency_limit_v2.slot_decay_per_second * (julianday(:julianday_2) - julian
day(concurrency_limit_v2.updated)) * :param_7) END - :param_10 END, denied_slots=CASE WHEN (concurrency_limit_v2.denied_slots - floor(CASE WHEN (concurrency_limit_v2.slot_decay_per_second > :slot_decay_per_second_1) THEN concurrency_limit_v2.slot_decay_per_second ELSE :p
aram_11 / (CAST(concurrency_limit_v2.avg_slot_occupancy_seconds AS FLOAT) + 0.0) END * (julianday(:julianday_3) - julianday(concurrency_limit_v2.updated)) * :param_12) < :param_13) THEN :param_14 ELSE concurrency_limit_v2.denied_slots - floor(CASE WHEN (concurrency_limit
_v2.slot_decay_per_second > :slot_decay_per_second_1) THEN concurrency_limit_v2.slot_decay_per_second ELSE :param_11 / (CAST(concurrency_limit_v2.avg_slot_occupancy_seconds AS FLOAT) + 0.0) END * (julianday(:julianday_3) - julianday(concurrency_limit_v2.updated)) * :para
m_12) END, avg_slot_occupancy_seconds=((concurrency_limit_v2.avg_slot_occupancy_seconds + :param_15 / ((concurrency_limit_v2."limit" * :limit_1) + 0.0)) - concurrency_limit_v2.avg_slot_occupancy_seconds / ((concurrency_limit_v2."limit" * :limit_2) + 0.0)), updated=strfti
me('%Y-%m-%d %H:%M:%f000', 'now') WHERE concurrency_limit_v2.id IN (SELECT 1 FROM (SELECT 1) WHERE 1!=1) AND concurrency_limit_v2.active = 1]
[parameters: {'julianday_1': 'now', 'param_1': 86400.0, 'param_2': 0, 'param_3': 0, 'param_4': 1, 'param_5': 0, 'param_6': 0, 'julianday_2': 'now', 'param_7': 86400.0, 'param_8': 0, 'param_9': 0, 'param_10': 1, 'slot_decay_per_second_1': 0.0, 'param_11': 1.0, 'julianday_
3': 'now', 'param_12': 86400.0, 'param_13': 0, 'param_14': 0, 'param_15': 4.316197, 'limit_1': 2, 'limit_2': 2}]
(Background on this error at: https://sqlalche.me/e/20/e3q8)

EventsWorker errors were present in earlier versions too.

The API URL is set in the profile config to the following value: PREFECT_API_URL='http://t84:4200/api'. t84 is the hostname, and it resolves correctly; TCP port 4200 is accessible (nc -vz t84 4200).

I hope this helps!

jakekaplan commented 2 months ago

Thank you @tothandor, that is helpful! The EventsWorker errors being present in prior versions indicates to me that you haven't been able to send Prefect events to your server for a while. You're only seeing missing task runs starting in 3.0.0rc19 because that is when task runs started to use the events connection to send data. The underlying issue seems to be that this websocket connection cannot be established.

Here's a simplified test you can run:

import asyncio

from prefect.events.clients import PrefectEventsClient

async def main():
    async with PrefectEventsClient() as client:
        print(f"Connected to: {client._events_socket_url}")
        pong = await client._websocket.ping()
        pong_time = await pong
        print(f"Response received in: {pong_time}")

if __name__ == '__main__':
    asyncio.run(main())

When it connects successfully, it gives me:

Connected to: ws://127.0.0.1:4200/api/events/in
Response received in: 0.0001704159949440509

Is your server traffic whitelisted or proxied in some way? I've seen instances in the past where http:// URLs are set up to be forwarded/accepted but ws:// is not.
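(A check outside of Prefect can help isolate proxy issues; the sketch below uses the websockets package, which Prefect already depends on. The URL is an example and should be replaced with your own API host:)

import asyncio

import websockets

async def main():
    # example URL; replace host and port with the ones from your PREFECT_API_URL
    url = "ws://127.0.0.1:4200/api/events/in"
    async with websockets.connect(url) as ws:
        pong = await ws.ping()
        await pong
        print(f"websocket reachable at {url}")

if __name__ == "__main__":
    asyncio.run(main())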

lucasdepetrisd commented 2 months ago

Hi!

I’m encountering the same issue after upgrading from Prefect 2.x to the new 3.0.0. My setup includes PREFECT_API_URL='http://192.168.1.13:442/api' and PREFECT_HOME='C:\Users\tareas\.prefect'.

Even though the flows execute successfully, task runs are not always registered in the dashboard. When running a simple flow with PREFECT_DEBUG_MODE=True, I observed that while tasks complete successfully, an error occurs when Prefect tries to access the stored result:

From the UI: (screenshot)

From the terminal: (screenshot)

The error ValueError: Path C:\Users\tareas\.prefect\storage\2845aef2c337538faed6bb7ab7c99317 does not exist. references the same task ID as shown in the UI screenshot, so I suspect there might be a connection. However, the strange part is that this error still occurs even in runs where the tasks are visible in the dashboard.

It’s strange because this issue doesn’t occur consistently; in some runs, tasks are visible in the dashboard even though the ValueError appears anyway. Any insights on resolving this would be greatly appreciated.

Additionally, the function provided results in the following:

Connected to: ws://192.168.1.13:442/api/events/in
Response received in: 0.000411600005463697

Thanks!

tothandor commented 2 months ago

Hi @jakekaplan , I have this virtual machine (VM) on a local network without any obstacles or proxies. No firewalls or anything in between. The client connection test runs without a problem.

Connected to: ws://t84:4200/api/events/in
Response received in: 0.00022578425705432892

I have since upgraded to 3.0.0, but it doesn't make a difference.

cicdw commented 2 months ago

Hey @lucasdepetrisd -- the ValueError is unrelated; we will suppress it in a release soon. Tasks always look up their configured result persistence location; if a result is found the task is cached, and when there is no result the task executes normally. This error is raised by Prefect's internal machinery and is captured, so it shouldn't bubble up to users like this. Apologies for the confusion on that point!
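(For anyone bothered by the noise in the meantime, result persistence can be turned off per task; a sketch only, not a fix for the missing task runs:)

from prefect import flow, task

@task(persist_result=False)  # skip writing this task's result to local storage
def my_task():
    return 1

@flow
def my_flow():
    return my_task()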

jakekaplan commented 2 months ago

hey @tothandor thanks for trying out the test script. Unfortunately I'm not able to reproduce what you're seeing here, so I may need some more help on your end investigating! Let me give a brief overview of what's happening and hopefully we can piece things together.

3.0.0rc19 introduced client side task run orchestration. Previously every time a task run changed state (going to RUNNING etc.) it would make a POST request to the server "asking permission" if it could go to that state.

After 3.0.0rc19, any determination about a task run moving to the next state is made entirely on the client. The client still records those state changes after the fact, so the underlying information about your task run still gets recorded on the server. It does that by doing the following (you'll need to have PREFECT_DEBUG_MODE=1 to see most of these logs):

Client Side :

Server Side:

I know this is a lot of information, but I'm hoping you may be able to run a no-op task, track the events from the client -> server and, following the above, see if you notice anything out of the ordinary. Happy to answer any other questions as well; thanks in advance for your assistance in working through this!
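(A no-op flow of the kind suggested, as a sketch with illustrative names; running it once should produce the PENDING, RUNNING and COMPLETED events described above:)

from prefect import flow, task

@task
def noop():
    pass

@flow
def noop_flow():
    noop()

if __name__ == "__main__":
    noop_flow()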

lucasdepetrisd commented 2 months ago

Thanks for the clarification @cicdw! It makes sense that the ValueError is unrelated.

lucasdepetrisd commented 2 months ago

Hey @jakekaplan, thanks for the explanation! I ran a basic flow with a no-op task and checked the database and logs as you suggested.

Here’s what I found in the logs about the task:

01:32:13.692 | DEBUG   | MainThread   | prefect._internal.concurrency - Service <prefect.events.worker.EventsWorker object at 0x000001AE78587F10> enqueuing item Event(occurred=DateTime(2024, 9, 5, 4, 32, 13, 692362, tzinfo=Timezone('UTC')), event='prefect.task-run.Running', resource=Resource(root={'prefect.resource.id': 'prefect.task-run.c0734825-cabd-4a79-a77e-4c4e462f29f7', 'prefect.resource.name': 'my_task-4a4', 'prefect.state-message': '', 'prefect.state-name': 'Running', 'prefect.state-timestamp': '2024-09-05T04:32:13.692362+00:00', 'prefect.state-type': 'RUNNING', 'prefect.orchestration': 'client'}), related=[], payload={'intended': {'from': 'PENDING', 'to': 'RUNNING'}, 'initial_state': {'type': 'PENDING', 'name': 'Pending', 'message': '', 'state_details': {}}, 'validated_state': {'type': 'RUNNING', 'name': 'Running', 'message': '', 'state_details': {}, 'data': None}, 'task_run': {'name': 'my_task-4a4', 'task_key': 'my_task-2d616e3a', 'dynamic_key': '4a42ee4d-1709-47c3-a9c5-4d32cc73cc77', 'empirical_policy': {'max_retries': 0, 'retry_delay_seconds': 0.0, 'retries': 0, 'retry_delay': 0}, 'tags': [], 'task_inputs': {}, 'run_count': 1, 'flow_run_run_count': 1, 'expected_start_time': '2024-09-05T04:32:13.641478Z', 'start_time': '2024-09-05T04:32:13.692362Z', 'total_run_time': 0.0}}, id=UUID('a3e96151-50da-463e-bd8e-e2b64511c8cd'), follows=UUID('359b6948-718e-4f79-9f10-6ba8393ca689'))

01:32:13.748 | DEBUG   | EventsWorkerThread | prefect._internal.concurrency - Running call get() in thread 'EventsWorkerThread'

01:32:13.839 | INFO    | Task 'my_task' - Finished in state Completed(message=None, type=COMPLETED, result=PersistedResult(type='reference', serializer_type='pickle', storage_key='C:\\Users\\tareas\\.prefect\\storage\\7dfc6a8d5f5969a6f2cbc5c770e5d4e9', storage_block_id=None, expiration=None, serialize_to_none=True))

It seems all is as expected on the client side, with no sign of an error except for the ValueError @cicdw explained before.

However, on the server side, I queried the database and found:

SELECT * FROM events WHERE resource_id = 'prefect.task-run.c0734825-cabd-4a79-a77e-4c4e462f29f7'; returns data for the three states, and the resource column of the pending state contains:

{
  "prefect.resource.id": "prefect.task-run.c0734825-cabd-4a79-a77e-4c4e462f29f7",
  "prefect.resource.name": "my_task-4a4",
  "prefect.state-message": "",
  "prefect.state-name": "Pending",
  "prefect.state-timestamp": "2024-09-05T04:32:13.641478+00:00",
  "prefect.state-type": "PENDING",
  "prefect.orchestration": "client"
}


But for SELECT * FROM task_run_state WHERE task_run_id = 'c0734825-cabd-4a79-a77e-4c4e462f29f7'; there were no results, and no new task runs recorded recently with or without that ID.

Additionally, I observed that when restarting the local server, tasks are briefly stored and then stop registering after a few minutes.

Any ideas on why this might be happening?

jakekaplan commented 2 months ago

@lucasdepetrisd thanks for doing that. Can you confirm for me that both your client and server versions are >=3.0.0 (not an rc)?

lucasdepetrisd commented 2 months ago

Yes, I ran the prefect version command and got:

prefect version
13:39:07.366 | DEBUG   | prefect.profiles - Using profile 'dev'
Version:             3.0.0
API version:         0.8.4
Python version:      3.10.6
Git commit:          c40d069d
Built:               Tue, Sep 3, 2024 11:13 AM
OS/Arch:             win32/AMD64
Profile:             dev
Server type:         server
Pydantic version:    2.8.2

I also checked the server version using the endpoint http://192.168.1.13:442/api/admin/version and confirmed it's reporting 3.0.0.

However, when querying http://192.168.1.13:442/api/version (described in the docs as the server version), I got API version 0.8.4.
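(Both version endpoints can be checked in one go; a sketch using httpx, which ships as a Prefect dependency. The host below is the one from this report and should be replaced with your own:)

import httpx

base = "http://192.168.1.13:442/api"  # host from this report; replace with your own

print(httpx.get(f"{base}/admin/version").json())  # Prefect server version, e.g. "3.0.0"
print(httpx.get(f"{base}/version").json())        # API schema version, e.g. "0.8.4"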

tothandor commented 2 months ago

Hi @jakekaplan ,

Thanks a lot for the detailed explanation!

I have another development server (dev15) that runs Prefect 3 with a somewhat more complex workflow called CountryWeatherFlow, which collects and processes weather data. After failing to upgrade it to rc19, I modelled the dummy ForkMergeFlow for the test server (t84) on that workflow, and did my experimentation and debugging on the simplified flow.

Now I have made a new attempt at the upgrade on the dev server (from rc18 to GA), resetting the database, recreating blocks, and redeploying the workflows. Unlike the test server, the dev server sits behind an Apache 2.4 reverse proxy, so I adjusted its configuration to handle both HTTP and websockets (with the wstunnel module). It seems that the task runs are now registered correctly; only the dependencies are not recognized, and some of the tasks do not appear on the run graph.

I am sorry, but I may have mixed up some of the conclusions from the different servers and believed that task runs could be reliably checked on the run graph. After all, blocked websockets caused the problem, as you guessed early on.

Thanks for all your time and efforts to solve this issue!

lucasdepetrisd commented 2 months ago

Hi @jakekaplan! I upgraded to version 3.0.1, restarted the server, and let it run over the weekend, but the issue persists. Would you prefer I open a new issue, or should we keep this one closed and continue the conversation here or at the new #15274 issue?

jakekaplan commented 2 months ago

@lucasdepetrisd I'm going to re-open this issue. The other issue, while showing a similar symptom, seems to stem from the client not being able to open a websocket connection to the server, which is how you would get the Service 'EventsWorker' failed with 'n' pending error log. From what I can see above, you are able to send events successfully to the server, but they are not being inserted as task runs.

We've added some additional server side debug logging. If I could ask you to enable the following settings server side before restarting your server again for them to kick in:

PREFECT_LOGGING_SERVER_LEVEL=DEBUG
PREFECT_DEBUG_MODE=1

(also double check you're on 3.0.1 for both the client and server, although it seems like you are). I'd expect you to now see the following:

  1. Indication of the TaskRunRecorder service starting

    11:27:36.517 | INFO    | prefect.server - TaskRunRecorder service scheduled to start in-app
    ...
    11:27:36.517 | DEBUG   | prefect.server.services.task_run_recorder - TaskRunRecorder started
  2. Record of your task run event being received by the server:

    11:28:26.267 | DEBUG   | prefect.server.events.messaging - Publishing event: prefect.task-run.Pending with id: 5900a1a5-d3ad-479c-931b-ef9c9c73d151 for resource: prefect.task-run.ae14085a-88f1-43f4-b3d5-c5afb799beb4
  3. Record of your task run event being received by the internal service:

    11:28:26.267 | DEBUG   | prefect.server.services.task_run_recorder - Received event: prefect.task-run.Pending with id: 5900a1a5-d3ad-479c-931b-ef9c9c73d151 for resource: prefect.task-run.ae14085a-88f1-43f4-b3d5-c5afb799beb4
  4. Record of your task run being inserted to the DB:

    11:28:26.301 | DEBUG   | prefect.server.services.task_run_recorder - Recorded task run state change

The server debug logs can be pretty noisy as there is a lot going on; if you're having difficulty parsing the logs, you can temporarily disable some of the other services (PREFECT_API_SERVICES_SCHEDULER_ENABLED=False, for example) while debugging. Thanks for your assistance trying to work through this!

jakekaplan commented 2 months ago

@lucasdepetrisd were you able to investigate any further?

jakekaplan commented 2 months ago

If anyone coming to this issue is unable to see their task runs showing up, please follow these debug steps (consolidating a couple of comments from above).

When you execute a task run, it will emit events via websocket to the server. These events are converted and inserted into the task_run and task_run_state table via the TaskRunRecorder service on the server.

Following the below, you should be able to track a task run from execution time, through client-side event emission, server-side receipt, and finally DB insertion. We're looking for any error logs or an indication of the point in the process where something is breaking down.

Debug Steps:

1. Confirm that your client and server are both on 3.0.2 (or whatever the latest version is)

2. Set up the debug env vars described above (PREFECT_LOGGING_SERVER_LEVEL=DEBUG and PREFECT_DEBUG_MODE=1)

3. Run a simple task, for example:

@task(log_prints=True)
def hello_task():
    print("Hello, world!")

### 4. Track the task run through the client:

- When you run a task, Prefect will emit events to the `EventsWorker`. You'll see a log like this when a task run event is enqueued to the `EventsWorker`:

16:47:09.303 | DEBUG | MainThread | prefect._internal.concurrency - Service <prefect.events.worker.EventsWorker object at 0x106671300> enqueuing item Event(occurred=DateTime(2024, 9, 4, 20, 47, 9, 303661, tzinfo=Timezone('UTC')), event='prefect.task-run.Running', resource=Resource(root={'prefect.resource.id': 'prefect.task-run.49d25245-7972-4b4b-83d8-3868ce7af378', ..., ))

- The `EventsWorker` has a client that maintains a websocket connection to the server. When it receives an event, you should see a matching log to the one above, like this:

16:47:09.330 | DEBUG | GlobalEventLoopThread | prefect._internal.concurrency - Service <prefect.events.worker.EventsWorker object at 0x106671300> handling item Event(occurred=DateTime(2024, 9, 4, 20, 47, 9, 303661, tzinfo=Timezone('UTC')), event='prefect.task-run.Running', resource=Resource(root={'prefect.resource.id': 'prefect.task-run.49d25245-7972-4b4b-83d8-3868ce7af378', ..., )

- If the `EventsWorker` cannot send an event for any reason, you'll see a log like the one below, followed by a traceback of what went wrong:

16:58:51.007 | ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 1 pending items. Traceback (most recent call last): ...

You should see 3 task run state events make their way to the `EventsWorker` and be emitted: one each for `PENDING`, `RUNNING`, and `COMPLETED`.

### 5. Track the task run through the server:
The server will accept the task run events via the websocket, and you should see the following debug logs:

1. Record of your task run event being received by the server:

11:28:26.267 | DEBUG | prefect.server.events.messaging - Publishing event: prefect.task-run.Pending with id: 5900a1a5-d3ad-479c-931b-ef9c9c73d151 for resource: prefect.task-run.ae14085a-88f1-43f4-b3d5-c5afb799beb4


2. Record of your task run event being received by the internal service that will convert it into a full task run:

11:28:26.267 | DEBUG | prefect.server.services.task_run_recorder - Received event: prefect.task-run.Pending with id: 5900a1a5-d3ad-479c-931b-ef9c9c73d151 for resource: prefect.task-run.ae14085a-88f1-43f4-b3d5-c5afb799beb4

3. Record of your task run actually being inserted into the DB:

11:28:26.301 | DEBUG | prefect.server.services.task_run_recorder - Recorded task run state change

4. After which you should be able to observe the task run and task run states in the DB directly:

-- SELECT * FROM events where resource_id = 'prefect.task-run.49d25245-7972-4b4b-83d8-3868ce7af378' and event = 'prefect.task-run.Running'

{ "occurred": "2024-09-04 21:04:10.780012", "event": "prefect.task-run.Running", "resource_id": "prefect.task-run.49d25245-7972-4b4b-83d8-3868ce7af378", ..., }

- Another service will record the event to the `task_run` and `task_run_state` tables:

-- SELECT * FROM task_run_state where task_run_id = '49d25245-7972-4b4b-83d8-3868ce7af378' and type = 'RUNNING'

{ "id": "9679e882-890a-43fe-a537-7e1be6409beb", "created": "2024-09-04 20:47:09.351794", "updated": "2024-09-04 20:47:09.352106", "type": "RUNNING", ..., }
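(To run those queries without a SQL client, the server's default SQLite database lives under the Prefect home directory; a sketch, assuming the default location and no custom database connection URL:)

import sqlite3
from pathlib import Path

db_path = Path.home() / ".prefect" / "prefect.db"  # default server database location
conn = sqlite3.connect(db_path)

# most recent task run state changes, newest first
for row in conn.execute(
    "SELECT created, task_run_id, type FROM task_run_state ORDER BY created DESC LIMIT 10"
):
    print(row)

conn.close()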

taylor-curran commented 2 months ago

If you are communicating with Prefect through a proxy #15474 may be related.

jbw-vtl commented 2 weeks ago

Hi All,

We seem to have run into the same issue using the current prefect server version, running open source.

prefect version output

Version:             3.1.1
API version:         0.8.4
Python version:      3.11.10
Git commit:          6b50a2b9
Built:               Fri, Nov 8, 2024 12:38 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         server
Pydantic version:    2.9.2

We are using a Docker agent, with both the agent and the server running on the same VM within Docker Compose. Everything is executing as expected; however, task runs are not generated.

Now for the odd part: we found this thread and changed the two debug options recommended above, recreated both the server and the agent, and the issue seemed to go away.

We will do further debugging this week, seemingly some odd behaviour here.

So far the logs do not indicate any errors except the one below, which we assume is expected:

prefect-agent   | 21:25:31.971 | DEBUG   | APILogWorkerThread | prefect._internal.concurrency - Encountered exception in call get(<dropped>)
prefect-agent   | Traceback (most recent call last):
prefect-agent   |   File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_sync
prefect-agent   |     result = self.fn(*self.args, **self.kwargs)
prefect-agent   |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
prefect-agent   |   File "/usr/local/lib/python3.12/queue.py", line 179, in get
prefect-agent   |     raise Empty
prefect-agent   | _queue.Empty
jakekaplan commented 2 weeks ago

Hi @jbw-vtl. That is interesting. Can you confirm that your client and server were both on prefect==3.1.1 the whole time?

You are correct that the _queue.Empty error is expected; it just indicates that the queue is shutting down.

Did you observe any logs in your run execution like:

16:58:51.007 | ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 1 pending items.

or

16:58:51.007 | WARNING | prefect.events.clients - Unable to connect to 'wss://api.prefect.cloud/api/...'

those would indicate that a websocket connection wasn't able to be made. Is it possible the websocket port (80 or 443 usually) was not available?

That being said, the fact that runs weren't visible and then began to work after a restart makes me suspicious of the server. We use an in-memory queue server-side to accept the task run events and eventually record them to the database. Now that you have debug logging enabled server-side, you should be able to tell if things are processing through the queue normally. If you see something strange, or runs are no longer visible, would you be able to report back?

noemtz commented 1 week ago

Hi @jakekaplan,

I'm facing the same issue where no task runs are showing in the UI even though the flow completes OK and the tasks are visible in the flow logs. In our case we are running the flow from a deployment on a Kubernetes work pool; both the Prefect server and the Prefect worker run on the same k8s cluster, and both are on Prefect version 3.1.1.

We are getting the errors below in the worker logs:


prefect.events.clients - Unable to connect to 'wss://psapi-blue.msciapps.com/api/events/in'. Please check your network settings to ensure websocket connections to the API are allowed. Otherwise event data (including task run data) may be lost. Reason: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1129). Set PREFECT_DEBUG_MODE=1 to see the full error.

Traceback (most recent call last):
  File "/opt/conda/envs/model_engine/lib/python3.9/site-packages/prefect/events/clients.py", line 297, in _reconnect
    self._websocket = await self._connect.__aenter__()
  File "/opt/conda/envs/model_engine/lib/python3.9/site-packages/websockets/legacy/client.py", line 635, in __aenter__
    return await self
  File "/opt/conda/envs/model_engine/lib/python3.9/site-packages/websockets/legacy/client.py", line 654, in __await_impl__
    _transport, protocol = await self._create_connection()
  File "/opt/conda/envs/model_engine/lib/python3.9/asyncio/base_events.py", line 1090, in create_connection
    transport, protocol = await self._create_connection_transport(

From the Prefect worker pod logs we can confirm there is a websocket connectivity problem, but when I open a shell on the running worker pod and run a test to validate websocket connectivity to my Prefect server API, the connection goes through without any errors:

 I have no name!@prefect-worker-bd684b59b-svhwl:~$ cat > ws.py

import asyncio

from prefect.events.clients import PrefectEventsClient

async def main():
    async with PrefectEventsClient() as client:
        print(f"Connected to: {client._events_socket_url}")
        pong = await client._websocket.ping()
        pong_time = await pong
        print(f"Response received in: {pong_time}")

if __name__ == '__main__':
    asyncio.run(main())
        ^C
I have no name!@prefect-worker-bd684b59b-svhwl:~$ pwd
/home/prefect
 I have no name!@prefect-worker-bd684b59b-svhwl:~$ python ./ws.py 
Connected to: wss://psapi-blue.msciapps.com/api/events/in
Response received in: 0.0015839770203456283

See prefect config from the worker pod

 I have no name!@prefect-worker-bd684b59b-svhwl:~$ prefect config view
00:39:36.037 | DEBUG   | prefect.profiles - Using profile 'ephemeral'
🚀 you are connected to:
https://psapi-blue.msciapps.com
PREFECT_PROFILE='ephemeral'
PREFECT_API_ENABLE_HTTP2='true' (from env)
PREFECT_API_SSL_CERT_FILE='/etc/ssl/certs/ca-certificates.crt' (from env)
PREFECT_API_URL='https://psapi-blue.msciapps.com/api' (from env)
PREFECT_DEBUG_MODE='true' (from env)
PREFECT_SERVER_ALLOW_EPHEMERAL_MODE='true' (from profile)
PREFECT_WORKER_PREFETCH_SECONDS='10' (from env)
PREFECT_WORKER_QUERY_SECONDS='5' (from env)

The server API is protected by a self-signed certificate, and the root CA certificate is installed in the Prefect worker image cert store at /etc/ssl/certs/ca-certificates.crt.

Do you have any clue why the Prefect events client from the worker process could be failing to verify the SSL certificate?

jakekaplan commented 1 week ago

Hi @noemtz. If you're using a k8s work pool, the worker will spin up k8s jobs in-cluster to execute your flow runs.

While the worker pod may be able to connect and have the right cert, can you confirm whether the job pods that are spun up have your cert installed? The PREFECT_API_SSL_CERT_FILE setting should be copied over from the worker pod to the job pod, but the cert file won't be there unless the image you're using has it.
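(A quick check that can be run inside the job container, or at the top of a flow, to confirm what that container actually sees; a sketch using only the standard library and httpx, which Prefect depends on. The /admin/version endpoint is the one mentioned earlier in this thread:)

import os

import httpx

# what the container believes about the cert bundle
cert = os.environ.get("PREFECT_API_SSL_CERT_FILE", "")
print("PREFECT_API_SSL_CERT_FILE =", repr(cert), "| exists:", os.path.isfile(cert))

# an HTTPS request with that bundle; raises a certificate verification error
# if the issuing CA is missing from the bundle in this image
api = os.environ["PREFECT_API_URL"]  # e.g. https://psapi-blue.msciapps.com/api
print(httpx.get(f"{api}/admin/version", verify=cert or True).json())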

GalLadislav commented 6 days ago

Hello @jakekaplan, I am experiencing the same issue, but probably for different reasons. I am using my own authentication mechanism (middleware) to verify PREFECT_API_KEY. The issue lies with PrefectEventSubscriber not setting the API key, which is set only for PrefectCloudEventSubscriber. I do not know if my issue is related, but it is certainly very similar.

Would it be reasonable to request that PrefectEventSubscriber also pass the API key? I could make the PR.

jakekaplan commented 6 days ago

Hi @GalLadislav, I think you are correct that it is a similar symptom but unrelated to the above. Would you mind opening a separate issue for your use case/enhancement so we can discuss it there?

noemtz commented 5 days ago

Hi @jakekaplan, you were exactly right: the jobs were not able to communicate with the Prefect server API due to incorrect certificate store configuration (missing CA certificates). I was confused by the failed GlobalEventLoopThread logs seen from the worker pod, especially because I was able to communicate with the Prefect server over a websocket connection when testing from the worker pod. I didn't realize the failed logs were coming from the jobs; I could confirm it by looking at the job logs and finding the same GlobalEventLoopThread failures, and my ad-hoc websocket test also failed when using the jobs' container image.

The problem was fixed after adding the right certificates into the jobs/flows container image.