gjedeer / celery-php

PHP client for Celery
http://massivescale.net/celery-php/

Port Celery4 v2 protocol to Redis and PECL-AMQP #78

Open gjedeer opened 7 years ago

gjedeer commented 7 years ago

The celery4 branch contains a working Celery Protocol v2 implementation for PhpAmqpLib. The changes are not yet ported to the other backends.

It should be very straightforward for PECL AMQP - basically add support for application headers, and don't bind to an exchange when checking results.

For Redis we'll need to reverse engineer the serialization used for headers, which shouldn't be a major task either.

Pull requests welcome if you want this code to be released soon!

rocksfrow commented 7 years ago

@gjedeer the celery4 branch isn't working for me (all of my messages are getting deleted by celery). FYI, I also noticed while using this branch that ETA support was broken at the same time.

ETA works with the 2.1.1 branch, but while testing the celery4 branch, ETA tasks were executed immediately.

gjedeer commented 7 years ago

Do you have these issues with php-amqplib? Which Celery version? I believe I tested with 4.0.0.

Regarding ETA, I believe there are no tests for this; somebody contributed the code but didn't contribute a test.

rocksfrow commented 7 years ago

@gjedeer celery 4.0.2. hey I noticed you have a celery4 branch and then a celery4pr66 branch with more commits... it's just the celery4 branch right?

I forked your repo so trying to find the best base to start with.

I plan to:

Create PR once tested in production in my environment.

rocksfrow commented 7 years ago

@gjedeer I'm wondering if the issue I was having was actually the message format, because I have celery upgraded to 3.1.25 which supports both version 1 and 2 of celery tasks. The celery4 branch works with 3.1.25, but not 4.0.2.

I noticed the messages are coming through with an empty exchange though -- is that intended? Maybe 3.1.25 is better handling that in the message.

gjedeer commented 7 years ago

Hey, thanks, I had not noticed that the exchange was empty. Moreover, someone did a Java client based on my work in this branch and it works somehow, so the bug has already propagated, haha.

I was testing with 4.0.0 - don't remember if it was the release version or some rc.

Your support for the ETA would be much appreciated.

rocksfrow commented 7 years ago

@gjedeer using the celery4 branch, rabbit will complain saying wrong destination and trash it, and shows me this raw delivery info:

delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'MYQUEUE', 'delivery_tag': 7L, 'exchange': ''}

I'm looking at getting the exchange passed to amqplib basic_publish along w/ the routing key... what version of amqplib are you using? The latest 2.6.3 works for you?

gjedeer commented 7 years ago

Hi, yes, I used 2.6.3.

gjedeer commented 7 years ago

I just tried the celery4 branch with celery==4.0.2 and test.php posted the task successfully. I didn't do any changes to the protocol, just ported the settings in testscenario to the new syntax: https://github.com/gjedeer/celery-php/blob/celery4/testscenario/celeryconfig.py

cd testscenario
celery worker -l DEBUG -c 20

and running php test.php in the other terminal worked perfectly. Could it be that you have some incompatible settings in Celery?

[celery4] root@celerydev:~/celery-php/testscenario# pip freeze
Warning: cannot find svn location for distribute==0.6.24dev-r0
SOAPpy==0.12.0
amqp==2.1.4
anyjson==0.3.3
argparse==1.2.1
billiard==3.5.0.2
celery==4.0.2
celery-with-redis==3.0
chardet==2.0.1
## FIXME: could not find svn URL in dependency_links for this package:
distribute==0.6.24dev-r0
fpconst==0.7.2
jedi==0.9.0
kombu==4.0.2
powerline-status==2.5
python-apt==0.8.8.2
python-dateutil==1.5
python-debian==0.1.21
python-debianbts==1.11
pytz==2016.10
redis==2.10.5
reportbug==6.4.4
six==1.6.1
vine==1.1.3
wsgiref==0.1.2
rocksfrow commented 7 years ago

I have been meaning to follow up with you. Sorry for my delay, I've just been crunching to get this queue working.

It wasn't a bug with the celery4 branch; I wasn't properly defining my custom exchange when initializing Celery.

So right now I have 4.0.2 working. BUT, I did confirm the eta support is not working in that branch.

What I ended up doing is just setting the task protocol to 1 and using your latest release and everything is working.
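
A minimal sketch of that workaround, assuming a module-style config file like the testscenario/celeryconfig.py linked earlier in the thread. `task_protocol` is the Celery 4 setting that selects the message protocol Celery uses when it sends tasks (it defaults to 2); everything else in the config is left out here.

```python
# celeryconfig.py (sketch): have Celery 4 publish task messages using
# protocol 1 instead of the default protocol 2.
task_protocol = 1
```

This only changes what the Python side sends, for example tasks spawned from inside other tasks; a Celery 4 worker still accepts messages in either protocol.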

I also ran into an issue creating tasks from within the original celery4 task (with protocol 2), but using protocol 1 by default resolved it.

I just don't have the time to resolve that now that I have a working stack. I'll follow up ASAP with ETA support for the celery4 branch.

gjedeer commented 7 years ago

That's cool, thanks for your effort anyway. In the meantime, I just ran tests with the AMQPLib connector and they pass 100% (mind you, there's no test for ETA).

gjedeer commented 7 years ago

PECL-AMQP connector is delayed until https://github.com/pdezwart/php-amqp/issues/263 is solved.

I tried writing a simple patch but, while it solved single NULL values like 'eta' or 'group', it serialized

'timelimit' => array(NULL, NULL),

into

'timelimit' => array()

which crashed Celery.
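
To illustrate why the collapsed array matters, here is a small sketch under an assumption: that the consumer reads `timelimit` as a fixed two-element pair. `read_timelimit` is a hypothetical helper, not Celery's actual code.

```python
def read_timelimit(headers):
    """Hypothetical consumer-side helper: expects exactly two entries."""
    first, second = headers["timelimit"]
    return first, second


# What the header should look like on the wire: two explicit nulls.
ok = read_timelimit({"timelimit": [None, None]})

# What the patched extension produced: an empty array.
try:
    read_timelimit({"timelimit": []})
    failed = False
except ValueError:
    # Unpacking an empty list into two names raises ValueError.
    failed = True
```

Any code that unpacks or indexes the pair breaks in the same way, so the NULL entries inside the array have to survive serialization, not just top-level NULL values.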

For the record, the patch I tried:

diff --git a/amqp.c b/amqp.c
index 1bc135f..d06d026 100644
--- a/amqp.c
+++ b/amqp.c
@@ -304,9 +304,12 @@ void internal_convert_zval_to_amqp_table(zval *zvalArguments, amqp_table_t *argu
                                internal_convert_zval_to_amqp_table(value, &field->value.table, 1 TSRMLS_CC);

                                break;
+                       case IS_NULL:
+                               field->kind = AMQP_FIELD_KIND_VOID;
+
+                               break;
                        default:
                                switch(Z_TYPE_P(value)) {
-                                       case IS_NULL:     strcpy(type, "null"); break;
                                        case IS_OBJECT:   strcpy(type, "object"); break;
                                        case IS_RESOURCE: strcpy(type, "resource"); break;
                                        default:          strcpy(type, "unknown");
gjedeer commented 7 years ago

The Redis message looks as follows:

{
    "body": "W1syLCAyXSwge30sIHsiY2hvcmQiOiBudWxsLCAiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbH1d",
    "headers": {
        "origin": "gen30995@celerydev",
        "lang": "py",
        "task": "tasks.add",
        "group": null,
        "root_id": "587879af-15f9-4677-8b1b-9f5f5dda3f13",
        "expires": null,
        "retries": 0,
        "timelimit": [
            null,
            null
        ],
        "argsrepr": "(2, 2)",
        "eta": null,
        "parent_id": null,
        "id": "587879af-15f9-4677-8b1b-9f5f5dda3f13",
        "kwargsrepr": "{}"
    },
    "content-type": "application/json",
    "properties": {
        "body_encoding": "base64",
        "delivery_info": {
            "routing_key": "celery",
            "exchange": ""
        },
        "delivery_mode": 2,
        "priority": 0,
        "correlation_id": "587879af-15f9-4677-8b1b-9f5f5dda3f13",
        "reply_to": "7449fc5f-5470-3dce-b15e-b9474ea756f8",
        "delivery_tag": "84f75102-80bf-428a-b796-5ed0493d5fc6"
    },
    "content-encoding": "utf-8"
}
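
For reference, the `body` field can be unpacked with the two encodings named in the message itself (`body_encoding: base64`, `content-type: application/json`). A short sketch using only the values from the dump above; the variable names are just for illustration.

```python
import base64
import json

# "body" copied verbatim from the Redis message above.
body_b64 = (
    "W1syLCAyXSwge30sIHsiY2hvcmQiOiBudWxsLCAiY2FsbGJhY2tzIjogbnVsbCwg"
    "ImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbH1d"
)

# base64 -> UTF-8 text -> JSON
raw = base64.b64decode(body_b64).decode("utf-8")
args, kwargs, embed = json.loads(raw)

print(args)    # positional arguments of tasks.add
print(kwargs)  # keyword arguments
print(embed)   # chord / callbacks / errbacks / chain
```

The decoded body is a three-element list: the positional arguments, the keyword arguments, and a dict holding `chord`, `callbacks`, `errbacks` and `chain`. Everything else about the task (id, eta, timelimit and so on) travels in `headers`.
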
rocksfrow commented 7 years ago

@gjedeer thanks for being so responsive!

I'm not too hot with unit testing -- so it'd be cool to get that in there so you could see it doesn't work? I definitely confirmed the ETA does not work by queueing messages using protocol 1 on celery 3 and celery 4 with ETAs applied using the celery-php master (not celery4 branch).

When I get over these other hurdles I'm working on, maybe I'll have the time to look into how to write a unit test for it. I did look over the code and saw that ETA is moved to headers and is NOT in params anymore. I saw you were doing an array_merge, so maybe you just need to move the ETA param to the headers.

gjedeer commented 7 years ago

I'm not hurrying or anything, it's just an issue I opened and you commented on and I'm using it to paste info for myself or whoever will work on this :)

ETA doesn't work because I set the field to NULL, as I just discovered.

rocksfrow commented 7 years ago

Hey I just saw that redis message which brings up another thing... I see you're setting the correlation_id there -- but the correlation_id isn't being set in amqplibconnector... could that cause any issues with creating tasks from the parent task? Because I was able to get the celery4 branch to post the initial task, but then the task.delay() calls within that task (existing code that works with celery3) would fail once received with 'Wrong Destination!?!'

gjedeer commented 7 years ago

Do you have any documentation on what correlation_id is? My AMQP knowledge is rather rusty and this seems to be a new thing

rocksfrow commented 7 years ago

@gjedeer haha yeah I get it. I meant comment-wise. Yeah I know ETA is set to null. But you were previously doing an array_merge to overwrite the null value with the user-provided one ... but since that was moved to headers I think that's why it's not working anymore. It looks like an easy fix, I just don't have time for now.
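
A rough sketch of the suspected problem, in Python for brevity. It is not the actual celery-php code: `build_message_buggy`, `build_message_fixed` and `HEADER_KEYS` are hypothetical names, and the only assumption carried over from the thread is that protocol 2 keeps `eta` in the headers while the user's options are still merged somewhere else.

```python
HEADER_KEYS = ("eta", "expires")  # fields that live in headers in protocol 2


def build_message_buggy(task_id, user_options):
    headers = {"id": task_id, "eta": None, "expires": None}
    properties = {}
    # User options are merged only into the properties, so headers["eta"]
    # keeps its default of None and the task runs immediately.
    properties.update(user_options)
    return headers, properties


def build_message_fixed(task_id, user_options):
    headers = {"id": task_id, "eta": None, "expires": None}
    properties = {}
    for key, value in user_options.items():
        # Route header fields into the headers; everything else stays put.
        target = headers if key in HEADER_KEYS else properties
        target[key] = value
    return headers, properties


options = {"eta": "2017-01-08T12:00:00+00:00", "priority": 5}
buggy_headers, _ = build_message_buggy("some-task-id", options)
fixed_headers, fixed_props = build_message_fixed("some-task-id", options)
```

With the buggy variant the worker sees `eta: null` in the headers, which would match the "executed immediately" behaviour reported above.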

Also, it's not just celery-php, so many accompanying libraries are lacking celery 4 support -- the rabbitmq mgmt module is buggy as hell when using task protocol 2 -- so for now sticking to task protocol 1 got me the benefits I needed from celery 4 without the new protocol, for now.

rocksfrow commented 7 years ago

correlation_id is new in protocol 2... I think I remember reading that it replaced the task id?

Yes, confirmed here: http://docs.celeryproject.org/en/latest/internals/protocol.html

Just search that page for correlation_id
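
As a sketch of what that means for a client: in the Redis dump earlier in this thread, `correlation_id` in the properties carries the same UUID as `id` in the headers. `make_v2_envelope` below is a hypothetical helper showing only that relationship, not a complete protocol 2 message.

```python
import uuid


def make_v2_envelope(task_name):
    """Hypothetical helper: build the id-related parts of a v2 message."""
    task_id = str(uuid.uuid4())
    headers = {
        "task": task_name,
        "id": task_id,
        "root_id": task_id,   # a top-level task is its own root
        "parent_id": None,
    }
    properties = {
        "correlation_id": task_id,  # same value as headers["id"]
    }
    return headers, properties


headers, properties = make_v2_envelope("tasks.add")
```

A connector that fills in the headers but leaves `correlation_id` unset would therefore produce a message that differs from what Celery itself publishes.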

rocksfrow commented 7 years ago

Could that have any impact on a task being created within a task (that has a properly defined task_route) ending up getting dropped for Wrong Destination? I was stumped so my only fix was to fallback to protocol 1 for now which is fine.

rocksfrow commented 7 years ago

Oh, flower is buggy as hell with celery 4 (task protocol 2) also because mochiweb is buggy because of it. hopefully these other projects mature quickly.

gjedeer commented 7 years ago

Looks like I have interesting times ahead (client wanted Celery 4 for a project that goes live soon).

Re. correlation_id: I added it in celery.php, so it should be added to all connectors now.

gjedeer commented 7 years ago

\o/

[celery4] root@celerydev:~/celery-php# phpunit CeleryRedisTest unittest/CeleryRedisTest.php
PHPUnit 4.0.17 by Sebastian Bergmann.

................

Time: 13.22 seconds, Memory: 2.75Mb

OK (16 tests, 35 assertions)
rocksfrow commented 7 years ago

More info on correlation_id.

http://stackoverflow.com/questions/20184755/practical-examples-of-how-correlation-id-is-used-in-messaging

rocksfrow commented 7 years ago

What are the unit tests testing, though? Like I said, I was able to get the celery4 branch to create a task. I was wrong initially.

BUT, that task spawns more tasks via task.apply_async(). This code hasn't been changed, but when using task protocol 2, the subtasks that are spawned go to the right queue, but once consumed celery trashes them for 'Wrong Destination'. I think that might be a celery bug but it's hard to say.

That's why I was wondering if the incoming celery4 task missing a piece of info (like the correlation id) would cause a subtask not to route properly. It doesn't make sense though.

Sorry just ranting here.

rocksfrow commented 7 years ago

Are you able to do that? You should be able to test easily... e.g.:

@app.task
def mytask():
    for i in range(1,10):
        othertask.apply_async()

$celery->posttask() creates the initial task successfully, but the subsequent tasks are thrown away once consumed from a separate queue.

That same architecture works fine with task protocol 1 (with celery 4), but not with task protocol 2.

rocksfrow commented 7 years ago

NOTE: i am NOT using any default celery values for exchange/queue/key

gjedeer commented 7 years ago

Could you open a new issue for that and paste this code? I'll take a look at it... sometime.

rocksfrow commented 7 years ago

@gjedeer sure thing... maybe I'll beat you to it.

Want me to create a separate issue to write a unit test for ETA as well?

gjedeer commented 7 years ago

Yeah, please do. This megathread will be hard to manage :)

gjedeer commented 7 years ago

Thanks to @pinepain there's this PR fixing the biggest PECL AMQP issue.

Not all tests are passing yet but we're almost there.

shulcsm commented 7 years ago

What is the state of Celery4 support? I'm interested in the Redis broker in particular.

gjedeer commented 7 years ago

It passes the unit tests which cover basic functionality. Users report that advanced stuff, in particular ETA support, does not work. There is a pull request promising to take care of that and you're welcome to test and improve upon that PR.
