cameronmaske / celery-once

Celery Once allows you to prevent multiple execution and queuing of celery tasks.
https://pypi.python.org/pypi/celery_once/
BSD 2-Clause "Simplified" License

Getting celery beat error 'SchedulingError: Couldn't apply scheduled task' #72

Closed · kowsari closed this issue 5 years ago

kowsari commented 6 years ago

Hi, thank you for writing this useful project for Celery.

I am attempting to use this with scheduled tasks that run every minute, configured in settings.py via the Celery beat config below:

CELERYBEAT_SCHEDULE = {
    'check_products': {
        'task': 'com.ixlayer.tasks.check_products',
        'schedule': timedelta(minutes=1),
    },
}

However, I sometimes get an exception in the process that runs celery beat.

Traceback (most recent call last):
  File "/Users/kowsari/workspace/ixlayer-be/venv/lib/python2.7/site-packages/celery/beat.py", line 222, in apply_entry
    result = self.apply_async(entry, producer=producer, advance=False)
  File "/Users/kowsari/workspace/ixlayer-be/venv/lib/python2.7/site-packages/celery/beat.py", line 328, in apply_async
    entry, exc=exc)), sys.exc_info()[2])
  File "/Users/kowsari/workspace/ixlayer-be/venv/lib/python2.7/site-packages/celery/beat.py", line 320, in apply_async
    **entry.options)
  File "/Users/kowsari/workspace/ixlayer-be/venv/lib/python2.7/site-packages/celery_once/tasks.py", line 99, in apply_async
    raise e
SchedulingError: Couldn't apply scheduled task check_products:

I am using this configuration:

app.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': settings.BROKER_URL,
    'default_timeout': 60 * 60
  }
}

Is celery beat hitting the AlreadyQueued exception here, or is this something else? Help is much appreciated.

Thanks

cameronmaske commented 6 years ago

Hey @kowsari, this is tricky! We currently don't have any tests for scheduled tasks, so this could be a bug with celery_once. I'm not 100% sure, but I think it is masking the AlreadyQueued exception (which would make sense given you only see the exceptions occasionally).

Looking at celery's code, it catches all exceptions and raises them as SchedulingErrors. https://github.com/celery/celery/blob/master/celery/beat.py#L355

The AlreadyQueued exception currently doesn't define a __repr__ or __str__, so it may be "invisible" when logged as a SchedulingError.
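
For illustration, a rough sketch of what giving AlreadyQueued a readable message could look like (my guess at the shape of a fix, not the library's actual code):

# Hypothetical sketch: give AlreadyQueued a __str__/__repr__ so it surfaces
# in beat's SchedulingError log line instead of appearing blank.
class AlreadyQueued(Exception):
    def __init__(self, countdown):
        self.countdown = countdown
        self.message = "Expires in {} seconds".format(countdown)

    def __str__(self):
        return self.message

    def __repr__(self):
        return "AlreadyQueued({})".format(self.countdown)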

To help debug this further, what version of celery are you using?

kowsari commented 6 years ago

Thanks @cameronmaske, I am using Celery 4.1.0.

So I added base=QueueOnce to my tasks, which makes the problem go away and makes sense to me. This could be the way to avoid the SchedulingErrors.
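
For reference, a minimal sketch of what that change looks like for the task in the schedule above (the shared_task import is an assumption about your app setup; it could equally be your app.task decorator):

from celery import shared_task
from celery_once import QueueOnce

@shared_task(base=QueueOnce, name='com.ixlayer.tasks.check_products')
def check_products():
    # With QueueOnce as the base, enqueuing while a previous run is still
    # queued or executing raises AlreadyQueued instead of duplicating work.
    ...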

But the problem I have now (which I am not sure is related to this) is that sometimes, when my worker and beat start, beat sends the tasks but they never get picked up by the workers. If I change the code and remove QueueOnce, the workers pick up the tasks.

Thanks for your help!

xuhcc commented 5 years ago

I had a very similar problem with a periodic task, and the solution was to add graceful=True to its options:

from celery import shared_task
from celery_once import QueueOnce

@shared_task(base=QueueOnce, once={'graceful': True})
def my_task():
    ...

In my case the AlreadyQueued exception was invisible due to the logging config, so it looked like the worker was simply not picking up tasks.
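
Another option, if you also trigger the task from your own code and want the failure to stay visible rather than swallowed, is to catch the exception explicitly (a sketch, reusing my_task from above):

from celery_once import AlreadyQueued

try:
    my_task.apply_async()
except AlreadyQueued as e:
    # countdown is the remaining lifetime of the existing lock, in seconds.
    print("my_task is already queued; lock expires in %s seconds" % e.countdown)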

fanchunke commented 5 years ago

Recently we ran into a very similar problem with a periodic task. We use a Redis backend together with celery_once's Redis backend.

In my case, when celery beat scheduled a task, it published a message to Redis. But for some reason Redis closed the connection, so it raised an exception: SchedulingError: Couldn't apply scheduled task.

Actually, the message had not been published to Redis, but celery_once had already set a lock for the task, and that lock isn't released until the default timeout expires. So celery beat wouldn't send another task, and everything just looked like the celery worker not picking up tasks.
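
For illustration, a rough sketch of clearing such a stale lock by hand (the qo_ key prefix and the connection URL are assumptions about celery_once's default Redis backend and your setup, so verify them against the keys actually present in Redis):

import redis

r = redis.Redis.from_url("redis://localhost:6379/0")  # assumed Redis URL

# celery_once lock keys are assumed to start with "qo_".
for key in r.scan_iter("qo_*"):
    print("deleting stale lock %s (ttl %s seconds)" % (key, r.ttl(key)))
    r.delete(key)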

So any solutions for this problem? Help is much appreciated.

dulobanov commented 5 years ago

I have a very similar issue. It looks like celery beat cannot connect to Redis, even though I'm able to connect to Redis from the same container and save data in it. From the celerybeat container logs:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/redis/connection.py", line 665, in send_packed_command
    self._sock.sendall(item)
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/celery/beat.py", line 359, in apply_async
    **entry.options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 570, in apply_async
    **options
  File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 755, in send_task
    self.backend.on_task_call(P, task_id)
  File "/usr/local/lib/python3.7/site-packages/celery/backends/redis.py", line 294, in on_task_call
    self.result_consumer.consume_from(task_id)
  File "/usr/local/lib/python3.7/site-packages/celery/backends/redis.py", line 136, in consume_from
    self._consume_from(task_id)
  File "/usr/local/lib/python3.7/site-packages/celery/backends/redis.py", line 142, in _consume_from
    self._pubsub.subscribe(key)
  File "/usr/local/lib/python3.7/site-packages/redis/client.py", line 3246, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "/usr/local/lib/python3.7/site-packages/redis/client.py", line 3141, in execute_command
    self._execute(connection, connection.send_command, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/redis/client.py", line 3145, in _execute
    return command(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/redis/connection.py", line 686, in send_command
    check_health=kwargs.get('check_health', True))
  File "/usr/local/lib/python3.7/site-packages/redis/connection.py", line 678, in send_packed_command
    (errno, errmsg))
redis.exceptions.ConnectionError: Error 104 while writing to socket. Connection reset by peer.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/celery/beat.py", line 242, in apply_entry
    result = self.apply_async(entry, producer=producer, advance=False)
  File "/usr/local/lib/python3.7/site-packages/celery/beat.py", line 367, in apply_async
    entry, exc=exc)), sys.exc_info()[2])
  File "/usr/local/lib/python3.7/site-packages/vine/five.py", line 194, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/celery/beat.py", line 359, in apply_async
    **entry.options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 570, in apply_async
    **options
  File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 755, in send_task
    self.backend.on_task_call(P, task_id)
  File "/usr/local/lib/python3.7/site-packages/celery/backends/redis.py", line 294, in on_task_call
    self.result_consumer.consume_from(task_id)
  File "/usr/local/lib/python3.7/site-packages/celery/backends/redis.py", line 136, in consume_from
    self._consume_from(task_id)
  File "/usr/local/lib/python3.7/site-packages/celery/backends/redis.py", line 142, in _consume_from
    self._pubsub.subscribe(key)
  File "/usr/local/lib/python3.7/site-packages/redis/client.py", line 3246, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "/usr/local/lib/python3.7/site-packages/redis/client.py", line 3141, in execute_command
    self._execute(connection, connection.send_command, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/redis/client.py", line 3145, in _execute
    return command(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/redis/connection.py", line 686, in send_command
    check_health=kwargs.get('check_health', True))
  File "/usr/local/lib/python3.7/site-packages/redis/connection.py", line 678, in send_packed_command
    (errno, errmsg))
celery.beat.SchedulingError: Couldn't apply scheduled task pressure: Error 104 while writing to socket. Connection reset by peer.

Redis itself looks fine:

/data # redis-cli
127.0.0.1:6379> info

Server

redis_version:5.0.5 redis_git_sha1:00000000 redis_git_dirty:0 redis_build_id:9b87770715cb6bc0 redis_mode:standalone os:Linux 4.19.62-sunxi armv7l arch_bits:32 multiplexing_api:epoll atomicvar_api:atomic-builtin gcc_version:8.3.0 process_id:1 run_id:dfe190cef7f5fc67de7449ec1d605398bfe187fe tcp_port:6379 uptime_in_seconds:87835 uptime_in_days:1 hz:10 configured_hz:10 lru_clock:5837155 executable:/data/redis-server config_file:

Clients

connected_clients:15 client_recent_max_input_buffer:2 client_recent_max_output_buffer:0 blocked_clients:0

Memory

used_memory:3449576 used_memory_human:3.29M used_memory_rss:126824448 used_memory_rss_human:120.95M used_memory_peak:117888992 used_memory_peak_human:112.43M used_memory_peak_perc:2.93% used_memory_overhead:935014 used_memory_startup:649864 used_memory_dataset:2514562 used_memory_dataset_perc:89.82% allocator_allocated:3852408 allocator_active:7606272 allocator_resident:126087168 total_system_memory:1047597056 total_system_memory_human:999.07M used_memory_lua:26624 used_memory_lua_human:26.00K used_memory_scripts:280 used_memory_scripts_human:280B number_of_cached_scripts:1 maxmemory:3221225472 maxmemory_human:3.00G maxmemory_policy:noeviction allocator_frag_ratio:1.97 allocator_frag_bytes:3753864 allocator_rss_ratio:16.58 allocator_rss_bytes:118480896 rss_overhead_ratio:1.01 rss_overhead_bytes:737280 mem_fragmentation_ratio:37.22 mem_fragmentation_bytes:123416872 mem_not_counted_for_evict:0 mem_replication_backlog:0 mem_clients_slaves:0 mem_clients_normal:284802 mem_aof_buffer:0 mem_allocator:jemalloc-5.1.0 active_defrag_running:0 lazyfree_pending_objects:0

Persistence

loading:0 rdb_changes_since_last_save:1678167 rdb_bgsave_in_progress:0 rdb_last_save_time:1566030408 rdb_last_bgsave_status:ok rdb_last_bgsave_time_sec:-1 rdb_current_bgsave_time_sec:-1 rdb_last_cow_size:0 aof_enabled:0 aof_rewrite_in_progress:0 aof_rewrite_scheduled:0 aof_last_rewrite_time_sec:-1 aof_current_rewrite_time_sec:-1 aof_last_bgrewrite_status:ok aof_last_write_status:ok aof_last_cow_size:0

Stats

total_connections_received:1776 total_commands_processed:4142084 instantaneous_ops_per_sec:0 total_net_input_bytes:38913954840 total_net_output_bytes:850284086 instantaneous_input_kbps:0.00 instantaneous_output_kbps:0.00 rejected_connections:0 sync_full:0 sync_partial_ok:0 sync_partial_err:0 expired_keys:429762 expired_stale_perc:0.11 expired_time_cap_reached_count:0 evicted_keys:0 keyspace_hits:410581 keyspace_misses:505 pubsub_channels:0 pubsub_patterns:0 latest_fork_usec:0 migrate_cached_sockets:0 slave_expires_tracked_keys:0 active_defrag_hits:0 active_defrag_misses:0 active_defrag_key_hits:0 active_defrag_key_misses:0

Replication

role:master connected_slaves:0 master_replid:440f6a9c88dced3066d937f2cf0a3add43d88627 master_replid2:0000000000000000000000000000000000000000 master_repl_offset:0 second_repl_offset:-1 repl_backlog_active:0 repl_backlog_size:1048576 repl_backlog_first_byte_offset:0 repl_backlog_histlen:0

CPU

used_cpu_sys:928.422655 used_cpu_user:12469.498184 used_cpu_sys_children:0.002682 used_cpu_user_children:0.005847

Cluster

cluster_enabled:0

Keyspace

db0:keys=1,expires=0,avg_ttl=0

cameronmaske commented 5 years ago

As a workaround for now, I'd suggest using a plain Celery task to trigger the celery_once task, i.e.

CELERYBEAT_SCHEDULE = {
    'example': {
        'task': 'tasks.scheduled_task',
        'schedule': timedelta(minutes=1),
    },
}

# `task` here stands in for your app's task decorator (app.task / shared_task)
from celery_once import QueueOnce

@task()
def scheduled_task():
    # Plain Celery task scheduled by beat; celery_once is not involved at
    # schedule time, so beat never hits AlreadyQueued / SchedulingError.
    scheduled_once_task.delay()

@task(base=QueueOnce)
def scheduled_once_task():
    ...
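
If the once task is already locked when scheduled_task runs, the delay() call will raise AlreadyQueued inside the worker rather than in beat. A sketch of combining this with the graceful option mentioned above, which makes the duplicate enqueue return None instead of raising:

@task(base=QueueOnce, once={'graceful': True})
def scheduled_once_task():
    # A duplicate enqueue now quietly returns None, so scheduled_task never
    # sees an AlreadyQueued error.
    ...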

I don't currently have the bandwidth to fix this, but a PR is welcome (it may be tricky; please include a test case for a scheduled task to aid the review).