robmarano / 2018_Summer_Adv_Python_Project

2018 Summer Advanced Python Group Project
GNU General Public License v3.0

Set up Celery Scheduling #17

Open yvr2 opened 6 years ago

yvr2 commented 6 years ago

https://medium.com/@channeng/setting-up-a-task-scheduler-application-with-celery-flask-part-1-8652265050dc

https://console.aws.amazon.com/datapipeline/home?region=us-east-1#FirstRunPlace:

yvr2 commented 6 years ago

Tutorial for Django instead of Flask: http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

yvr2 commented 6 years ago

RQ (Redis Queue) is an alternative to Celery. According to some discussion among other beginner developers, Celery can be more trouble than it's worth, i.e. you can get bogged down in configuration relative to the functionality you get back. So RQ is an alternative worth considering; I will investigate.
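
For comparison, here is roughly what a task would look like under RQ. This is only a sketch; the module and function names are hypothetical, and it assumes `pip install rq` plus a Redis server on localhost:6379.

# The task function must live in a module the `rq worker` process can import,
# e.g. a hypothetical tasks.py containing:
#
#     def tweets(keyword):
#         ...  # the tweepy search call would go here
#
from redis import Redis
from rq import Queue
from tasks import tweets  # hypothetical module/function, see comment above

q = Queue(connection=Redis())   # connects to redis://localhost:6379 by default
job = q.enqueue(tweets, 'lit')  # a separate `rq worker` process picks this up
print(job.id)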

yvr2 commented 6 years ago

Data Pipeline functions with three overarching components:

  1. A "pipeline definition" file
  2. The pipeline itself
  3. A Task Runner, which presumably is how the daily script execution will be called

There are some data use restrictions with the AWS free tier for Data Pipeline use that we need to be aware of: "If your AWS account is less than 12 months old, you are eligible to use the free tier. The free tier includes three low-frequency preconditions and five low-frequency activities per month at no charge. For more information, see AWS Free Tier." (source)
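
For what it's worth, those three pieces map onto boto3's 'datapipeline' client roughly as follows. This is only an API-shape sketch: the pipeline name is a placeholder, and the single 'Default' object is not a complete definition (a real one also needs roles, a schedule, and the activities to run).

import boto3

client = boto3.client('datapipeline', region_name='us-east-1')

# 1. Create the (empty) pipeline.
resp = client.create_pipeline(name='tweedata-daily', uniqueId='tweedata-daily-1')
pipeline_id = resp['pipelineId']

# 2. Upload a pipeline definition (placeholder object only; see note above).
client.put_pipeline_definition(
    pipelineId=pipeline_id,
    pipelineObjects=[
        {'id': 'Default', 'name': 'Default',
         'fields': [{'key': 'scheduleType', 'stringValue': 'ondemand'}]},
    ],
)

# 3. Activate the pipeline so the Task Runner starts polling for work.
client.activate_pipeline(pipelineId=pipeline_id)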

yvr2 commented 6 years ago

Trying to activate a test pipeline in AWS at https://console.aws.amazon.com/datapipeline/home?region=us-east-1#CreatePipelinesPlace:

It seems like a prerequisite for activating the pipeline is getting S3 storage set up in order to define parameters for the pipeline. I think S3 is involved in how output will be stored each time the scheduler runs.

Once we understand the DynamoDB portion better and have a table to work with, there are two pipeline templates for dataflow between DynamoDB and S3.
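
Since the S3 bucket appears to be the first prerequisite, here is a short boto3 sketch for creating one (the bucket name is a placeholder and must be globally unique; in us-east-1 no location constraint is needed):

import boto3

s3 = boto3.client('s3', region_name='us-east-1')
s3.create_bucket(Bucket='tweedata-pipeline-output')  # placeholder name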

yvr2 commented 6 years ago

I need to create a results backend for my Celery process. For the tutorial I'm doing this with Redis as the backend and RabbitMQ as the broker, but we will ultimately be using Django as the backend.
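
For reference, that broker/backend combination boils down to a one-line Celery configuration along these lines (assuming RabbitMQ and Redis are both running locally on their default ports):

from celery import Celery

# RabbitMQ as the message broker, Redis as the result backend.
app = Celery('tasks',
             broker='pyamqp://guest@localhost//',
             backend='redis://localhost:6379/0')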

yvr2 commented 6 years ago

from celery import Celery
import tweepy

consumer_key = 'vdfRaONJbNQcWK5nqzUt34jIu'
consumer_secret = ''
access_token = '1012402072052957185-Txm1yLugHsMiTxTDMuaLUYY7kF7nLL'
access_token_secret = 'TxxT2ZNiT8JY58bUcMlHAzmNjSZjrEunqLFRxkajEE7i7'

app = Celery('tasks', backend='rpc://', broker='pyamqp://guest@localhost//')

# NOTE: `api` is not defined yet at this point; the tweepy auth/API setup is
# added in the revised version in the next comment.
@app.task
def tweets(keyword):
    searched_tweets = api.search(q=keyword, rpp=100, count=1000)
    print(searched_tweets)

yvr2 commented 6 years ago

from celery import Celery
import tweepy

consumer_key = 'vdfRaONJbNQcWK5nqzUt34jIu'
consumer_secret = ''
access_token = '1012402072052957185-Txm1yLugHsMiTxTDMuaLUYY7kF7nLL'
access_token_secret = 'TxxT2ZNiT8JY58bUcMlHAzmNjSZjrEunqLFRxkajEE7i7'

app = Celery('tasks', backend='rpc://', broker='pyamqp://guest@localhost//')

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth)

@app.task
def tweets(keyword):
    searched_tweets = api.search(q=keyword, rpp=100, count=1000)
    print(searched_tweets)
yvr2 commented 6 years ago

The above works to get a simple API call running in the Celery worker by defining a keyword:

>>> from tasks2 import tweets
>>> tweets.delay('lit')

This gets the Celery worker to run the API call. Now I'm going to add a more complex API call to get the keyword data into the format we want, then work on integration with Django. On the Django side, that integration consists of an __init__.py containing:

from .celery import app as celery_app

__all__ = ['celery_app']
kellycarmody commented 6 years ago

from celery import Celery
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import time
import os
import io

consumer_key = 'vdfRaONJbNQcWK5nqzUt34jIu'
consumer_secret = ''
access_token = '1012402072052957185-Txm1yLugHsMiTxTDMuaLUYY7kF7nLL'
access_token_secret = 'TxxT2ZNiT8JY58bUcMlHAzmNjSZjrEunqLFRxkajEE7i7'

app = Celery('tasks', backend='rpc://', broker='pyamqp://guest@localhost//')

start_time = time.time()
keyword_list = ['lit']

#This is a basic listener that just prints received tweets to stdout.
class listener(StreamListener):

    def __init__(self, start_time, time_limit=60):

        self.time = start_time
        self.limit = time_limit
        self.tweet_data = []

    def on_data(self, data):

        # Buffer incoming tweets in memory; the file is written once the time limit passes.

        while (time.time() - self.time) < self.limit:

            try:
                self.tweet_data.append(data)

                return True

            except BaseException as e:
                print('failed ondata:', str(e))
                time.sleep(5)

        saveFile = io.open('raw_tweets.json', 'w', encoding='utf-8')
        saveFile.write(u'[\n')
        saveFile.write(','.join(self.tweet_data))
        saveFile.write(u'\n]')
        saveFile.close()
        exit()

@app.task
def pull():
    print('class must be called')

    auth = OAuthHandler(consumer_key, consumer_secret)  # OAuth object
    auth.set_access_token(access_token, access_token_secret)

    twitterStream = Stream(auth, listener(start_time, time_limit=20))
    twitterStream.filter(track=keyword_list, languages=['en'])
kellycarmody commented 6 years ago

from celery_raw import pull

pull()

You can enter this into your shell to run the task function directly; use pull.delay() to send it to the Celery worker instead.
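
Note that a worker has to be running for the task to execute through Celery, e.g. in another shell: `celery -A celery_raw worker --loglevel=info` (assuming the module above is saved as celery_raw.py).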

kellycarmody commented 6 years ago

Still working on the final version we want.

yvr2 commented 6 years ago

@kellycarmody @celestinosalim I've edited the tweedata directory, on my local machine for now, to integrate Celery, according to the documentation here: http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

In tweedata/tweedata/tweedata:

  • I added a celery.py file which defines the Celery instance, with the following code:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tweedata.settings')

app = Celery('tweedata')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

  • I added an __init__.py file which initializes Celery when Django starts, with the following code:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)


The periodic scheduling, final decorator to run the API, and results backend are still in progress and will require a few more edits of Django files to integrate.
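
As a placeholder for that final decorator, here is a minimal sketch of what an app-level tasks.py could look like; the module path, function name, and body are hypothetical until the real API call is migrated in:

# Hypothetical tweedata/<app>/tasks.py
from celery import shared_task

@shared_task
def pull_tweets(keyword):
    # the tweepy search/stream call from the earlier comments would go here
    return keyword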

I'm now working on figuring out the Celery backend setup on the Django side. We can use Django itself as a backend by installing django_celery_results, adding it to INSTALLED_APPS in settings.py, adding CELERY_RESULT_BACKEND = 'django-db' in settings.py, and running `` $ python manage.py migrate django_celery_results `` to create the Celery database tables via a database migration, as described under 'Extensions' in the documentation referenced above.
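
Concretely, that means additions to settings.py along these lines (only the relevant entries shown):

# tweedata/settings.py (relevant additions only)
INSTALLED_APPS = [
    # ... existing Django apps ...
    'django_celery_results',
]

# Store Celery task results in the Django database.
CELERY_RESULT_BACKEND = 'django-db'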

BUT this `https://www.caktusgroup.com/blog/2014/06/23/scheduling-tasks-celery/` tutorial claims that "For now, you just need to know that Celery needs a broker and we can get by using Django itself during development (but you must use something more robust and better performing in production)." So I'm looking into Redis.

Also need to set up and integrate celery-beat with Django for periodic scheduling. I'm working on setting up a crontab schedule to run every hour, and will migrate our API function into it. We can change the frequency easily using crontab: `http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab`
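
A sketch of that hourly schedule using crontab; this would be added to the existing celery.py (where `app` is the Celery('tweedata') instance defined above), and the task path is hypothetical until our API task lives in a tasks.py:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'pull-tweets-every-hour': {
        'task': 'tweedata.tasks.pull_tweets',  # hypothetical task path
        'schedule': crontab(minute=0),          # run at the top of every hour
    },
}
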
kellycarmody commented 6 years ago

Ah alright great, I'm working on celery-beat now for periodic scheduling. Well, I was last night and will today after I get off work. It should be simple in principle, but my task keeps running into an exception; I think it may have to do with a memory leak or the timing or something. Do you want to work on the backend setup and Django integration while I focus on getting the celery-beat setup and the DynamoDB integration?


yvr2 commented 6 years ago

Okay cool, that works. Let’s be in touch as we make progress.

kellycarmody commented 6 years ago

Sounds good, I'll be uploading my code as soon as I come up with anything useful

kellycarmody commented 6 years ago

Here's a celery beat scheduler that I did get to run on my machine. It's not mine, just a sample one; I still need to get ours running, but I wanted to make sure something worked.

import datetime
from celery import Celery
from celery import group

app = Celery('test', broker='amqp://guest@localhost//', backend='amqp')
# Redis alternative:
# app = Celery('tasks', broker='redis://localhost:xxxx/0', backend='redis://localhost:xxxx/0')

@app.task
def add_days(days):
    return datetime.datetime.now() + datetime.timedelta(days=days)

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'multiply-each-10-seconds': {
            'task': 'test.add_days',
            'schedule': datetime.timedelta(seconds=10),
            'args': (2, )
        },
    },
)
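
To try it, run the worker with an embedded beat scheduler, e.g. `celery -A test worker -B --loglevel=info` (assuming the file is saved as test.py and RabbitMQ is running locally).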