Background Tasks with Celery Explained
Key Concepts
- Celery Overview
- Message Broker
- Task Queues
- Workers
- Tasks
- Scheduling Tasks
- Result Backend
- Error Handling
- Scaling Workers
- Monitoring and Logging
1. Celery Overview
Celery is a distributed task queue system that allows you to run background tasks asynchronously. It is particularly useful for handling long-running tasks, such as sending emails, processing images, or performing database operations, without blocking the main application.
2. Message Broker
A message broker is middleware that carries task messages from your application to Celery workers. Common message brokers include RabbitMQ and Redis. The application publishes tasks to the broker, which holds them in queues until an available worker picks them up.
# Example configuration for RabbitMQ
broker_url = 'pyamqp://guest@localhost//'
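Redis works the same way; only the URL scheme changes. A minimal sketch, assuming Redis is running locally on its default port:

# Example configuration for a Redis broker
broker_url = 'redis://localhost:6379/0'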
3. Task Queues
Task queues are used to organize and prioritize tasks. Celery allows you to define multiple queues, each with its own set of workers. This enables you to handle different types of tasks with varying priorities.
# Example of defining a task
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def process_image(image_path):
    # Task implementation
    pass
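The task above lands on Celery's default queue. To direct it to its own queue, you can add a routing rule. A minimal sketch, assuming the task lives in a module named tasks.py and uses a queue named 'images' (both names are assumptions):

# Route process_image to a dedicated 'images' queue (module and queue names are assumptions)
app.conf.task_routes = {
    'tasks.process_image': {'queue': 'images'},
}

A worker dedicated to that queue can then be started with celery -A tasks worker -Q images.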
4. Workers
Workers are processes that execute tasks. They listen to the task queues and pick up tasks to run. Multiple workers can be started to handle a high volume of tasks concurrently.
# Starting a Celery worker
celery -A your_project worker --loglevel=info
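To run several workers side by side, each needs a unique node name. A sketch, assuming two workers on the same machine (the names worker1 and worker2 are placeholders):

# Starting two named workers on one host
celery -A your_project worker -n worker1@%h --loglevel=info
celery -A your_project worker -n worker2@%h --loglevel=info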
5. Tasks
Tasks are the units of work that Celery executes. They are defined as Python functions and decorated with @app.task. Tasks can be invoked asynchronously, allowing the main application to continue processing without waiting for the task to complete.
# Example of a Celery task
@app.task
def send_email(to, subject, body):
    # Task implementation
    pass
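Once defined, a task is not called directly; it is sent to the queue. A minimal sketch using the send_email task above, with placeholder arguments:

# Queue the task and return immediately
send_email.delay('user@example.com', 'Welcome', 'Thanks for signing up!')

# The same call with extra options, e.g. start no sooner than 10 seconds from now
send_email.apply_async(
    args=['user@example.com', 'Welcome', 'Thanks for signing up!'],
    countdown=10,
)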
6. Scheduling Tasks
Celery supports scheduling tasks to run at specific intervals or at a future time. This is achieved using Celery Beat, a scheduler that periodically sends tasks to the task queue.
# Example of scheduling a task
from celery.schedules import crontab

app.conf.beat_schedule = {
    'send-report-every-monday': {
        'task': 'your_project.tasks.send_report',
        'schedule': crontab(hour=8, minute=0, day_of_week=1),
    },
}
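The schedule only fires while the Beat process itself is running, so it is started alongside the workers:

# Starting the Celery Beat scheduler
celery -A your_project beat --loglevel=info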
7. Result Backend
A result backend is used to store the results of tasks. This allows you to retrieve the outcome of a task after it has been executed. Common result backends include Redis and SQL databases.
# Example configuration for Redis result backend
result_backend = 'redis://localhost:6379/0'
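With a backend configured, the object returned by delay() can be used to check on and fetch the result. A minimal sketch reusing the send_email task from above:

# Retrieving the outcome of a task
result = send_email.delay('user@example.com', 'Welcome', 'Hello!')
print(result.id)               # task identifier stored in the backend
print(result.ready())          # True once the task has finished
print(result.get(timeout=10))  # blocks until the result is available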
8. Error Handling
Error handling in Celery involves defining retry policies and handling exceptions. Tasks can be configured to retry automatically if they fail, and custom error handlers can be implemented to manage specific exceptions.
# Example of error handling in a task
@app.task(bind=True, max_retries=3)
def process_payment(self, payment_info):
    try:
        # Task implementation
        pass
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)
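Retries can also be declared on the decorator instead of raised by hand. A sketch, assuming a hypothetical task whose transient failures raise ConnectionError:

# Automatic retries with exponential backoff (task name and exception are assumptions)
@app.task(autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5)
def fetch_exchange_rates(url):
    # Task implementation
    pass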
9. Scaling Workers
Scaling workers involves increasing the number of worker processes so that more tasks run concurrently. This can be done manually, with the worker's built-in autoscaler, or by running additional workers on infrastructure such as AWS EC2 or Kubernetes.
# Example of scaling workers
celery -A your_project worker --loglevel=info --concurrency=10
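The worker can also resize its own pool between a minimum and maximum based on load:

# Autoscale between 3 and 10 pool processes (max,min)
celery -A your_project worker --loglevel=info --autoscale=10,3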
10. Monitoring and Logging
Monitoring and logging are crucial for tracking the performance and health of Celery workers. Tools like Flower provide real-time monitoring of tasks and workers, while logging can be configured to capture detailed information about task execution.
# Example of starting Flower for monitoring
celery -A your_project flower
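By default, Flower serves its web UI at http://localhost:5555. On the logging side, one option is to attach a handler when Celery sets up its logger, via the after_setup_logger signal. A minimal sketch writing worker logs to a file (the path is an assumption):

# Example of adding a file handler to the Celery logger
import logging
from celery.signals import after_setup_logger

@after_setup_logger.connect
def add_file_handler(logger, *args, **kwargs):
    handler = logging.FileHandler('celery_worker.log')  # hypothetical log path
    handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    logger.addHandler(handler)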