Django Celery
What is Celery?
Celery is an open-source distributed task queue system that allows you to run asynchronous tasks and manage them outside of your Django request/response cycle. Celery is commonly used to offload long-running tasks, such as sending emails, generating reports, or making external API calls, to a background worker, improving the performance and responsiveness of your web application.
How do you install and configure Celery in a Django project?
To install Celery in a Django project, follow these steps:
- Install Celery via pip: pip install celery
- Create a celery.py file in your project's main directory to configure Celery.
- Update the settings.py file to configure your Celery broker (commonly Redis or RabbitMQ).
Example of a basic Celery configuration in celery.py:
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Load task modules from all registered Django app configs
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
In this example, Celery is configured to use the Django settings module and automatically discover tasks in your Django apps.
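To make sure this Celery app is loaded when Django starts, the standard pattern from the Celery documentation is to import it in the project's __init__.py:

```python
# myproject/__init__.py
# Ensure the Celery app is imported when Django starts,
# so that shared_task uses this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
```

Without this import, tasks defined with @shared_task may not be registered against your configured app.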
What are Celery brokers, and which ones are commonly used?
Celery brokers are message brokers responsible for passing tasks from your Django application to the Celery workers. The broker holds tasks in a queue until they are picked up and executed by the workers. Common brokers used with Celery include:
- Redis: A fast, in-memory data store commonly used with Celery for local development and production environments.
- RabbitMQ: A robust message broker often used in large-scale production systems.
Example of configuring Redis as a broker in settings.py:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
In this example, Redis is used as the message broker, running on localhost with the default Redis port 6379.
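If you also want to fetch task results later (for example with result.get()), configure a result backend as well. Redis can double as the backend, as shown here; any supported backend works:

```python
# Store task states and return values in Redis (database 0).
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
```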
How do you define tasks in Celery?
Tasks in Celery are defined as Python functions decorated with the @app.task decorator (or @shared_task in reusable Django apps, which binds the task to whichever Celery app is configured). These tasks can be executed asynchronously by Celery workers.
Example of defining a simple task:
from celery import shared_task
@shared_task
def add(x, y):
return x + y
In this example, the add function is a Celery task that takes two numbers and returns their sum. The task can be executed asynchronously by Celery workers.
How do you execute a Celery task asynchronously?
To execute a Celery task asynchronously, you call the task's delay() or apply_async() method. These methods add the task to the queue, and a Celery worker picks it up and executes it asynchronously.
Example of executing a task asynchronously:
# Call the task asynchronously
add.delay(3, 5)
In this example, the add task is executed asynchronously with the arguments 3 and 5. The result is processed by a Celery worker without blocking the main application.
What is the difference between delay() and apply_async() in Celery?
Both delay() and apply_async() are used to execute Celery tasks asynchronously, but they differ in the level of control they offer:
- delay(): A shortcut that sends the task to the queue with the given arguments and the default options.
- apply_async(): Provides more control over task execution, allowing you to specify additional options like countdown, retries, and routing.
Example of using apply_async() with options:
# Execute task asynchronously with a 10-second countdown
add.apply_async((3, 5), countdown=10)
In this example, the task will be executed asynchronously after a 10-second delay.
How do you schedule periodic tasks in Celery?
To schedule periodic tasks, you use Celery beat, a scheduler that sends tasks to the queue at specified intervals (started with celery -A myproject beat). You can define periodic tasks using the CELERY_BEAT_SCHEDULE setting in settings.py.
Example of scheduling a periodic task:
CELERY_BEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'myapp.tasks.add',
'schedule': 30.0,
'args': (10, 20),
},
}
In this example, the add task is scheduled to run every 30 seconds with the arguments 10 and 20.
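For calendar-style schedules, Celery ships a crontab helper (celery.schedules.crontab) that can be used in place of the numeric interval; the task name and arguments below reuse the example above:

```python
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'add-every-monday-morning': {
        'task': 'myapp.tasks.add',
        # Run at 7:30 every Monday.
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (10, 20),
    },
}
```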
How do you retry failed tasks in Celery?
Celery allows you to retry tasks that fail due to transient issues, such as network failures. You can call the task's self.retry() method inside the task function to retry it after a failure; this requires defining the task with bind=True so it receives self. You can also configure retry options like the maximum number of retries and the delay between retries.
Example of retrying a task:
import requests
from celery import shared_task
@shared_task(bind=True, max_retries=5)
def fetch_data(self, url):
try:
# Simulate an external API request
result = requests.get(url)
return result.json()
except requests.exceptions.RequestException as exc:
# Retry the task after a failure
raise self.retry(exc=exc, countdown=10)
In this example, the fetch_data task retries up to 5 times with a 10-second delay between retries if an exception occurs during the execution.
How do you monitor Celery tasks?
You can monitor Celery tasks using tools like Flower, a web-based monitoring tool for Celery. Flower provides a real-time dashboard for monitoring task states, inspecting workers, and viewing task execution details.
Steps to install and run Flower:
- Install Flower via pip: pip install flower
- Run Flower: celery -A myproject flower
Once Flower is running, you can access the monitoring dashboard by navigating to http://localhost:5555 in your browser.
What are Celery workers, and how do you start them?
Celery workers are processes that execute tasks from the task queue. Each worker listens to the message broker for tasks, picks them up, and processes them. You can start a Celery worker by running the following command:
celery -A myproject worker --loglevel=info
In this command:
- -A myproject specifies the Django project where Celery is configured.
- worker starts the worker process.
- --loglevel=info sets the log level to info to display task execution details.
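Two other commonly used worker flags are --concurrency, which controls the number of worker processes, and -Q, which restricts the worker to specific queues (the queue name here assumes the high_priority queue defined later):

```shell
# Four worker processes, consuming only the high_priority queue
celery -A myproject worker --loglevel=info --concurrency=4 -Q high_priority
```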
What is the purpose of Celery queues?
Celery queues allow you to organize tasks into different queues based on priority, type, or resource usage. By default, Celery uses a single queue, but you can define multiple queues and route tasks to specific queues based on your application's needs.
Example of defining multiple queues in settings.py:
from kombu import Queue

CELERY_TASK_QUEUES = (
    Queue('high_priority', routing_key='high_priority'),
    Queue('low_priority', routing_key='low_priority'),
)
In this example, two task queues are defined: high_priority and low_priority. You can route tasks to these queues based on your application's requirements.
How do you route tasks to specific queues in Celery?
You can route tasks to specific queues in Celery by defining routing rules in the CELERY_TASK_ROUTES setting or by specifying the queue argument when calling apply_async().
Example of routing tasks to specific queues:
CELERY_TASK_ROUTES = {
'myapp.tasks.high_priority_task': {'queue': 'high_priority'},
'myapp.tasks.low_priority_task': {'queue': 'low_priority'},
}
In this example, the high_priority_task is routed to the high_priority queue, and the low_priority_task is routed to the low_priority queue.
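As mentioned above, you can also pick the queue per call by passing queue= to apply_async(); the task name and arguments below are taken from the routing example and are illustrative:

```python
# Send this particular call to the high_priority queue,
# overriding any CELERY_TASK_ROUTES rule.
high_priority_task.apply_async(args=(1, 2), queue='high_priority')
```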
What is task chaining in Celery?
Task chaining in Celery allows you to link multiple tasks together so that they execute sequentially. The output of one task is passed as the input to the next task in the chain.
Example of task chaining:
from celery import chain
result = chain(add.s(2, 2), add.s(4), add.s(8))()
In this example, the tasks run in sequence: add(2, 2) produces 4, which is passed as the first argument to add.s(4) (giving add(4, 4) = 8), and that result is passed to add.s(8) (giving add(8, 8) = 16).
How do you run tasks in parallel with Celery?
To run tasks in parallel in Celery, you can use task groups. Task groups allow you to run multiple tasks concurrently and aggregate their results once all tasks are completed.
Example of running tasks in parallel using a task group:
from celery import group
result = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))()
In this example, the add tasks are executed in parallel, and the result object contains the results of all tasks once they complete.