msolli / proletarian

A durable job queuing and worker system for Clojure backed by PostgreSQL.
MIT License

Worker stops on org.postgresql.util.PSQLException #26

Open: binodsarkar opened this issue 1 month ago

binodsarkar commented 1 month ago

Error FATAL: terminating connection due to idle-in-transaction timeout
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190)
    at org.postgresql.jdbc.PgPreparedStatement.executeUpdate(PgPreparedStatement.java:152)
    at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeUpdate(ProxyPreparedStatement.java:61)
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeUpdate(HikariProxyPreparedStatement.java)
    at proletarian.db$archive_job_BANG_.invokeStatic(db.clj:87)
    at proletarian.worker$process_next_job_BANG_$fn__48219$fn__48221.invoke(worker.clj:57)
    at proletarian.worker$process_next_job_BANG_$fn__48219.invoke(worker.clj:53)
    at proletarian.db$with_tx$fn__39340.invoke(db.clj:139)
    at proletarian.db$with_connection.invokeStatic(db.clj:129)
    at proletarian.db$with_tx.invokeStatic(db.clj:131)
    at proletarian.worker$process_next_job_BANG_.invokeStatic(worker.clj:12)
    at proletarian.worker$process_next_jobs_BANG_.invokeStatic(worker.clj:81)
    at proletarian.worker$create_queue_worker$reify__48237$work_BANG___48243.invoke(worker.clj:208)
    at clojure.core$partial$fn__5908.invoke(core.clj:2640)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

Then: "proletarian/exception": "proletarian.worker/job-worker-error"

After that: "proletarian/exception": "proletarian.executor/shutting-down"

Instead of shutting down, I want the worker to keep running and reconnect to the database.

Version: [msolli/proletarian "1.0.54-alpha" :exclusions [org.postgresql/postgresql]]

msolli commented 3 days ago

Hi @binodsarkar, sorry for the late reply.

The short answer is that you can use the proletarian/on-polling-error option to customize the logic that decides when the worker should shut down. The function is called with the exception, so you can decide for yourself, for example:

(defn stop-worker?
  "Returns true (stop the worker) for any exception except a PostgreSQL error."
  [e]
  (not (instance? org.postgresql.util.PSQLException e)))
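If the exception can reach the callback wrapped in another exception (for example an ex-info whose cause is the PSQLException), an instance? check on the outer exception would miss it. The variant below is a minimal sketch under that assumption, which this thread does not confirm: it walks the cause chain with ex-cause and matches java.sql.SQLException, the superclass of org.postgresql.util.PSQLException, so it also compiles without the PostgreSQL driver on the classpath. The helper sql-error-in-cause-chain? is a hypothetical name, and accepting extra `& _` arguments is a defensive assumption about the callback's arity.

```clojure
(defn sql-error-in-cause-chain?
  "True if e, or any exception in its cause chain, is a java.sql.SQLException.
  org.postgresql.util.PSQLException is a subclass of java.sql.SQLException."
  [^Throwable e]
  (boolean
    (some #(instance? java.sql.SQLException %)
          ;; e, its cause, the cause's cause, ... until ex-cause returns nil
          (take-while some? (iterate ex-cause e)))))

(defn stop-worker?
  "Hypothetical on-polling-error callback: keep the worker polling on
  database errors, stop it on anything else. Any extra arguments the
  callback might receive are ignored."
  [e & _]
  (not (sql-error-in-cause-chain? e)))
```

Keeping the worker alive on every SQL error also means it keeps polling through errors that will not clear on their own, such as bad credentials, so logging the exception inside the callback is worthwhile.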

The slightly longer answer is that the message FATAL: terminating connection due to idle-in-transaction timeout suggests that the transaction sat idle for longer than the idle_in_transaction_session_timeout configuration option allows. The default is 0, which means no timeout. What value have you configured?
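To compare a handler's running time with this setting, the value PostgreSQL reports (for example via SHOW idle_in_transaction_session_timeout) has to be converted to a single unit. A minimal sketch, assuming the setting arrives as the string PostgreSQL prints, such as "90min" or "0"; setting->ms and idle-timeout-exceeded? are hypothetical names. A bare number is read as milliseconds, which is how PostgreSQL interprets this parameter without a unit, and 0 means the timeout is disabled.

```clojure
(def ^:private unit->ms
  "Multipliers from PostgreSQL time units to milliseconds."
  {"us" 1/1000, "ms" 1, "s" 1000, "min" 60000, "h" 3600000, "d" 86400000})

(defn setting->ms
  "Parses a PostgreSQL time setting such as \"90min\", \"1500ms\" or \"0\"
  into whole milliseconds. A number without a unit is taken as milliseconds."
  [^String s]
  (if-let [[_ n unit] (re-matches #"\s*(\d+)\s*(us|ms|s|min|h|d)?\s*" s)]
    (long (* (Long/parseLong n) (unit->ms (or unit "ms"))))
    (throw (ex-info "Unrecognized time setting" {:setting s}))))

(defn idle-timeout-exceeded?
  "True if a transaction left idle for elapsed-ms would exceed the given
  idle_in_transaction_session_timeout setting. A setting of 0 disables it."
  [setting elapsed-ms]
  (let [timeout-ms (setting->ms setting)]
    (and (pos? timeout-ms)
         (> elapsed-ms timeout-ms))))
```

With the value reported in this thread, (setting->ms "90min") is 5400000, so a transaction would have to sit idle for more than an hour and a half to trigger the error.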

binodsarkar commented 3 days ago

idle_in_transaction_session_timeout is 90 minutes.

msolli commented 2 days ago

OK, and do you know how long your handler takes to complete? Could it be taking longer than 90 minutes?
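One way to answer that question is to wrap the handler and record how long each call takes. A minimal sketch; wrap-timing, report-fn and the example handler are hypothetical, and the wrapper is variadic so it does not depend on the exact arguments the worker passes to the handler. The duration is reported from a finally block, so slow handlers that end up throwing are measured too.

```clojure
(defn wrap-timing
  "Hypothetical helper: returns a function that calls handler-fn with the
  same arguments and passes the elapsed wall-clock time in milliseconds to
  report-fn, whether the handler returns normally or throws."
  [handler-fn report-fn]
  (fn [& args]
    (let [start (System/nanoTime)]
      (try
        (apply handler-fn args)
        (finally
          ;; nanoTime is monotonic, so it is suitable for measuring durations
          (report-fn (/ (- (System/nanoTime) start) 1e6)))))))

;; Example: time a stand-in handler and print each duration.
(def timed-handler
  (wrap-timing (fn [job-type payload] (Thread/sleep 10) [job-type payload])
               (fn [ms] (println "handler took" ms "ms"))))
```

In practice report-fn could log a warning whenever a duration gets close to the configured timeout.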

The way the worker works is:

  1. Open a transaction
  2. Get the next job
  3. Call the handler
  4. Archive and delete the job (or update it for retry if an exception was caught)
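The four steps can be sketched as one function in which everything happens inside a single transaction. This is a hypothetical illustration, not proletarian's implementation: process-one-job!, the keys of the ops map and the :job-type/:payload job keys are assumed names, and the database operations are passed in as plain functions so the sketch runs without a database. It also suggests why the error in this issue surfaces where it does: while the handler runs, no SQL is sent, so PostgreSQL sees the session as idle in transaction, and the next statement (archiving the job) is the one that fails once the timeout has fired, which would match the archive_job_BANG_ frame in the stack trace.

```clojure
(defn process-one-job!
  "Hypothetical sketch of one worker iteration, not proletarian's code.
  ops supplies the database operations as plain functions:
    :with-tx      (fn [f]) calls f inside a single transaction
    :next-job!    (fn []) returns the next job map, or nil if the queue is empty
    :archive-job! (fn [job]) archives and deletes a finished job
    :retry-job!   (fn [job e]) updates a failed job for a later retry
  Returns true if a job was processed, false if there was none."
  [{:keys [with-tx next-job! archive-job! retry-job!]} handler]
  ;; 1. open a transaction
  (with-tx
    (fn []
      ;; 2. get the next job (with FOR UPDATE SKIP LOCKED in a real query)
      (if-let [job (next-job!)]
        ;; 3. call the handler; no SQL runs on the connection meanwhile,
        ;;    so the session is "idle in transaction" for this whole call
        (let [error (try
                      (handler (:job-type job) (:payload job))
                      nil
                      (catch Exception e e))]
          ;; 4. archive and delete on success, or update for retry on failure
          (if error
            (retry-job! job error)
            (archive-job! job))
          true)
        false))))
```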

The whole sequence needs to run in a transaction because that's how the database can guarantee (using FOR UPDATE SKIP LOCKED) that no other worker will take this job while this worker is working on it.
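The guarantee that FOR UPDATE SKIP LOCKED provides can be imitated in memory to show the idea: each worker atomically claims the first job that nobody else holds, skipping locked jobs instead of waiting for them. This is a sketch with hypothetical names (claim-next-job!, release-job!, the :jobs/:locked state), not how PostgreSQL implements locking. In PostgreSQL the row lock taken by FOR UPDATE lasts until the transaction commits or rolls back, which is why the transaction has to stay open for as long as the handler runs.

```clojure
(defn claim-next-job!
  "In-memory analogue of SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1.
  state is an atom holding {:jobs [...] :locked #{ids}}. Atomically picks
  the first job whose :id is not locked, locks it and returns it; returns
  nil when every job is already locked."
  [state]
  (:claimed
    (swap! state
           (fn [{:keys [jobs locked] :as s}]
             (if-let [job (first (remove #(contains? locked (:id %)) jobs))]
               (-> s
                   (update :locked conj (:id job))
                   (assoc :claimed job))
               (assoc s :claimed nil))))))

(defn release-job!
  "Analogue of the transaction ending after the job is archived:
  removes the job and its lock."
  [state job]
  (swap! state
         (fn [s]
           (-> s
               (update :jobs #(vec (remove (fn [j] (= (:id j) (:id job))) %)))
               (update :locked disj (:id job))))))
```

Because swap! applies the whole pick-and-lock step atomically, two workers claiming at the same moment always receive different jobs, just as two transactions using SKIP LOCKED never return the same locked row.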