PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

flow(retries) incompatible with Beautiful Soup #8987

Closed CloudsProgram closed 1 year ago

CloudsProgram commented 1 year ago

First check

Bug summary

When I use Beautiful Soup and the flow has retries enabled, an error is thrown when this line executes: soup = BeautifulSoup(response.content, 'lxml') (I believe this parses the entire HTML of the page).

I tried moving the retries onto the task instead, which doesn't throw an error. I'm just wondering whether this is a bug, and if it is, what's causing the error.

Reproduction

import requests
from bs4 import BeautifulSoup
from prefect import flow, task

@task()
def scrape():
    url = "https://quotes.toscrape.com/"
    response = requests.get(url)
    print(response)
    soup = BeautifulSoup(response.content, 'lxml')
    return soup

@flow(retries=2)
def main_flow():
    soup = scrape()

if __name__ == "__main__":
    main_flow()

Error

8:55:01.879 | ERROR   | Task run 'scrape-0' - Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call 
last):
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 723, in reducer_override
    if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj):  # noqa  # pragma: no branch
RecursionError: maximum recursion depth exceeded in comparison

The above exception was the direct cause of the following exception:

_pickle.PicklingError: Could not pickle object as excessively deep recursion required.

18:55:02.001 | ERROR   | Flow run 'organic-robin' - Encountered exception during execution:
Traceback (most recent call last):
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 723, in reducer_override
    if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj):  # noqa  # pragma: no branch
RecursionError: maximum recursion depth exceeded in comparison

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 665, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\utilities\asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread        
    return await future
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "c:\Users\cloud\OneDrive\Desktop\programming\zoomcamp_2023\project\tester.py", line 17, in main_flow
    soup = scrape()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\tasks.py", line 469, in __call__
    return enter_task_run_engine(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 965, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\utilities\asyncutils.py", line 177, in run_async_from_worker_thread 
    return anyio.from_thread.run(call)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\_backends\_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\concurrent\futures\_base.py", line 446, in result
    return self.__get_result()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\concurrent\futures\_base.py", line 391, in __get_result
    raise self._exception
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 1114, in get_task_call_return_value
    return await future._result()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\task_runners.py", line 207, in submit
    result = await call()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 1351, in begin_task_run
    state = await orchestrate_task_run(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 1558, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\states.py", line 291, in return_value_to_state
    return Completed(data=await result_factory.create_result(data))
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\results.py", line 317, in create_result
    return await PersistedResult.create(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\results.py", line 477, in create
    data = serializer.dumps(obj)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\serializers.py", line 144, in dumps
    blob = pickler.dumps(obj)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 639, in dump
    raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
18:55:02.138 | ERROR   | Flow run 'organic-robin' - Finished in state Failed('Flow run encountered an exception. Traceback (most recent call last):\n  File "C:\\Users\\cloud\\anaconda3\\envs\\pltr_project\\lib\\site-packages\\cloudpickle\\cloudpickle_fast.py", line 632, in dump\n    return Pickler.dump(self, obj)\n  File "C:\\Users\\cloud\\anaconda3\\envs\\pltr_project\\lib\\site-packages\\cloudpickle\\cloudpickle_fast.py", line 723, in reducer_override\n    if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj):  # noqa  # pragma: no branch\nRecursionError: maximum recursion depth exceeded in comparison\n\nThe above exception was the direct cause of the following exception:\n\n_pickle.PicklingError: Could not pickle object as excessively deep recursion required.\n')
Traceback (most recent call last):
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 723, in reducer_override
    if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj):  # noqa  # pragma: no branch
RecursionError: maximum recursion depth exceeded in comparison

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "c:\Users\cloud\OneDrive\Desktop\programming\zoomcamp_2023\project\tester.py", line 20, in <module>
    main_flow()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 175, in enter_flow_run_engine_from_flow_call       
    return anyio.run(begin_run)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\asyncio\base_events.py", line 647, in run_until_complete
    return future.result()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\client\utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 256, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 665, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\utilities\asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread        
    return await future
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "c:\Users\cloud\OneDrive\Desktop\programming\zoomcamp_2023\project\tester.py", line 17, in main_flow
    soup = scrape()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\tasks.py", line 469, in __call__
    return enter_task_run_engine(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 965, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\utilities\asyncutils.py", line 177, in run_async_from_worker_thread 
    return anyio.from_thread.run(call)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\anyio\_backends\_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\concurrent\futures\_base.py", line 446, in result
    return self.__get_result()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\concurrent\futures\_base.py", line 391, in __get_result
    raise self._exception
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 1114, in get_task_call_return_value
    return await future._result()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\task_runners.py", line 207, in submit
    result = await call()
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 1351, in begin_task_run
    state = await orchestrate_task_run(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\engine.py", line 1558, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\states.py", line 291, in return_value_to_state
    return Completed(data=await result_factory.create_result(data))
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\results.py", line 317, in create_result
    return await PersistedResult.create(
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\results.py", line 477, in create
    data = serializer.dumps(obj)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\prefect\serializers.py", line 144, in dumps
    blob = pickler.dumps(obj)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "C:\Users\cloud\anaconda3\envs\pltr_project\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 639, in dump
    raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
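The RecursionError at the root of the traceback above can be reproduced outside Prefect. This is a hedged, stdlib-only sketch: a deeply nested list stands in for a BeautifulSoup parse tree (whose nodes chain to parents, children, and siblings), and the pickler must walk that chain recursively, so depth beyond the interpreter's recursion limit fails.

```python
import pickle

def deeply_nested(depth):
    # Build a list nested `depth` levels deep, e.g. [[[...]]],
    # mimicking a deep/self-referential object graph.
    root = leaf = []
    for _ in range(depth):
        leaf.append([])
        leaf = leaf[0]
    return root

shallow = deeply_nested(10)
deep = deeply_nested(50_000)

# A shallow structure pickles fine.
pickle.dumps(shallow)

# A structure far deeper than the recursion limit does not.
try:
    pickle.dumps(deep)
except RecursionError:
    print("RecursionError while pickling deep structure")
```

The actual failure in the log goes through cloudpickle rather than plain pickle, but the mechanism is the same: serializing the soup object requires excessively deep recursion.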

Versions

Requirements I have for the environment:
beautifulsoup4==4.12.0
lxml==4.9.2
pandas==1.5.2
prefect==2.8.5
prefect-sqlalchemy==0.2.2
protobuf==4.21.11
pyarrow==10.0.1
psycopg2-binary==2.9.5
sqlalchemy==1.4.46

Additional context

No response

zanieb commented 1 year ago

Hi @CloudsProgram — turning on flow retries enables task result persistence by default. See https://docs.prefect.io/concepts/results/#toggling-persistence for more details.

It looks like the beautiful soup object cannot be pickled so it's not compatible with our default result serializer. You could work around this by either disabling persistence for the specific task (it should just run again on retry) or changing your return value to something we can cache the result of.

CloudsProgram commented 1 year ago

Gotcha, thank you so much for the response!