Asynchronous tasks and cron jobs in an AWS Django app (using Celery)

What are we going to build?

We set up our Django app on AWS (with Elastic Beanstalk) in the previous tutorial. It takes a request and returns a response. In this tutorial we'll do more: we'll add the ability to run cron jobs and async tasks.

This is achieved using Celery, an asynchronous task queue/job queue and scheduler for Python apps.

Concepts involved

Async tasks allow you to run code in a separate process from the normal request-response cycle.
For example, say a user signs up and you want to send them an email telling them the sign-up was successful and welcoming them to your site. If the email is sent during the request-response cycle, other tasks that are potentially more important are delayed. There is no reason to wait for the email to be sent before that API (user creation) returns a response. Sending the email can be done in a separate process, async from the signup request-response code.
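
As a concrete sketch, such a task could look like this with Celery (send_welcome_email and the view snippet are hypothetical illustrations, not part of this tutorial's code):

from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_welcome_email(email):
    # Runs in the Celery worker process, outside the request-response cycle
    send_mail(
        subject="Welcome!",
        message="Thanks for signing up.",
        from_email="noreply@example.com",
        recipient_list=[email],
    )

# In the signup view: queue the task and return a response immediately
# send_welcome_email.delay(user.email)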

Celery is a separate process from your main Django web server that allows you to run code asynchronously and periodically.

You may also want to run cron jobs. A cron is a program that enables you to execute commands or scripts automatically at a specified time/date or to schedule them to run periodically.
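
For example, the classic crontab entry 0 0 * * * /path/to/script.sh (the script path is just an illustration) runs a script at midnight every day; in this tutorial we'll express such schedules in Python via Celery's periodic tasks (see step 4).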

Ideal architecture

Normally, all this is done using thread spawning + task queues + a scheduler. For Django, there is a silver bullet: Celery.

Pre-requisites
  • Python, Pip, and Django project in a virtual environment.
  • Django app installed and running on AWS Elastic Beanstalk. You can complete this prerequisite here
  • EB CLI with all settings (roles, profiles) configured.
  • RabbitMQ server installed and running locally. This is a good resource if you need to set it up.
Tutorial
0. Project Structure

My project structure looks like this:

ebdjango  
  |-- ebdjango
  |   |-- __init__.py
  |   |-- settings.py
  |   |-- urls.py
  |   `-- wsgi.py
  |-- manage.py
  |-- .ebextensions
  |-- .elasticbeanstalk
  |-- venv 
  |-- .gitignore 
  |-- requirements.txt

Here, ebdjango is the project name as well as the project root, and venv is the virtual environment directory. Just replace ebdjango and venv with your Django project name and virtualenv name for the rest of the tutorial.

Make sure you follow this project structure to avoid having tons of problems configuring paths.

1. Installing Celery locally

1.1 Install dependencies:

Install Celery and django-celery (the package that provides the djcelery Django app) in your venv.

$ pip install celery
$ pip install django-celery

1.2 Celery settings:

In your ebdjango/settings.py, add

# Celery settings for django
CELERY_ACCEPT_CONTENT = ['json']  
CELERY_TASK_SERIALIZER = 'json'  
CELERY_RESULT_SERIALIZER = 'json'

CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'  
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  
CELERY_SEND_EVENTS = False  
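
Here, CELERY_RESULT_BACKEND stores task results in the Django database via djcelery, and CELERYBEAT_SCHEDULER makes celery beat read its periodic task schedule from the database as well.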

Add djcelery to your INSTALLED_APPS:

INSTALLED_APPS = (
    ...
    'djcelery',
    ...
)

1.3 Celery config file:

Create a new file _celery.py in ebdjango/ebdjango:

# coding=utf-8

import os  
from celery import Celery  
from django.conf import settings

# Tell Celery to use the default Django settings module
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ebdjango.settings')

app = Celery('ebdjango')
# Read the Celery configuration (the CELERY_* keys above) from Django's settings
app.config_from_object('django.conf:settings')

# This line will tell Celery to autodiscover all your tasks.py that are in your app folders
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Add this line in ebdjango/__init__.py:

from ._celery import app as celery_app  
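
This import ensures the Celery app is always loaded when Django starts, so tasks can find and use it.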

1.4 Run DB migrations:

Run the migrations used by django-celery; they create the tables for the database result backend and scheduler:

$ python manage.py makemigrations djcelery
$ python manage.py migrate
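
At this point you can sanity-check the local setup by starting a worker (assuming RabbitMQ is running locally on its default port, so Celery's default broker URL applies):

$ celery worker -A ebdjango._celery --loglevel=INFO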

2. Get a RabbitMQ broker

The Celery process needs a message broker to create distributed async tasks and to schedule tasks. One of the most popular message brokers is RabbitMQ, so we'll be using that.

If you don't have a RabbitMQ service, head to https://www.cloudamqp.com/ and create one now. Create an instance of the service, then copy the entire URL from the details tab of the instance.

The url will look something like this:

amqp://user:password@host/vhost"  

Once you have the URL, run this EB CLI command:

$ eb setenv BROKER_URL=<URL>

Add this line to your settings.py (make sure os is imported at the top of the file):

BROKER_URL = os.environ['BROKER_URL']  
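
If you also want the app to run without the variable set (e.g. for local development), a fallback to RabbitMQ's default local URL is one option (a sketch, assuming a default local RabbitMQ install):

BROKER_URL = os.environ.get('BROKER_URL', 'amqp://guest:guest@localhost:5672//')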

Run this command to check if the env variable was set correctly:

$ eb printenv

A list of environment variables will be printed; BROKER_URL should be among them.

3. Add settings for Elastic Beanstalk

In the folder .ebextensions, create a folder named files. Add a file named celery_configuration.txt there.

Add following code to .ebextensions/files/celery_configuration.txt:

#!/usr/bin/env bash

# Get django environment variables
celeryenv=`cat /opt/python/current/env | tr '\n' ',' | sed 's/export //g' | sed 's/$PATH/%(ENV_PATH)s/g' | sed 's/$PYTHONPATH//g' | sed 's/$LD_LIBRARY_PATH//g'`  
celeryenv=${celeryenv%?}

# Create celery configuration script
celeryconf="[program:celeryd-worker]  
; Set full path to celery program if using virtualenv
command=/opt/python/run/venv/bin/celery worker -A ebdjango._celery --loglevel=INFO --without-gossip --without-mingle --without-heartbeat

directory=/opt/python/current/app  
user=nobody  
numprocs=1  
stdout_logfile=/var/log/celery-worker.log  
stderr_logfile=/var/log/celery-worker.log  
autostart=true  
autorestart=true  
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=998

environment=$celeryenv

[program:celeryd-beat]
; Set full path to celery program if using virtualenv
command=/opt/python/run/venv/bin/celery beat -A ebdjango._celery --loglevel=INFO --workdir=/tmp

directory=/opt/python/current/app  
user=nobody  
numprocs=1  
stdout_logfile=/var/log/celery-beat.log  
stderr_logfile=/var/log/celery-beat.log  
autostart=true  
autorestart=true  
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=998

environment=$celeryenv"

# Create the celery supervisord conf script
echo "$celeryconf" | tee /opt/python/etc/celery.conf

# Add configuration script to supervisord conf (if not there already)
if ! grep -Fxq "[include]" /opt/python/etc/supervisord.conf  
  then
  echo "[include]" | tee -a /opt/python/etc/supervisord.conf
  echo "files: celery.conf" | tee -a /opt/python/etc/supervisord.conf
fi

# Reread the supervisord config
supervisorctl -c /opt/python/etc/supervisord.conf reread

# Update supervisord in cache without restarting all services
supervisorctl -c /opt/python/etc/supervisord.conf update

# Start/Restart celeryd through supervisord
supervisorctl -c /opt/python/etc/supervisord.conf restart celeryd-worker  
supervisorctl -c /opt/python/etc/supervisord.conf restart celeryd-beat  

Make sure you replace ebdjango with your project name.

Create a new file .ebextensions/99_celery.config and add following code:

container_commands:  
  04_celery_tasks:
    command: "cat .ebextensions/files/celery_configuration.txt > /opt/elasticbeanstalk/hooks/appdeploy/post/run_supervised_celeryd.sh && chmod 744 /opt/elasticbeanstalk/hooks/appdeploy/post/run_supervised_celeryd.sh"
    leader_only: true
  05_celery_tasks_run:
    command: "/opt/elasticbeanstalk/hooks/appdeploy/post/run_supervised_celeryd.sh"
    leader_only: true
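
These two commands copy the script into Elastic Beanstalk's post-deploy hooks directory, make it executable, and run it, so supervisord (re)starts the Celery worker and beat after every deploy. leader_only: true restricts them to the leader instance, so in a multi-instance environment only one machine runs Celery, and in particular only one beat scheduler, which avoids duplicate periodic tasks.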

4. Create a periodic task

All the configuration is done, and you are all set. Now let's create a sample periodic task to test it.

Create an app called my_app. Create a file my_app/tasks.py and add the following code:

# coding=utf-8
from celery.task import periodic_task
from datetime import timedelta, datetime
import logging

logger = logging.getLogger(__name__)

# A periodic task that will run every minute
@periodic_task(run_every=timedelta(minutes=1))
def sample_periodic_task():
    logger.info("Start task")
    now = datetime.now()
    result = now.day + now.minute
    logger.info("Task finished: result = %i" % result)

This task will run every minute and log a result derived from the current time.
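
If you want a task to run at a fixed time of day instead of at a fixed interval, Celery's crontab schedule can be used in place of timedelta (a minimal sketch; the midnight schedule is just an example):

from celery.schedules import crontab

# Runs at 00:00 every day
@periodic_task(run_every=crontab(minute=0, hour=0))
def sample_midnight_task():
    logger.info("Midnight task ran")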

5. Deploy the code

You've configured celery and added a sample cron task. Let's deploy the code now:

$ git add .
$ git commit -m "Added celery and periodic tasks"
$ eb deploy

After the deployment is done, check the output of /var/log/celery-worker.log on your EB instance. The periodic task should be printing output similar to:

$ tail -f /var/log/celery-worker.log

[2017-04-22 23:08:38,493: INFO/Worker-2] Start task
[2017-04-22 23:08:38,494: INFO/Worker-2] Task finished: result = 30

Code: https://github.com/CodePalTutorials/django-elasticbeanstalk-starter
