django-es / django-elasticsearch-dsl

This package allows indexing of Django models in Elasticsearch with elasticsearch-dsl-py.

Implement CelerySignalProcessor #34

Open barseghyanartur opened 7 years ago

barseghyanartur commented 7 years ago

Now that Signal Processors are merged into master, it would be handy to have a built-in Celery signal processor.

I'm gonna work on it, if you don't mind, @sabricot.

sabricot commented 7 years ago

Yes it would be nice :)

MightySCollins commented 7 years ago

@barseghyanartur did you manage to make any progress on this? If not I might have to try something myself.

barseghyanartur commented 7 years ago

@MightySCollins:

Not yet. I'll probably postpone it for a couple of weeks. If you could spend time on this now, please go ahead.

hemil commented 6 years ago

Any updates on this? @barseghyanartur

barseghyanartur commented 6 years ago

@hemil:

No, unfortunately not, but that's on my list.

CorrosiveKid commented 6 years ago

Check out #87. It is currently working with Celery 4, but tests still need to be written around the new signal processor, and some README/documentation updates are needed as well.

hemil commented 6 years ago

Alright thanks, will check it out

janwout commented 4 years ago

@safwanrahman @barseghyanartur @CorrosiveKid what is needed for a celery signal processor to be added to the project?

andreasnuesslein commented 4 years ago

@janwout I'd think a PR would help ;)

I started to build this a while back, but ran into some "logic" problems because you can't pass along instances to celery tasks (as you probably know):

```python
from celery import shared_task
from django.contrib.contenttypes.models import ContentType
from django_elasticsearch_dsl.registries import registry
from django_elasticsearch_dsl.signals import RealTimeSignalProcessor


@shared_task
def task_handle_save(pk, content_type):
    # Rebuild the instance from primitives, since model instances
    # cannot be serialized into a Celery task payload.
    ct_model = ContentType.objects.get_by_natural_key(*content_type)
    instance = ct_model.get_object_for_this_type(pk=pk)
    registry.update(instance)
    registry.update_related(instance)


class CelerySignalProcessor(RealTimeSignalProcessor):
    def handle_save(self, sender, instance, **kwargs):
        ct = ContentType.objects.get_for_model(instance)
        task_handle_save.delay(instance.pk, ct.natural_key())
```

So basically what I did was to just move the registry logic into a task. Then I get the content_type string and the instance's id and pass them to that Celery task. Now the problem is: as soon as you do that for deletion, Celery won't find your instance anymore, because it will already have been deleted.
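One conceivable way around the deletion problem is to capture the primitives the worker needs (the document id, for example) in the pre_delete handler, while the row still exists, so the task never has to re-fetch the instance. Below is a minimal pure-Python simulation of that idea; the dicts and function names are hypothetical stand-ins for the ORM, the index, and the Celery broker, not the library's actual API:

```python
# Simulation of the deletion pitfall: pass primitives captured *before*
# the delete, instead of a pk the worker would have to re-fetch later.

database = {1: {"pk": 1, "title": "hello"}}   # stand-in for the ORM
search_index = {1: {"title": "hello"}}        # stand-in for Elasticsearch
task_queue = []                               # stand-in for the Celery broker


def delay(func, *args):
    """Enqueue a task; workers run later, after the signal handler returns."""
    task_queue.append((func, args))


def task_delete_by_pk(pk):
    # BROKEN approach (shown for contrast, never called here): re-fetching
    # by pk fails, because the row is gone by the time the worker runs.
    if database.get(pk) is None:
        raise LookupError("instance already deleted")


def task_delete_document(doc_id):
    # Working approach: the handler captured doc_id while the row existed,
    # so the worker can delete from the index without touching the database.
    search_index.pop(doc_id, None)


def handle_pre_delete(instance):
    # Capture primitives now, while the instance still exists.
    delay(task_delete_document, instance["pk"])


# Django fires pre_delete, then the row is removed, then the worker runs.
handle_pre_delete(database[1])
del database[1]
for func, args in task_queue:
    func(*args)

print(search_index)  # -> {}
```

The same pattern generalizes: anything the worker needs after the delete (index name, routing, parent id) has to travel with the task as plain values.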

Maybe this helps you to get started though :)

alexgarel commented 4 years ago

Yes, the right way of handling this with Celery is to pass the id of the database object to the task.

But there is another pitfall: if your database is running transactions, you have to defer the task until the commit. Otherwise, you may have a race condition, where Celery gets the object while the transaction is not yet committed and thus indexes an old version of the object. In this case, the right way of doing it is to replace:

task_handle_save.delay(instance.pk, ct.natural_key())

by:

transaction.on_commit(lambda: task_handle_save.delay(instance.pk, ct.natural_key()))

Though on_commit is not supported by all databases.

So I would recommend providing two classes, which differ only on this particular point (using inheritance). I fear that if you don't provide the on_commit version, many people will be misled, and we will have to deal with bug reports because of this. Also, really advertise the difference in the documentation.
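The two-class layout suggested above can be sketched without Django or Celery at all. In this simulation, `on_commit` is a plain callback list flushed at "commit", and `enqueued` stands in for `task_handle_save.delay(...)`; the class names are illustrative, not the library's:

```python
# Base processor enqueues immediately; the subclass defers until commit,
# so the worker never sees a pre-commit snapshot of the row.

on_commit_callbacks = []   # stand-in for Django's transaction.on_commit queue
enqueued = []              # stand-in for tasks sent to the Celery broker


def on_commit(callback):
    on_commit_callbacks.append(callback)


def commit():
    # Django runs on_commit callbacks once the transaction commits.
    while on_commit_callbacks:
        on_commit_callbacks.pop(0)()


class CelerySignalProcessor:
    def handle_save(self, pk):
        self.enqueue(pk)

    def enqueue(self, pk):
        enqueued.append(pk)   # would be task_handle_save.delay(pk, ...)


class TransactionAwareCelerySignalProcessor(CelerySignalProcessor):
    def enqueue(self, pk):
        # Defer the enqueue until the surrounding transaction commits.
        on_commit(lambda: CelerySignalProcessor.enqueue(self, pk))


p = TransactionAwareCelerySignalProcessor()
p.handle_save(7)          # inside a transaction: nothing enqueued yet
assert enqueued == []
commit()
print(enqueued)           # -> [7]
```

Only `enqueue` differs between the two classes, which is exactly the inheritance split recommended here.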

Also, testing Celery well is really not easy, as Celery does not really provide the right infrastructure for testing. Running tasks in eager mode is too simplistic a strategy to really verify that it works correctly.

Good luck :sweat_smile:

safwanrahman commented 4 years ago

@janwout Some time from a human who knows Python+Django+Celery and can work without any money is needed for a celery signal processor to be added to the project! 😉

So if you know anyone, or you have the time and expertise, then we will get the celery signal processor!

sevetseh28 commented 11 months ago

I see that this CelerySignalProcessor was added. However, I also noticed that it is being used for ALL Django signals. So whenever I update a model, even if it's not part of a Document in the index, it will use this new class. I would expect the SignalProcessor to handle only those signals relevant to indexing, even for the base RealTimeSignalProcessor. I've just realised this hooks into all of Django's model signals, not only those for this library.

lucasmoeskops commented 3 weeks ago

I also noticed the problem you are mentioning @sevetseh28.

To fix it I added an extra check that verifies the instance belongs to an indexed model before scheduling the task.
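The gating check described above can be sketched as follows. Since the registry's public API for "is this model indexed?" varies across django-elasticsearch-dsl versions, `INDEXED_MODELS` here is a hypothetical stand-in for that lookup, and the rest is a pure-Python simulation:

```python
# Only enqueue a Celery task when the sender model is actually
# registered for indexing; skip the round-trip for everything else.

INDEXED_MODELS = {"Article", "Author"}   # stand-in for the document registry
enqueued = []                            # stand-in for dispatched Celery tasks


def handle_save(sender_name, pk):
    if sender_name not in INDEXED_MODELS:
        return                           # unrelated model: do nothing
    enqueued.append((sender_name, pk))   # would be task_handle_save.delay(...)


handle_save("Article", 1)   # indexed -> task enqueued
handle_save("Session", 2)   # not indexed -> ignored
print(enqueued)             # -> [('Article', 1)]
```

Without such a check, every save on every model in the project pays the cost of a broker round-trip, which matches the behaviour reported above.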

Furthermore, I don't fully understand how the delete handling is supposed to work. It seems to only address propagating the delete to "related" models, but not to actually perform the delete on the index for the model itself.