agronholm / apscheduler

Task scheduling library for Python
MIT License

TypeError: cannot pickle '_io.TextIOWrapper' object #933

Closed: Kenchir closed this issue 1 month ago.

Kenchir commented 2 months ago

Things to check first

Version

3.10.4

What happened?

I was adding a job to my scheduler. The job store is PostgreSQL, and the function added is an instance method. The error occurs when the job is pickled for storage.

Traceback (most recent call last):
  File "/Users/kenchir/Documents/bigdata/python/reporting/reporting_worker/worker_main.py", line 238, in <module>
    start()
  File "/Users/kenchir/.local/share/virtualenvs/reporting-P153UCJv/lib/python3.9/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/Users/kenchir/.local/share/virtualenvs/reporting-P153UCJv/lib/python3.9/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/Users/kenchir/.local/share/virtualenvs/reporting-P153UCJv/lib/python3.9/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/kenchir/.local/share/virtualenvs/reporting-P153UCJv/lib/python3.9/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/Users/kenchir/Documents/bigdata/python/reporting/reporting_worker/worker_main.py", line 58, in wrapped
    original_f(worker=worker, environment=environment, **kwargs)
  File "/Users/kenchir/Documents/bigdata/python/reporting/reporting_worker/worker_main.py", line 111, in start
    scheduling.start(worker, environment)
  File "/Users/kenchir/Documents/bigdata/python/reporting/reporting_worker/scheduling.py", line 193, in start
    scheduler.start()
  File "/Users/kenchir/.local/share/virtualenvs/reporting-P153UCJv/lib/python3.9/site-packages/apscheduler/schedulers/background.py", line 35, in start
    BaseScheduler.start(self, *args, **kwargs)
  File "/Users/kenchir/.local/share/virtualenvs/reporting-P153UCJv/lib/python3.9/site-packages/apscheduler/schedulers/base.py", line 177, in start
    self._real_add_job(job, jobstore_alias, replace_existing)
  File "/Users/kenchir/.local/share/virtualenvs/reporting-P153UCJv/lib/python3.9/site-packages/apscheduler/schedulers/base.py", line 881, in _real_add_job
    store.add_job(job)
  File "/Users/kenchir/.local/share/virtualenvs/reporting-P153UCJv/lib/python3.9/site-packages/apscheduler/jobstores/sqlalchemy.py", line 97, in add_job
    'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol)
TypeError: cannot pickle '_io.TextIOWrapper' object

It fails when storing the self argument, which the job state holds as 'args': (<package.sub_package.file_name.ClassName object at 0x1062f1f40>,).

Note that ClassName also inherits from another class, ClassX.
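A minimal sketch of the failure, assuming (hypothetically) that the worker instance keeps an open text-mode file as an attribute: `open()` in text mode returns an `io.TextIOWrapper`, which pickle refuses to serialize. Because the job state carries the instance in `args`, pickling that tuple is enough to trigger the same `TypeError`. `ReportWorker`, `log`, and `pickling_error` are illustrative names, not taken from the report.

```python
import os
import pickle


class ReportWorker:
    """Hypothetical stand-in for the reporter's worker class."""

    def __init__(self):
        # Assumed culprit: an open text-mode file is an io.TextIOWrapper.
        self.log = open(os.devnull, "w")

    def check_data(self):
        pass


def pickling_error(obj):
    """Return the TypeError message raised while pickling obj, or None if it pickles."""
    try:
        pickle.dumps(obj, protocol=5)
    except TypeError as exc:
        return str(exc)
    return None


worker = ReportWorker()
# Mirrors the job state's 'args': the bound method's self packed into a tuple.
print(pickling_error((worker,)))
worker.log.close()
```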

How can we reproduce the bug?

scheduling.py


import os
import time

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler

from worker import Worker

SLEEP_PERIOD = 1  # seconds; assumed value, not shown in the report


def start(worker: Worker, environment: str) -> None:
    file_import_freq_minutes = int(os.environ.get("FILE_IMPORT_FREQ_MINUTES", "0"))

    executors = {
        'default': ThreadPoolExecutor(20),
        # 'processpool': ProcessPoolExecutor(5)
    }
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }
    # SQLAlchemy URL for PostgreSQL using the psycopg (v3) driver
    _db_engine = f"postgresql+psycopg://{os.environ['DB_USER']}:{os.environ['DB_PWD']}@{os.environ['DB_HOST']}:" \
                 f"{os.environ['DB_PORT']}/{os.environ['DB_NAME']}"

    # The SQLAlchemy job store pickles each job's state, including the
    # instance (self) behind a bound-method job target
    jobstores = {
        'default': SQLAlchemyJobStore(url=_db_engine, pickle_protocol=5)
    }

    scheduler = BackgroundScheduler(executors=executors, job_defaults=job_defaults,
                                    timezone=worker.timezone, jobstores=jobstores)

    scheduler.add_job(worker.check_data, trigger='cron', hour=0, minute=0, id='regular_check')
    scheduler.add_job(worker.foreign_connection_limitation, trigger='cron', hour=0,
                      id='foreign_connection_limitation')
    scheduler.add_job(worker.foreign_tables_detection, trigger='cron', hour=0, id='foreign_tables_detection')

    scheduler.add_job(worker.file_import, trigger='interval', minutes=file_import_freq_minutes, id='file_import')

    try:
        scheduler.start()
        scheduler.print_jobs()  # print_jobs() prints the job list itself and returns None
        while True:
            time.sleep(SLEEP_PERIOD)
    except (KeyboardInterrupt, SystemExit):
        print("shutdown scheduler")
        scheduler.shutdown()

worker.py

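import abc


class Worker(abc.ABC):
    def __init__(self):
        self.timezone = "UTC"  # placeholder (assumed): scheduling.py reads worker.timezone

    def check_data(self): ...

    def file_import(self, arg=None): ...  # arg defaulted (assumed): add_job() passes no args

    def foreign_connection_limitation(self): ...

    def foreign_tables_detection(self): ...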

Creating a project with the structure above should reproduce the error.

agronholm commented 2 months ago

Structure above? What structure? I don't see anything.

agronholm commented 2 months ago

Also, why are you reporting a pickling failure here if what fails is pickling the bound method's self argument?
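Since it is the bound method's self that fails to pickle, one way to find which attribute holds the open file (possibly one set by a base class such as ClassX) is to try pickling each instance attribute separately. The helper below is a hypothetical debugging aid, not part of APScheduler, and `ClassX`/`ClassName` here are illustrative stand-ins. It only inspects `vars(obj)`, so attributes kept in `__slots__` are not checked, and a file nested inside another object is reported under the top-level attribute that contains it.

```python
import os
import pickle


def find_unpicklable_attrs(obj):
    """Map each instance attribute that pickle rejects to its error message."""
    bad = {}
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # TypeError, PicklingError, AttributeError, ...
            bad[name] = f"{type(exc).__name__}: {exc}"
    return bad


class ClassX:
    """Hypothetical base class that opens a file in its constructor."""

    def __init__(self):
        self.report_file = open(os.devnull, "w")


class ClassName(ClassX):
    """Hypothetical subclass mirroring the reporter's ClassName."""

    def __init__(self):
        super().__init__()
        self.environment = "dev"


obj = ClassName()
print(find_unpicklable_attrs(obj))
obj.report_file.close()
```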

Kenchir commented 1 month ago

Yes, it fails due to the self argument. How can that be fixed?

Kenchir commented 1 month ago

Quoting agronholm: "Structure above? What structure? I don't see anything."

I have updated it.

agronholm commented 1 month ago

The easiest way to fix this would be to obtain the worker dynamically in the target function's code, instead of having it passed in implicitly via the self argument. Making the worker a global, if you can, would be one solution.