sibson / redbeat

RedBeat is a Celery Beat Scheduler that stores the scheduled tasks and runtime metadata in Redis.
Apache License 2.0
909 stars 130 forks source link

Redbeat starts executing tasks after a certain cooldown or doesn't even execute it #277

Open Fundi1330 opened 1 month ago

Fundi1330 commented 1 month ago

Here is what I want to do. When the user clicks on a button it sends a post request via Ajax to /api/start_mining endpoint, which creates a task to mine points. Every 5 seconds it triggers the task to mine 1000 points and after 25 seconds the task deletes itself and the user can now claim points and start the whole process again. The problem is that after Ajax sends a post request celery redbeat starts the first task after a certain cooldown or sometimes doesn't start it. Because of this, the user does not get points or get not enough

Here are my API routes: ` @api_bp.route('/start_mining', methods=['POST']) def start_mining(): now = datetime.utcnow() user = get_current_user() user.last_date_points_claimed = datetime.now() db.session.commit()

new_button = render_template('includes/mining_button.html', user=user)

schedule_name = str(uuid4())

interval = rrule('SECONDLY', dtstart=now, interval=5)
entry = RedBeatSchedulerEntry(schedule_name, 'app.celery.mine_points', interval, args=[user.id], kwargs={
    'mined_url': url_for('api.mined', _external=True),
    'end_mining_url': url_for('api.end_mining', _external=True),
    'schedule_name': schedule_name

}, app=celery_app)
entry.last_run_at = None
entry.save()

return jsonify(new_button=new_button)

@api_bp.route('/claim_coins', methods=['POST']) def claim_coins(): user = get_current_user() user.points += user.mined_points user.mined_points = 0 db.session.commit()

new_button = render_template('includes/mining_button.html', user=user)

return jsonify(new_button=new_button)

@api_bp.route('/mined', methods=['POST']) @csrf.exempt def mined(): data = request.json user: User = User.query.get_or_404(data['id']) emit('mine', {'points': user.mined_points}, namespace='/', json=True, broadcast=True)

return jsonify(ok=True)

@api_bp.route('/end_mining', methods=['POST']) @csrf.exempt def end_mining(): data = request.json user = User.query.get(data['id']) emit('end_mining', {'new_button': render_template('includes/mining_button.html', user=user)}, namespace='/', json=True, broadcast=True)

return jsonify(ok=True)

My celery file and task from celery import Celery, Task from celery import current_app as celery_app from flask import Flask, current_app from .models import User from datetime import datetime, timedelta from os import environ from redbeat import RedBeatSchedulerEntry from .config import db from requests import session

def celery_init_app(app: Flask): class SportyTask(Task): def call(self, *args, *kwargs): with app.app_context(): return self.run(args, **kwargs)

celery_app = Celery(app.name, task_cls=SportyTask)
celery_app.config_from_object(app.config['CELERY'])

celery_app.set_default()

app.extensions['celery'] = celery_app

return celery_app

@celery_app.task(ignore_result=False) def mine_points(user_id: int, mined_url: str, end_mining_url: str, schedule_name: str): user: User = User.query.get(user_id) difference: timedelta = datetime.now() - user.last_date_points_claimed

try:
    entry = RedBeatSchedulerEntry.from_key(f'redbeat:{schedule_name}', celery_app)
except KeyError:
    entry = None

headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/json'
}
data = {
    'id': user.id
}
client = session()

if entry and difference.total_seconds() >= int(environ.get('START_MINING_COINS_COOLDOWN')):
    print('post')
    client.post(end_mining_url, json=data, headers=headers, verify=False)
    entry.delete()
    return

new_points = int(environ.get('POINTS_PER_HOUR'))

user.mined_points += new_points
db.session.commit()

client.post(mined_url, json=data, headers=headers, verify=False)

`

Celery config: CELERY = { 'broker_url': 'redis://localhost:6379/0', 'result_backend': 'redis://localhost:6379/0', 'task_ignore_result': True, 'beat_scheduler': 'redbeat.RedBeatScheduler', 'broker_transport_options': { 'max_retries': 3, 'interval_start': 0, 'interval_step': 0.2, 'interval_max': 0.2, } }

P.S. Redis runs on the WSL server and Celery connects to it successfully. I run my flask app on windows

controldev commented 3 weeks ago

Same problem here. This is a huge blocker.

controldev commented 3 weeks ago

Another problem that practically always occurs is that when starting the beat instance, there's a 'warmup' (usually 5-20 minutes) in which no scheduled tasks are run; they're just skipped.

This combined with the other issue makes it impossible to run this in production unfortunately.

Fundi1330 commented 3 weeks ago

I don't have the second problem

Fundi1330 commented 4 days ago

Has someone been able to solve this?