Exploraremos una guía simple para aprovechar al máximo tres poderosas tecnologías: FastAPI, Redis y Celery. Estas herramientas a menudo generan dudas sobre cómo combinarlas y aprovechar al máximo sus capacidades. Aquí esta una guía introductoria que te ayudará a familiarizarte con ellas.

Pero empecemos por entender qué enfoque tiene cada una de estas tecnologías.

Recuerda que, para seguir esta guía, es recomendado tener conocimiento previo en Python y familiaridad con la línea de comando. 🤓

FastAPI

FastAPI es un framework de alto rendimiento, fácil de aprender y rápido de programar. Está diseñado para desarrollar aplicaciones web de manera eficiente y lista para su uso en producción. FastAPI ofrece una forma ágil y segura de construir APIs modernas.

Celery

Celery es una herramienta de código abierto que actúa como una cola de tareas asíncronas. Se basa en la transmisión de mensajes distribuida, lo que nos permite realizar operaciones en tiempo real, así como programar tareas periódicas.

Antes de seguir avanzando, aclaremos ciertas cosas sobre Celery para que entendamos cómo funciona completamente.

Brokers ¿qué son y cómo interactúan con Celery?

En Celery, un “broker” es un intermediario o servicio de mensajería que se utiliza para enviar y recibir mensajes entre la aplicación y los workers de Celery. El broker actúa como un canal de comunicación entre las tareas encoladas y los procesos que las ejecutan.

Los brokers son fundamentales en Celery porque permiten la comunicación asíncrona entre la aplicación y los workers distribuidos. Los workers pueden recibir tareas desde la cola del broker y ejecutarlas de forma independiente, lo que facilita la escalabilidad y la distribución de las tareas en un entorno distribuido.

Existen varios brokers que se pueden utilizar con Celery, algunos de los más comunes son:

  • RabbitMQ: Es uno de los más populares y ampliamente utilizados con Celery. Proporciona una implementación robusta y escalable de AMQP (Advanced Message Queuing Protocol).
  • Redis: Es una base de datos en memoria que también puede actuar como broker en Celery. Proporciona una gran velocidad y rendimiento, lo que lo hace adecuado para entornos con alta concurrencia.
  • Amazon SQS: Es un servicio de mensajería completamente administrado por Amazon Web Services (AWS) que puede actuar como broker en Celery. Proporciona una alta disponibilidad y escalabilidad.

¿Y los Workers?

En Celery, los «workers» son procesos independientes que se encargan de ejecutar tareas en paralelo.
Están a la escucha de la cola de tareas y las van realizando conforme estén disponibles. Puedes tener varios workers ejecutándose simultáneamente en diferentes máquinas o procesos, lo que permite una distribución de carga efectiva y un procesamiento más rápido de las tarea

Redis

Redis es un poderoso almacén de claves/valores NoSQL en memoria que actúa como una caché de aplicaciones y una base de datos de respuesta rápida. Al almacenar los datos en memoria, proporciona un rendimiento excepcionalmente rápido, capacidades de replicación y una variedad de estructuras de datos versátiles para manejar datos complejos.

Instalando los necesario

Es recomendable crear primero un entorno virtual para asegurar que los paquetes se instalen de manera independiente a tu entorno de Python principal y que la carpeta sea una subcarpeta del proyecto Python al que está asociado. Para hacer esto, puedes utilizar el siguiente comando:

Linux y Mac OS:

python3 -m venv ruta/para/entorno_virtual/nombre_del_entorno

Windows (Python deberá estar incluido dentro de las variables del sistema para que funcione):

python -m venv c:\ruta\para\entorno_virtual\nombre_del_entorno

Y para acceder a nuestro entorno usaremos el comando:

Linux y Mac OS:

source ruta/para/entorno_virtual/nombre_del_entorno/bin/activate

Windows

c:\ruta\para\entorno_virtual\nombre_del_entorno\Scripts\activate.bat

FastAPI, Celery y redis

Para instalar estas herramientas, haremos uso de 'pip' con los siguientes comandos:

pip install fastapi
pip install celery
pip install redis

FastAPI depende de una herramienta que es uvicorn, para instalarla lo haremos igualmente desde pip:

pip install uvicorn

Redis

Redis está disponible para múltiples sistemas operativos. Te recomendamos acceder a su página, ir a la sección de instalación y dirigirte al tutorial que se adecúe a tu sistema operativo.

Configurando nuestro archivos y carpetas

Si quieres saltarte este paso puedes descargar el proyecto desde mi Github, recuerda solo instalar las dependencias. 🤓

Iniciamos creando la siguiente estructura de archivos y carpetas para mantener todo ordenado:

SRC/
├── .env/
├── Celery/
│   ├── app.py
│   └── celeryConfig.py
├── Blog.ipynb
├── requirements.txt
└── tasks.py

Hablemos primeramente de lo necesario para poder usar celery, así que creamos una carpeta con el nombre Celery (Esta carpeta puede tener cualquier otro nombre) y dentro crearemos dos archivos app.py y celeryConfig.py

celeryConfig.py es el archivo donde estableceremos todas las configuraciones que necesitemos, es importante mantener sus nombres específicos y seguir la sintaxis adecuada para que Celery pueda utilizarlas correctamente.

# enable_utc: Establece el uso de UTC.
enable_utc = True
# task_serializer: Establece el serializador de tareas para usar el formato JSON.
task_serializer = 'json'
# result_serializer: Establece el serializador de resultados para usar el formato JSON.
result_serializer = 'json'
# broker_url: Establece la URL del broker de mensajes.
# por defecto redis usa el puerto 6379
broker_url = 'redis://localhost:6379/0'
# result_backend: Establece la URL del backend de resultados.
result_backend = 'redis://localhost:6379/0'
# Celery intentará reconectarse al broker si la conexión falla al iniciar la aplicación
broker_connection_retry_on_startup = True
# imports: Establece una lista de módulos para importar cuando se inicia la aplicación.
imports = (
    'tasks',
)

En nuestro archivo app.py solo creamos una instancia de celery, le asignamos un nombre y le pasamos la ruta de nuestro archivo de configuración.

# Importamos celery
from celery import Celery
# Creamos la instancia de celery y le asignamos un nombre
app = Celery('tasks_celery')
# Le damos la ruta de nuestro archivo de configuración que se encuentra
# en Celery/configCelery.py
app.config_from_object('Celery.celeryConfig')

Una vez teniendo nuestra configuración básica de celery estableceremos las tareas que queramos que celery tome en cuenta, así que creamos en la raíz el archivo tasks.py

# Importamos sleep y randint para simular un proceso
from time import sleep
from random import randint
# Importamos la variable "app" de nuestro archivo app.py
from Celery.app import app as celery_app
# @celery_app: Es un decorador que nos permite registrar una tarea.
@celery_app.task()
def add(x, y):
    sleep(randint(1, 5))
    print(x + y)

@celery_app.task(name='mul')
def mul(x, y):
    sleep(randint(1, 5))
    print(x * y)

En este caso, las tareas, en vez de almacenar el resultado, simplemente harán que los workers impriman el resultado en la terminal.

Por último, hablaremos del archivo Blog.py o Blog.ipynb (si usamos el notebook).

# 1. Importar FastAPI
# 2. Importamos HTTPException para poder devolver errores HTTP
from fastapi import FastAPI
from fastapi import HTTPException
# Importamos las tareas que queremos ejecutar de manera asíncrona
from tasks import add, mul
# 3. Crear una instancia de FastAPI
app = FastAPI(title="DiegoDG.com.mx", description="API para probar FastAPI, Redis y Celery", version="1.0")
# 4. Creamos las rutas
# get: Método HTTP
# /suma/{a}/{b}: Ruta con parámetros
@app.get("/suma/{a}/{b}")
def suma(a: int, b: int):
    try:
        # Gracias a celery, add ahora tiene una función delay que permite enviarla al broker
        add.delay(a, b)
        # devolvemos un mensaje para saber que se ha enviado la tarea a Celery.
        # Esto no quiere decir que se haya ejecutado correctamente la tarea.
        # Nuestra tarea puede fallar en celery, es por ello que debemos de tener un manejo de errores
        # y almacenar su resultado si es necesario
        return {"message": "Se ha enviado la tarea de sumar a Celery"}
    except Exception as e:
        # Si hay un error durante el envío de la tarea a Celery, devolvemos un error HTTP
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/multiplicacion/{a}/{b}")
def multiplicacion(a: int, b: int):
    try:
        mul.delay(a, b)
        return {"message": "Se ha enviado la tarea de multiplicar a Celery"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

Y ahora, ¿qué sigue?

Ya hemos creado o descargado los archivos, ahora iniciaremos nuestro servidor de FastAPI, si usaste la versión de GitHub con el Jupyter notebook solo basta con iniciar el cuaderno el cual ya está preparado y configurado.

Si creaste los archivos manualmente tendremos que dirigirnos a nuestra terminal y ubicarnos en la raíz de nuestra carpeta y ejecutar el siguiente comando:

- uvicorn: servidor web que se utiliza para aplicaciones web en Python
- Blog:app: especifica el módulo y la variable de la aplicación a ejecutar
- --reload: recarga automáticamente el servidor si hay cambios 
uvicorn Blog:app --reload

Veremos algo como esto en nuestra consola, además nos dará la dirección y puerto donde está corriendo nuestro servidor, en este caso 127.0.0.1 en el puerto 8000.

INFO:     Will watch for changes in these directories: ['/Users/Library/Documents/Proyect']
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [20301] using StatReload
INFO:     Started server process [20304]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

Y por último tendremos que iniciar los workers de celery para esto colocaremos el siguiente comando en nuestra terminal:

- celery: es el comando utilizado para ejecutar los workers
- -A Celery.app: especifica el nombre del módulo y la variable de la aplicación
- --loglevel INFO: es una opción que establece que registrarán mensajes informativos en el registro
celery -A Celery.app worker --loglevel INFO

Nota: Si usas Windows o macOS y tienes algún problema con Celery, agrega el siguiente parámetro al comando: «–pool solo«. Estos problemas pueden estar relacionados con la forma en que se manejan los procesos y los recursos en estos sistemas operativos. Esta recomendación debes seguirla solo para entornos locales y de desarrollo, ya que el rendimiento es inferior en comparación con otros pools.

Veremos algo como esto, si observamos al final, podremos observar las tareas que están disponibles para ejecutar las cuales tiene el nombre que le asignamos nosotros (add y mul).

-------------- [email protected] v5.3.1 (emerald-rush)
--- ***** ----- 
-- ******* ---- macOS-13.4.1-arm64-arm-64bit 2023-06-30 22:05:02
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks_celery:0x405C00000
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                
[tasks]
  . add
  . mul
[2023-06-30 22:05:02,897: INFO/MainProcess] Connected to redis://localhost:6379/0
[2023-06-30 22:05:02,899: INFO/MainProcess] mingle: searching for neighbors
[2023-06-30 22:05:03,909: INFO/MainProcess] mingle: all alone
[2023-06-30 22:05:03,935: INFO/MainProcess] [email protected] ready.

Ahora a verificar que todo funciona 🤓

Accedemos a la dirección IP de nuestro servidor 127.0.0.1:8000/docs y desde esta pagina podremos probar las funciones que creamos.

Basta con seleccionar la función y escribir los parámetros necesarios y ejecutarla, veremos que nos regresa el mensaje «Se ha enviado la tarea de sumar a Celery«.

Mientras que en nuestra terminal corriendo celery podremos observar el resultado que se imprime en la consola.

[2023-06-30 22:20:42,197: INFO/MainProcess] Task add[7sh65-065d-2873-97r5-76f4f2ffg26f] received
[2023-06-30 22:20:43,202: WARNING/ForkPoolWorker-8] 60 // Resultado que imprime nuestra función
[2023-06-30 22:20:43,231: INFO/ForkPoolWorker-8] Task add[7sh65-065d-2873-97r5-76f4f2ffg26f] succeeded in 1.0310063749930123s: None

Conclusion

Hemos recorrido una guía básica sobre el uso de FastAPI, Celery y Redis. Estas tecnologías pueden ser un conjunto poderoso cuando se combinan correctamente en el desarrollo de aplicaciones web. FastAPI nos brinda un marco de trabajo rápido y eficiente para crear APIs con Python. Celery, por su parte, nos proporciona una forma escalable de manejar tareas asíncronas en segundo plano. Y finalmente, Redis nos ofrece una base de datos en memoria extremadamente rápida y versátil.