tschellenbach / Stream-Framework

Stream Framework is a Python library which allows you to build news feeds, activity streams, and notification systems using Cassandra and/or Redis. The authors of Stream-Framework also provide a cloud service for feed technology:
https://getstream.io/

stream_framework.tasks.follow_many hangs in celery for cassandra backend #170

Open · aruljothi opened 8 years ago

aruljothi commented 8 years ago

Friends,

I tried to port our Redis backend for Stream Framework to Cassandra, following the instructions and making sure the keyspace and models were created (sync_table). But when I try to create a follow relationship, or run any Stream Framework operation that involves the follow_many task, the task just hangs in Celery.

I could see the following in the Celery logs:

  1. The task crosses the pre_run state (observed via the signal), but it never exits with a success or failure state.
  2. After the task passes the pre_run state, the worker process logs show:

```
SELECT * FROM mystream.user_feed WHERE "feed_id" = %(0)s LIMIT 5000
Sending options message heartbeat on idle connection (139973351007568) cassandra
Sending options message heartbeat on idle connection (139973368092368) cassandra
Received options response on connection (139973368092368) from cassandra
Received options response on connection (139973351007568) from cassandra
```

If I run Celery with always-eager mode, the task completes with a success state.
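For context, "always eager" means Celery's eager mode, which runs tasks synchronously in the calling process and therefore bypasses the forked worker pool entirely. A minimal sketch of the settings involved, assuming Celery 3.x-era names:

```python
# celeryconfig.py -- debugging aid only, not a fix.
# (Celery 3.x setting names; newer releases spell them
# task_always_eager and task_eager_propagates.)
CELERY_ALWAYS_EAGER = True                  # run tasks inline, no worker/broker
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True   # re-raise task errors in the caller
```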

Any help is greatly appreciated. Please let me know if you need any more details from my side.

Regards, Aruljothi.S

aruljothi commented 8 years ago

Also, is there a user group where I can post clarifications and queries about Stream-Framework?

tschellenbach commented 8 years ago

Are you using Celery with something other than RabbitMQ as the broker? Celery workers aren't always reliable with other broker backends, especially at high load. Are you having this issue in development or in production?

aruljothi commented 8 years ago

I am using Celery with RabbitMQ, and it works perfectly fine with the Redis backend. Our app is at the POC stage and in development.

aruljothi commented 8 years ago

This link seems to explain the behaviour I am encountering: http://stackoverflow.com/questions/24785299/python-cassandra-driver-operationtimeout-on-every-query-in-celery-task

BTW, if I strace the Celery process that hangs, I get the following:

```
strace -f -p 32 -s 10000
Process 32 attached
futex(0x2a3ba10, FUTEX_WAIT_PRIVATE, 0, NULL
```

Since you guys are already using Cassandra + Celery, I think I might be missing something in my setup. Any help is greatly appreciated.
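The gist of that answer, as I understand it: the DataStax Python driver is not fork-safe, and Celery's prefork pool forks worker processes after the parent may already have opened a session. A minimal illustration of the broken pattern (my own sketch, not code from this project; "mystream" is the keyspace from the logs above):

```python
# Illustrative sketch of the fork-unsafety described in the linked answer.
import os
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('mystream')  # session created in the parent process

if os.fork() == 0:
    # The child inherits the session's sockets and lock state, but the
    # driver's I/O threads live only in the parent, so this call blocks
    # forever -- the same futex(..., FUTEX_WAIT_PRIVATE, ...) strace shows.
    session.execute('SELECT * FROM user_feed LIMIT 1')
```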

aruljothi commented 8 years ago

It seems it has to do with sharing the Cassandra session across Celery worker processes. I had to put the following in celery.py to make it work. This also takes care of keyspace creation and model sync in Cassandra:

```python
from celery.signals import worker_process_init, beat_init

def cassandra_init(*_args, **_kwargs):
    ''' initialize cassandra for mystreamapp '''
    import os, importlib
    from stream_framework.storage.cassandra.connection import (
        setup_connection, settings as stream_framework_settings)
    from cassandra.cqlengine.connection import (
        cluster as cql_cluster, session as cql_session)
    from cassandra.cqlengine.management import sync_table, create_keyspace_simple
    from myproject.core.semaphore_obj_mgr import SemaphoreObjMgr

    key_space = stream_framework_settings.STREAM_DEFAULT_KEYSPACE
    feed_module = "myproject.mystreamapp.lib.gen_feed"
    feed_classes = ("MyStreamappFlatFeed", "MyStreamappAggregatedFeed",
                    "MyStreamappUserFeed")
    feed_module_obj = importlib.import_module(feed_module)

    # this ensures a worker instance won't be using the shared cassandra session
    if cql_cluster is not None:
        cql_cluster.shutdown()
    if cql_session is not None:
        cql_session.shutdown()

    # this ensures a worker instance will get a new cassandra session
    setup_connection()

    os.environ["CQLENG_ALLOW_SCHEMA_MANAGEMENT"] = "True"
    sp_mgr = SemaphoreObjMgr("chkcassandra")
    if sp_mgr.is_process_complete():
        # nothing to do: may already have been handled by another worker process
        return True

    sp_mgr.acquire()
    create_keyspace_simple(name=key_space, replication_factor=1)
    for feed_cls in feed_classes:
        feed_cls_obj = getattr(feed_module_obj, feed_cls)
        timeline = feed_cls_obj.get_timeline_storage()
        sync_table(timeline.model)
    sp_mgr.release()

# Initialize worker context for both standard and periodic tasks.
worker_process_init.connect(cassandra_init)
beat_init.connect(cassandra_init)
```

tbarbugli commented 8 years ago

That's right, I forgot about it. We should add this to the Celery section of the install documentation.
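Something along these lines would probably do for the docs; a minimal sketch that only resets the connection per worker process and deliberately leaves schema management out (see the next comments for why):

```python
from celery.signals import worker_process_init

from cassandra.cqlengine import connection
from stream_framework.storage.cassandra.connection import setup_connection


def cassandra_init(*args, **kwargs):
    # Drop any cluster/session inherited from the parent process...
    if connection.cluster is not None:
        connection.cluster.shutdown()
    if connection.session is not None:
        connection.session.shutdown()
    # ...and open a fresh connection owned by this worker process.
    setup_connection()


worker_process_init.connect(cassandra_init)
```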

tbarbugli commented 8 years ago

@aruljothi it's not a good idea to run sync_table in the worker_process_init callback function ;)

aruljothi commented 8 years ago

@tbarbugli I suspected as much, but since I kept that part of the code inside the semaphore, it happens only once. What is your recommendation? The way I see it, it has to be called only once, right?

tbarbugli commented 8 years ago

No idea how you implemented the semaphore, but I guess it only makes sure that one process or one server runs that code at a time :) Anyway, you should only run sync_table when something in your schema changes (the same way you apply Django schema migrations).
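For instance, the schema work could live in a standalone, deploy-time script rather than in the worker bootstrap. A sketch, reusing the hypothetical module and class names from the snippet above:

```python
# sync_schema.py -- run once per deploy (or whenever the schema changes),
# never on worker start. Feed module/class names are the ones from above.
import importlib
import os

from cassandra.cqlengine.management import create_keyspace_simple, sync_table
from stream_framework.storage.cassandra.connection import (
    setup_connection, settings as stream_framework_settings)

os.environ["CQLENG_ALLOW_SCHEMA_MANAGEMENT"] = "True"
setup_connection()

create_keyspace_simple(name=stream_framework_settings.STREAM_DEFAULT_KEYSPACE,
                       replication_factor=1)

feeds = importlib.import_module("myproject.mystreamapp.lib.gen_feed")
for name in ("MyStreamappFlatFeed", "MyStreamappAggregatedFeed",
             "MyStreamappUserFeed"):
    timeline = getattr(feeds, name).get_timeline_storage()
    sync_table(timeline.model)
```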


aruljothi commented 8 years ago

Got it. Currently I used a Redis lock to implement the semaphore, which is accessible across servers. I understand the spirit of your feedback, though; I'll see where I can fit it in.
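For reference, the locking is roughly the pattern below; a sketch using redis-py's built-in lock, not the actual SemaphoreObjMgr implementation:

```python
# Rough sketch of a cross-server, one-time-only guard using redis-py's lock.
import redis

client = redis.StrictRedis(host='localhost', port=6379)
lock = client.lock('chkcassandra', timeout=60)  # auto-expires if we crash

if lock.acquire(blocking=False):  # only one process across all servers wins
    try:
        pass  # one-time work: create_keyspace_simple / sync_table go here
    finally:
        lock.release()
```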

I have an unrelated query (I'll open a separate ticket if you want).

I created my feed as shown below. It was working fine when I used the Redis backend, but with Cassandra it fails at the following location in stream_framework/storage/base.py (def serializer):

```python
serializer_instance = serializer_class(
    activity_class=self.activity_class, **kwargs)
```

where serializer_class is CassandraActivitySerializer (set in the feed class below), which expects model as a parameter in __init__(), but it is not provided here. I think I am missing something. Can you please help me here?

```python
class MyAppFeed(CassandraFeed):
    ''' Customizing CassandraFeed class for MyApp requirements '''
    activity_class = MyAppActivity
    activity_serializer = CassandraActivitySerializer
```
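As far as I can tell, the mismatch is between the two signatures: the base storage builds its serializer with only activity_class, while CassandraActivitySerializer also wants the cqlengine model as its first positional argument. A minimal, runnable demonstration (BaseSerializer and the call site are paraphrased from the library code as I read it):

```python
# Minimal demonstration of the signature mismatch (simplified paraphrase,
# not the actual stream_framework source).
class BaseSerializer(object):
    def __init__(self, activity_class, **kwargs):
        self.activity_class = activity_class

class CassandraActivitySerializer(BaseSerializer):
    def __init__(self, model, *args, **kwargs):
        BaseSerializer.__init__(self, *args, **kwargs)
        self.model = model

# What the generic storage effectively does -- no model is passed:
serializer = CassandraActivitySerializer(activity_class=object)
# -> TypeError: __init__() missing required argument: 'model'
```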

Regards, Aruljothi.S

aruljothi commented 8 years ago

@tbarbugli I have opened another ticket (#171) for the above query; please review and help me with it.