FastAPI Task Queues and Celery
What is Celery, and why is it used in FastAPI?
Celery is a distributed task queue that allows you to run tasks asynchronously in the background. It is useful in FastAPI for offloading time-consuming tasks (e.g., sending emails, processing data) so that the API can respond quickly to client requests while the tasks are processed in the background. Celery integrates with FastAPI to manage tasks, scheduling, and concurrency effectively.
How do you set up Celery with FastAPI?
To set up Celery with FastAPI, you need to install Celery and configure it with a message broker like Redis or RabbitMQ. Celery will handle task distribution and communication between the FastAPI app and the workers that execute the tasks.
Steps to set up Celery:
- Install Celery and a message broker (e.g., Redis):
pip install celery redis
- Configure a celery.py file in your FastAPI project to initialize Celery.
- Create and run background tasks using Celery.
Example of setting up Celery in FastAPI:
from celery import Celery

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task
def add(x, y):
    return x + y
In this example, Celery is configured with Redis as the message broker and backend. A simple task add is created, which runs in the background to add two numbers.
How do you run Celery workers for FastAPI?
Celery workers are processes that execute the tasks in the task queue. After defining tasks in Celery, you need to start the Celery workers to process the tasks in the background.
To start Celery workers, run the following command:
celery -A celery_app worker --loglevel=info
In this command, celery_app is the name of the Celery application, and worker starts the worker process that executes tasks. The --loglevel=info flag provides detailed logs of task execution.
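You can pass additional worker options on the same command. A minimal sketch using the standard --concurrency flag (the value 4 is an arbitrary example):
celery -A celery_app worker --loglevel=info --concurrency=4
This limits the worker to four concurrent task processes, which can help keep resource usage predictable on a single machine.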
How do you execute background tasks in FastAPI using Celery?
To execute background tasks in FastAPI using Celery, you define a route in FastAPI that triggers a Celery task. The task is executed asynchronously, allowing the FastAPI app to respond immediately without waiting for the task to complete.
Example of executing a Celery task in FastAPI:
from fastapi import FastAPI
from celery import Celery

app = FastAPI()

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task
def add(x, y):
    return x + y

@app.post("/add/")
def add_numbers(x: int, y: int):
    task = add.delay(x, y)
    return {"task_id": task.id}
In this example, when the /add/ route is called, a Celery task is triggered to add two numbers asynchronously. The task ID is returned immediately, and the result of the addition is processed in the background.
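Note that delay() is a shortcut for apply_async(), which accepts extra options such as a countdown. A minimal sketch reusing the add task from above (the /add-later/ route and the 10-second countdown are illustrative, not part of the original example):
@app.post("/add-later/")
def add_numbers_later(x: int, y: int):
    # Schedule the task to run roughly 10 seconds from now
    task = add.apply_async((x, y), countdown=10)
    return {"task_id": task.id}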
How do you retrieve task results in Celery?
Celery allows you to track the progress and result of background tasks. Once a task is executed, you can retrieve its result using the task ID returned when the task was started.
Example of retrieving a task result:
from fastapi import FastAPI
from celery.result import AsyncResult
from celery import Celery

app = FastAPI()

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task
def add(x, y):
    return x + y

@app.get("/result/{task_id}")
def get_result(task_id: str):
    task_result = AsyncResult(task_id, app=celery_app)
    if task_result.ready():
        return {"status": "completed", "result": task_result.result}
    else:
        return {"status": task_result.status}
In this example, the /result/{task_id} route allows you to check the status of the background task using the task ID. If the task is completed, the result is returned; otherwise, the current status is displayed.
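If you need to block until a task finishes, for example in a script or a test, AsyncResult.get() waits for the result. A minimal sketch assuming the same add task (the 5-second timeout is an arbitrary example):
result = add.delay(2, 3)
value = result.get(timeout=5)  # blocks until the result is ready or the timeout expires
print(value)  # 5
Avoid calling get() inside a FastAPI request handler, since it blocks the server process while waiting; polling with AsyncResult, as shown above, is the non-blocking approach.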
How do you schedule periodic tasks with Celery in FastAPI?
Celery supports scheduling periodic tasks with its built-in beat scheduler (celery beat). This allows you to run tasks at regular intervals, similar to cron jobs.
To schedule tasks, define a periodic task in the Celery configuration:
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "add-every-minute": {
        "task": add.name,  # the task's registered name (add is the task defined earlier)
        "schedule": crontab(minute="*/1"),  # runs every minute
        "args": (5, 6),
    },
}
In this example, the task add is scheduled to run every minute. You need to start the Celery beat process to manage periodic tasks:
celery -A celery_app beat --loglevel=info
How do you handle task retries in Celery?
Celery allows you to retry tasks automatically in case of failure. You can configure the number of retries and the delay between retries using the retry method inside a task.
Example of handling retries in a task:
@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_data(self, data_id):
    try:
        # Simulate a failure for even IDs
        if data_id % 2 == 0:
            raise ValueError("Error processing data")
        return {"status": "processed", "data_id": data_id}
    except Exception as exc:
        raise self.retry(exc=exc)
In this example, the task process_data retries up to 3 times with a 60-second delay between retries if an error occurs during processing.
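As an alternative to calling self.retry() manually, recent Celery versions can retry automatically for given exception types using the autoretry_for and retry_backoff task options. A minimal sketch (the process_data_auto name is illustrative; check that your Celery version supports these options):
@celery_app.task(autoretry_for=(ValueError,), retry_backoff=True, max_retries=3)
def process_data_auto(data_id):
    # Any ValueError raised here is retried automatically with exponential backoff
    if data_id % 2 == 0:
        raise ValueError("Error processing data")
    return {"status": "processed", "data_id": data_id}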
How do you handle task queues in Celery?
Celery allows you to define multiple task queues, which can be useful for prioritizing different types of tasks. You can assign tasks to specific queues and configure workers to listen to one or more queues.
Example of defining task queues in Celery:
from kombu import Queue

celery_app.conf.task_queues = (
    Queue("high_priority"),
    Queue("low_priority"),
)

@celery_app.task(queue="high_priority")
def important_task():
    return "This is a high priority task"

@celery_app.task(queue="low_priority")
def regular_task():
    return "This is a low priority task"
In this example, two task queues are defined: high_priority and low_priority. The important_task is assigned to the high-priority queue, while the regular_task is assigned to the low-priority queue.
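Workers only consume the queues they are told to listen to, which is controlled with the -Q option. A sketch using the queue names above (one dedicated high-priority worker and one worker covering both queues):
celery -A celery_app worker -Q high_priority --loglevel=info
celery -A celery_app worker -Q high_priority,low_priority --loglevel=info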
How do you use Celery with FastAPI for long-running tasks?
Celery is ideal for handling long-running tasks, such as processing large datasets or generating reports, without blocking the FastAPI application. By delegating these tasks to Celery, FastAPI can respond immediately to requests while the tasks run in the background.
Example of a long-running task in Celery:
@celery_app.task
def process_large_dataset(dataset_id):
    # Simulate a long-running task
    for i in range(1000000):
        pass
    return {"status": "completed", "dataset_id": dataset_id}

@app.post("/process-dataset/")
def process_dataset(dataset_id: int):
    task = process_large_dataset.delay(dataset_id)
    return {"task_id": task.id}
In this example, the process_large_dataset task runs in the background, allowing the FastAPI app to return immediately with the task ID while the task completes asynchronously.
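For long-running work, it is often useful to report progress so that clients polling the result endpoint can display status. A minimal sketch using the task's update_state method (the PROGRESS state name and the meta fields are conventions chosen here, not built-in Celery states):
@celery_app.task(bind=True)
def process_large_dataset_with_progress(self, dataset_id):
    total = 1000000
    for i in range(total):
        if i % 100000 == 0:
            # Publish a custom state with progress metadata
            self.update_state(state="PROGRESS", meta={"current": i, "total": total})
    return {"status": "completed", "dataset_id": dataset_id}
A client can then read task_result.state and task_result.info from AsyncResult to show how far the task has progressed.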
How do you monitor Celery tasks in FastAPI?
You can monitor Celery tasks using the Flower dashboard, which provides real-time information on task execution, failures, and retries. Flower is a web-based tool for monitoring and managing Celery workers and tasks.
To install and run Flower, use the following command:
pip install flower
celery -A celery_app flower
Flower runs a web interface where you can monitor task statuses, worker health, and more.
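By default Flower serves its dashboard on port 5555; you can change this with the --port option. A sketch, assuming port 5566 is free on your machine:
celery -A celery_app flower --port=5566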
What are some best practices for using Celery with FastAPI?
Some best practices for using Celery with FastAPI include (a configuration sketch combining several of these follows the list):
- Use separate queues for different priority tasks.
- Configure retries for tasks that may fail due to temporary issues.
- Use a robust message broker like Redis or RabbitMQ.
- Monitor task performance and failures using tools like Flower.
- Test Celery tasks independently to ensure correctness and reliability.
- Use Celery beat for scheduling periodic tasks, such as sending daily reports or performing database cleanup.
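Several of these practices can be expressed in one place in the Celery configuration. A minimal sketch, assuming Redis as broker and backend and the queue names used earlier in this article; the nightly cleanup task name is hypothetical:
from celery import Celery
from celery.schedules import crontab
from kombu import Queue

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

# Separate queues for different task priorities
celery_app.conf.task_queues = (
    Queue("high_priority"),
    Queue("low_priority"),
)

# Periodic task handled by Celery beat
celery_app.conf.beat_schedule = {
    "nightly-cleanup": {
        "task": "worker.cleanup_database",  # hypothetical task name for illustration
        "schedule": crontab(hour=3, minute=0),  # runs daily at 03:00
    },
}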