Flask Celery


What is Celery?

Celery is a distributed, asynchronous task queue that lets you run time-consuming work in the background, outside the Flask request/response cycle. It is commonly used for sending emails, processing data, and any other work that does not have to finish before the response is returned.


Why is Celery used in Flask applications?

Celery is used in Flask applications to offload long-running tasks such as sending emails, file processing, report generation, and other operations that would otherwise block the main application and slow down the response time for users. By moving these tasks to the background, Flask can respond to client requests immediately while the background task continues running separately.
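The idea of handing work to a background worker can be sketched with the Python standard library alone. This is not Celery: it is a toy illustration that uses queue.Queue and one thread, and the names task_queue, worker, and handle_request are hypothetical. Celery plays the same role with a message broker and separate worker processes.

```python
import queue
import threading

task_queue = queue.Queue()   # stands in for the message broker
results = []                 # stands in for a result backend


def worker():
    # Pull jobs off the queue forever and run them, like a worker process would.
    while True:
        func, args = task_queue.get()
        results.append(func(*args))
        task_queue.task_done()


def send_email(recipient):
    return f"Email sent to {recipient}"


def handle_request(email):
    # Enqueue the slow work and return right away.
    task_queue.put((send_email, (email,)))
    return "Email task queued"


threading.Thread(target=worker, daemon=True).start()

response = handle_request("user@example.com")
task_queue.join()  # only so the demo can inspect the result before exiting
print(response, results)
```

The request handler returns as soon as the job is queued; the slow function runs on the worker thread afterwards.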


How do you install Celery?

Celery can be installed using the pip package manager. Additionally, you'll need a message broker such as Redis or RabbitMQ for Celery to manage and distribute tasks.

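Example of installing Celery and the Redis client library:

pip install celery redis

In this example, pip installs celery and redis, the Python client library that Celery uses to talk to Redis. The Redis server itself is a separate program and must be installed and running on its own (for example, through your operating system's package manager or a container).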


How do you configure Celery in a Flask application?

To configure Celery in a Flask application, create a Celery object and give it the URL of your message broker (Redis or RabbitMQ, for example). A common pattern is a small factory function that builds the Celery instance from the Flask app's configuration.

Example of configuring Celery with Redis in a Flask app:

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    return celery

celery = make_celery(app)

In this example, Celery is configured to use Redis as the message broker, and the make_celery() function is used to initialize the Celery object with the Flask app configuration.


How do you define and run tasks in Celery?

Tasks in Celery are defined using the @celery.task decorator, and they can be run asynchronously using .delay() or .apply_async(). These tasks are executed by Celery workers in the background.

Example of defining and running a Celery task:

@celery.task
def send_email(recipient):
    print(f"Sending email to {recipient}")
    return f"Email sent to {recipient}"

@app.route('/trigger-email/<email>')
def trigger_email(email):
    send_email.delay(email)
    return 'Email task triggered!'

In this example, send_email stands in for real email-sending code. The delay() call queues the task and returns immediately, so the /trigger-email route responds without waiting for the task to finish.


How do you run Celery workers?

Celery workers are processes that run in the background and execute the tasks that are added to the task queue. You can start a Celery worker using the command line interface.

Example of running a Celery worker:

celery -A app.celery worker --loglevel=info

In this example, app.celery refers to the celery object defined in your Flask application module (app.py), and the worker will start processing tasks with info-level logging enabled.


How do you schedule periodic tasks in Celery?

Celery supports periodic tasks (cron-like scheduling) through its beat scheduler, celery beat. You can define tasks that run at fixed intervals or on a crontab-style schedule.

Example of scheduling a periodic task:

from celery.schedules import crontab

@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(crontab(minute='*/5'), scheduled_task.s(), name='Run every 5 minutes')

@celery.task
def scheduled_task():
    print("Periodic task running!")

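In this example, scheduled_task is scheduled to run every 5 minutes using crontab(minute='*/5'). The setup_periodic_tasks function registers the schedule once the Celery app is configured. The schedule only takes effect while a beat process is running alongside the worker; it can be started with celery -A app.celery beat.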


How do you monitor Celery tasks?

Celery tasks can be monitored with tools such as Flower, a web-based tool for monitoring and administering Celery clusters. Flower is a separate package and must be installed on its own (pip install flower). You can also inspect task status and results through Celery's result backend.

Example of starting Flower to monitor Celery tasks:

celery -A app.celery flower --port=5555

In this example, Flower starts on port 5555, and you can view the status of tasks and workers in the Flower web UI.

To store and monitor task results, configure a result backend:

app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

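With a result backend configured, you can track the status and result of each task. Set this value before make_celery(app) is called; otherwise celery.conf.update(app.config) will not see it.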


How do you handle retries and error handling in Celery tasks?

Celery allows you to automatically retry tasks that fail due to temporary issues. You can use the retry() method inside a task to retry it if an exception occurs.

Example of retrying a task in case of failure:

@celery.task(bind=True, max_retries=3)
def retryable_task(self):
    try:
        # Simulate a failure
        raise ValueError('Something went wrong!')
    except Exception as exc:
        print('Retrying...')
        raise self.retry(exc=exc, countdown=60)

In this example, the retryable_task will automatically retry up to 3 times if it raises an exception, with a delay of 60 seconds between retries.


How do you chain multiple tasks in Celery?

Celery allows you to chain multiple tasks together so that one task runs after the other. This can be done using the chain() function or the | operator.

Example of chaining tasks in Celery:

from celery import chain

@celery.task
def first_task():
    print('First task executed!')
    return 'First result'

@celery.task
def second_task(arg):
    print(f'Second task executed with arg: {arg}')

@app.route('/run-chain')
def run_chain():
    # chain(a, b) is equivalent to (a | b); use one form, not both
    chain(first_task.s(), second_task.s()).delay()
    return 'Task chain started!'

In this example, second_task runs only after first_task completes successfully, and it receives the return value of first_task as its first argument.


How do you group tasks in Celery?

Celery allows you to group tasks together so they can be executed in parallel. The group() function is used to run a collection of tasks concurrently.

Example of running a group of tasks in parallel:

from celery import group

@celery.task
def task_a():
    return 'Task A result'

@celery.task
def task_b():
    return 'Task B result'

@app.route('/run-group')
def run_group():
    group(task_a.s(), task_b.s()).delay()
    return 'Task group started!'

In this example, task_a and task_b are executed in parallel when the /run-group route is accessed.


How do you revoke or cancel tasks in Celery?

Tasks in Celery can be revoked (canceled) using the revoke() method. This is useful when you need to stop a task from running, especially long-running tasks.

Example of revoking a Celery task:

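# Start a task; delay() returns an AsyncResult that carries the task ID
task = long_running_task.delay()

# Revoke the task; terminate=True also stops it if it is already running
task.revoke(terminate=True)

In this example, long_running_task is started and later revoked. Without terminate=True, revoke() only prevents a task that has not started yet from running. With terminate=True, the worker process executing the task is terminated as well, which forcefully stops a task already in progress.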


What is Celery's result backend, and how is it used?

The Celery result backend stores the results of tasks, allowing you to retrieve the result once the task is complete. You can configure the result backend to use Redis, a database, or any other supported backend.

Example of configuring a result backend:

app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

Example of retrieving the result of a task:

result = some_task.delay()
# Wait for task result
print(result.get())

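In this example, the result backend stores the result of some_task, and you can retrieve it using result.get(). Note that result.get() blocks until the task finishes, so calling it inside a request handler gives up the benefit of running the task in the background.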
