tschellenbach / Stream-Framework

Stream Framework is a Python library that lets you build news feeds, activity streams and notification systems using Cassandra and/or Redis. The authors of Stream-Framework also provide a cloud service for feed technology:
https://getstream.io/

OperationTimedOut in Celery #98

Open TvoroG opened 10 years ago

TvoroG commented 10 years ago

Hello!

When I try to follow a user, the insert queries fail with the following error:

ERROR 2014-10-07 13:21:24,547 log 5092 -1216981312 Task stream_framework.tasks.fanout_operation_hi_priority[3312a605-9d8a-4ee7-9385-929f6d881b68] raised unexpected: OperationTimedOut('errors=errors=errors={}, last_host=localhost, last_host=None, last_host=None',)
Traceback (most recent call last):
  File "/vagrant/.env/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/stream_framework/tasks.py", line 17, in fanout_operation_hi_priority
    return fanout_operation(feed_manager, feed_class, user_ids, operation, operation_kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/celery/local.py", line 167, in <lambda>
    __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
  File "/vagrant/.env/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return orig(self, *args, **kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/celery/app/task.py", line 420, in __call__
    return self.run(*args, **kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/stream_framework/tasks.py", line 11, in fanout_operation
    feed_manager.fanout(user_ids, feed_class, operation, operation_kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/stream_framework/feed_managers/base.py", line 346, in fanout
    operation(feed, **operation_kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/cqlengine/query.py", line 197, in __exit__
    self.execute()
  File "/vagrant/.env/lib/python2.7/site-packages/stream_framework/storage/cassandra/timeline_storage.py", line 30, in execute
    super(Batch, self).execute()
  File "/vagrant/.env/lib/python2.7/site-packages/cqlengine/query.py", line 185, in execute
    tmp = execute('\n'.join(query_list), parameters, self._consistency)
  File "/vagrant/.env/lib/python2.7/site-packages/cqlengine/connection.py", line 112, in execute
    result = session.execute(query, params)
  File "/vagrant/.env/lib/python2.7/site-packages/cassandra/cluster.py", line 1294, in execute
    result = future.result(timeout)
  File "/vagrant/.env/lib/python2.7/site-packages/cassandra/cluster.py", line 2790, in result
    raise OperationTimedOut(errors=self._errors, last_host=self._current_host)
OperationTimedOut: errors=errors=errors={}, last_host=localhost, last_host=None, last_host=None

I found this solution, but it's crude: https://github.com/cqlengine/cqlengine/issues/237
How do you deal with this problem?

tbarbugli commented 10 years ago

Unfortunately, the solution in the link is pretty much the best way to deal with the python-driver's long-lived sessions combined with Celery's pre-fork workers. How often do you get the timeout? Have you already checked the Cassandra logs?
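Why pre-fork matters, as far as I understand the driver: the python-driver completes request futures on a background I/O thread that is started when the Cluster connects. Threads do not survive fork(), so a Celery pre-fork child inherits the session object but not that thread, and `future.result(timeout)` eventually raises OperationTimedOut. The stdlib-only sketch below demonstrates just the fork behavior; `fake_io_loop` is a hypothetical stand-in for the driver's reactor thread, not driver code.

```python
import os
import threading
import time
import warnings


def fake_io_loop(stop: threading.Event) -> None:
    # Hypothetical stand-in for a driver's background I/O thread.
    while not stop.is_set():
        time.sleep(0.01)


def io_thread_alive_after_fork() -> tuple:
    """Start a thread, fork, and report (alive in parent, alive in child)."""
    stop = threading.Event()
    io_thread = threading.Thread(target=fake_io_loop, args=(stop,), daemon=True)
    io_thread.start()

    read_fd, write_fd = os.pipe()
    with warnings.catch_warnings():
        # Python 3.12+ warns when forking a multi-threaded process; that
        # hazard is exactly what this demo illustrates, so silence it here.
        warnings.simplefilter("ignore", DeprecationWarning)
        pid = os.fork()

    if pid == 0:
        # Child process: only the thread that called fork() exists here.
        try:
            os.close(read_fd)
            os.write(write_fd, b"1" if io_thread.is_alive() else b"0")
        finally:
            os._exit(0)

    os.close(write_fd)
    alive_in_child = os.read(read_fd, 1) == b"1"
    os.close(read_fd)
    os.waitpid(pid, 0)

    alive_in_parent = io_thread.is_alive()
    stop.set()
    io_thread.join()
    return alive_in_parent, alive_in_child


if __name__ == "__main__":
    print(io_thread_alive_after_fork())  # (True, False) on CPython/Linux
```

This is why the workaround opens the Cassandra session inside each worker process after the fork instead of at import time in the parent.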

TvoroG commented 10 years ago

On almost every follow attempt. I didn't find anything useful in the Cassandra logs. Thanks for the help! I just wanted to know whether there is a better solution.

picturedots commented 8 years ago

I think I'm getting this error. Unfortunately, the original link to the cqlengine issue is now dead ( https://github.com/cqlengine/cqlengine/issues/237 ). Does anybody remember the workaround described there?

tbarbugli commented 8 years ago

Something like this:

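import threading
from django.conf import settings
from cassandra.cluster import Cluster
from celery.signals import worker_process_init, worker_process_shutdown

# Per-process storage for the session; each pre-fork child fills its own copy.
thread_local = threading.local()

@worker_process_init.connect
def open_cassandra_session(*args, **kwargs):
    # Runs in each child process *after* the fork, so the driver's
    # connections and I/O threads are created inside the child.
    cluster = Cluster([settings.DATABASES["cassandra"]["HOST"]], protocol_version=3)
    session = cluster.connect(settings.DATABASES["cassandra"]["NAME"])
    thread_local.cassandra_session = session

@worker_process_shutdown.connect
def close_cassandra_session(*args, **kwargs):
    session = getattr(thread_local, "cassandra_session", None)
    if session is not None:
        # Shutting down the Cluster also closes every session it created.
        session.cluster.shutdown()
    thread_local.cassandra_session = None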

We should probably add this to the library's documentation and hook it up to our own Cassandra connection setup() functions.

JelteF commented 8 years ago

A blog post describing a similar issue and its solution can be found here: http://robertolopes.me/2015/07/operationtimedout-exception-raised-when-using-the-cassandra-driver-for-python-in-conjunction-with-celery-delayed-tasks/
Our tests also fail sometimes because of this: https://travis-ci.org/JelteF/Stream-Framework/jobs/168539883

tbarbugli commented 8 years ago

I think we should add this to the docs and perhaps wrap it in a helper function. If you use Stream-Framework with Cassandra, you will run into this problem for sure.
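One possible shape for such a helper, as a sketch only: `PerProcessResource` is a hypothetical name, not an existing Stream-Framework or cassandra-driver API. Instead of relying on Celery signals, it remembers which PID built the resource and rebuilds it when called from a different process, which is the usual "reconnect after fork" pattern.

```python
import os
import threading
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class PerProcessResource(Generic[T]):
    """Lazily build one resource per process (hypothetical helper)."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._lock = threading.Lock()
        self._pid: Optional[int] = None
        self._resource: Optional[T] = None
        if hasattr(os, "register_at_fork"):
            # A lock held by another thread at fork time would stay locked
            # forever in the child, so hand the child a fresh lock.
            os.register_at_fork(after_in_child=self._reset_lock)

    def _reset_lock(self) -> None:
        self._lock = threading.Lock()

    def get(self) -> T:
        pid = os.getpid()
        if self._pid != pid:
            # First use in this process, or we are a forked child holding
            # the parent's copy: build a fresh resource for this PID.
            # The inherited copy is deliberately left alone, since shutting
            # it down from the child could interfere with the parent.
            with self._lock:
                if self._pid != pid:
                    self._resource = self._factory()
                    self._pid = pid
        return self._resource  # type: ignore[return-value]
```

Usage would look like `sessions = PerProcessResource(lambda: Cluster(["127.0.0.1"]).connect("feeds"))` at module level and `sessions.get().execute(...)` inside a task, where the host and keyspace are placeholders. Because it connects lazily on first use, it also covers code paths that never see `worker_process_init`; it does not by itself change how Stream-Framework's own cqlengine connection is set up.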