Vous êtes ici : Accueil / 2012 / Avril / Django Celery sur Cockpit : les cordes

Django Celery sur Cockpit : les cordes

écrit le 21/04/2012 Par Yohann Gabory
Après notre premier article portant sur l'intégration de Celery dans Cockpit, nous vous proposons de plonger un peu plus profondément dans les entrailles de Celery et en particulier sur les chords.

Comme vous le savez si vous suivez le blog des développeurs Pilot, Cockpit, notre solution d'emailing utilise désormais Celery pour la gestion des tâches de fond.

Or s'il est un point où nous sommes particulièrement vigilants, c'est sur l'envoi des newsletters. En effet, notre solution d'emailling doit pouvoir gérer des volumes de l'ordre de 500.000 mails a minima.

La gestion de l'envoi de newsletter dans Cockpit

Pour de tels volumes, Celery nous est d'une aide précieuse, mais bien entendu, notre système est ici adossé à une architecture système robuste comprenant :

  1. Un système de load balancing sur pas moins de 3 serveurs Postfix. Il faut également prendre en compte le fonctionnement parfois étrange de certains routeurs de mail comme Gmail, Hotmail et Yahoo qui nous obligent à complexifier cette architecture.
  2. Un système de gestion des transactions qui s'assure dans un cas que les mails partent tous, ou dans l'autre cas qu'aucun mail ne parte. Ce système permet de relancer l'envoi si besoin, sans générer plusieurs fois le même mail à l'attention du même destinataire.

Nous avons également des outils élaborés pour l'aide à la création de newsletter HTML puisque nous pouvons donner à nos clients un aperçu de l'affichage de leurs mails sur plus de 50 clients mails différents.

Aujourd'hui, et pour rester dans notre sujet, je vous propose de plonger un peu plus avant dans les rouages internes de notre machinerie grâce à un tour d'horizon de la préparation des mails avant envoi.

Notre besoin

Lorsqu'un utilisateur estime sa newsletter prête à partir, il sélectionne un ensemble de tags incluant (c'est à dire que les contacts possédant le dit tag recevront la newsletter) et un ensemble de tags excluant (c'est à dire que les contacts possédant le dit tag ne recevront pas la newsletter, pour faire une black list par exemple). En plus de cela, nous avons mis en place un système de règles très strictes qui excluent d'office toutes les personnes qui se sont désinscrites d'une newsletter. Il est en effet rien de plus désagréable que de recevoir une newsletter de laquelle on s'est désabonné.

A partir de cette étape, nous avons une queryset qui n'a pas encore été évaluée. Elle peut être assez complexe, en fonction des tags qui ont été sélectionnés et des règles de désinscription, mais surtout elle peut, comme nous l'avons vu plus haut, contenir plusieurs centaines de milliers d'objets. L'évaluer en une seule fois c'est la meilleur chance de voir la consommation mémoire s’envoler vers des sommets. Nous utilisons donc un système d'offset pour "alléger" notre newsletter. Voici un exemple :

#nous comptons le nombres d'objets
qs_count = queryset.count()
#puis nous générons une liste de queryset de plus petite taille.
#settings.NL_CHUNK_SIZE est une valeur que nous avons définie dans les settings. Par exemple 10.000
start = 0
chunk_list = []
while start <= nb:
    end = start + settings.NL_CHUNK_SIZE
    chunk_list.append(queryset[start:end])
    start = end
return chunk_list

Nous voici donc avec un liste de queryset contenant chacune 10.000 contacts.

Chacun de ces contacts doit recevoir une newsletter personnalisée. Avec par exemple, son nom et son prénom, ou un message spécial renseigné dans sa fiche Cockpit. Il faut également générer une version texte de cette newsletter afin de rester compatible avec ceux qui ont fait le choix de ne pas utiliser un client mail affichant le HTML.

Pour cette raison, nous allons pour chaque utilisateur générer un fichier texte comprenant le message complet, au format rfc822. Une fois cette génération terminée, nous pourrons envoyer l'ensemble des fichiers générés à Postfix.

Le problème

Il pourrait sembler à première vue que la meilleur façon de faire soit l'usage des subtasks :

@task
def generate_newsletters(newsletter, users):
    for user in users:
        generate_nl.subtask(newsletter,user)
    call_postfix_for_send()

Malheureusement, ce serait bien mal connaître la manière dont sont gérées les tâches Celery. Le code ci-dessus va lancer l'intégralité des subtask puis va lancer call_postfix_for_send() mais sans attendre le retour des generate_nl. Bref, avec un peu de malchance (par exemple, un problème de disque plein ou un redémarrage du serveur de base de données), les premières newsletters seront parties mais pas les suivantes, et il sera très difficile de ne recommencer l'envoi que des messages n'étant pas déjà partis.

Il faut donc faire en sorte que la dernière action attende que toutes les subtask soient terminées pour se lancer. Heureusement, Celery nous offre un outil qui rentre parfaitement dans nos cordes, si je puis dire : les Chords

Description du fonctionnement des Chords

Les Chords sont apparues dans la version 2.3 de celery. Elles sont donc pleinement utilisables avec la version que nous vous fournissons dans notre dépôt. Une Chord est une tâche qui va attendre que toutes les tâches d'un même taskset se terminent avant de se lancer. Bref, exactement ce dont nous avons besoin. Un exemple pour y voir plus clair : voici deux tâches Celery tout ce qu'il y a de plus simple. La première calcule la somme de deux nombres et la seconde calcule la somme d'un ensemble de nombres :

from celery.task import task
@task
def add(x, y):
    return x + y

@task
def tsum(numbers):
    return sum(numbers)

Voyons maintenant comment utiliser une chord pour calculer la somme des résultats de plusieurs "add" à la suite. Il faut évidement que tous les calculs soient réalisés avant de pouvoir calculer la somme globale. C'est pourquoi nous allons utiliser une Chord, de la façon suivante :

from celery.task import chord
chord(add.subtask((i, i)) for i in xrange(100))(tsum.subtask()).get()

Hum... comme ceci ce n'est peut-être pas très clair, voyons comment découper cette fonction pour la rendre plus lisible.

Tout d'abord, vous allez avoir besoin d'un callback. Un callback, c'est la fonction qui va être appelée une fois que toutes les autres auront terminé. En l'occurrence il s'agit de notre fonction de calcul de la somme totale :

callback = tsum.subtask()

Il nous faut ensuite un header qui correspond à une liste de tâches à exécuter, en bref, un TaskSet :

header = [add.subtask((i, i)) for i in xrange(100)]

L'ensemble une fois mixé donne :

result = chord(header)(callback)

Et en effet, dans les tests que nous avons effectués, toutes les tâches présentes dans la liste header ont bien été exécutées lorsque le callback est appelé.

La gestion des erreurs

Il reste seulement une contrainte de taille. Il est absolument capital pour nos clients (et donc pour nous) qu'aucune newsletter ne soit envoyée si tous les fichiers n'ont pas été générés. Nous ne voulons pas, par exemple que la moitié de nos contacts reçoivent leur newsletter le lundi et que les autres, à cause d'une erreur qui n'aura pas été corrigée rapidement, reçoivent leur newsletter plusieurs jours plus tard. Or, s'il est un problème avec les chords telles qu'elles sont implémentées aujourd'hui, c'est qu'elles ne vérifient pas la réussite ou l'échec des tâches présentent dans le header avant de lancer le callback. Il suffit que les tâches soient terminées, en réussite ou en échec pour que le callback soit exécuté.

À l'heure où nous écrivons ses lignes, il n'existe pas de paramètres optionnels permettant de tenir compte des résultats des TaskSet dans Celery.

Pilot_chord : une corde intelligente.

Chez Pilot Systems, nous avons donc écrit une chord qui se préoccupe de la TaskSet et qui va échouer en cas d'erreur de l'une des sous-tâches. Plus exactement, nous avons réécrit la fonction _unlock_chord de Celery pour gérer ce cas. Cette nouvelle fonction s'écrit comme suit :

from celery import current_app
from celery.result import TaskSetResult
from celery.task.sets import TaskSet, subtask

@current_app.task(name="pilot.chord_unlock", max_retries=None)
def _unlock_chord(setid, callback, interval=1, propagate=False,
        max_retries=None):
    try:
        result = TaskSetResult.restore(setid)
        if not result:
            # Well, seems the taskset was either cancelled or already
            # completed, give up
            return
        if result.ready():
            if result.successful():
                subtask(callback).delay(result.join(propagate=propagate))
                result.delete()
        else:
            _unlock_chord.retry(countdown=interval, max_retries=max_retries)
    except (Exception), exc:
        _unlock_chord.retry(exc=exc, countdown=600, max_retries=3)

Ensuite, il a fallu expliquer à Celery d'utiliser notre fonction au lieu de la sienne, pour se faire, nous définissons un nouveau back-end dans lequel nous surchargeons la méthode on_chord_apply :

from djcelery.backends.database import DatabaseBackend

class PilotChordBackend(DatabaseBackend):
    def on_chord_apply(self, setid, body, *args, **kwargs):
        from celery.registry import tasks
        tasks["pilot.chord_unlock"].apply_async((setid, body, ), kwargs,
                                                countdown=1)

Et enfin, dans la configuration, lui dire d'utiliser notre propre back-end :

#
# Use our own Celery backend
#
CELERY_RESULT_BACKEND = 'pilot'
from celery.backends import BACKEND_ALIASES
BACKEND_ALIASES['pilot'] = 'common.pchord.PilotChordBackend'

La reprise sur erreur

Maintenant que la question des erreurs est réglée, il reste un soucis. Il se peut que la génération du fichier de la newsletter échoue à un moment donné à cause d'erreurs indépendantes du code. Par exemple, pas assez de mémoire au moment de la génération, ou un problème de communication avec la base de données, bref, les raisons peuvent être multiples. Il faut donc trouver un moyen de mettre en place la reprise sur erreur. Heureusement, Celery nous offre un outil très utile avec le paramètre retry.

Le retry est un mécanisme qui va permettre de relancer une tâche ayant échouée. Ce mécanisme est assez souple et permet de définir le temps d'attente entre chaque essai et le nombre d'essais avant d'abandonner définitivement.

Un exemple de code pour mieux en comprendre le fonctionnement :

@task
def add(x,y):
    try:
        do something stupid
    except (Exception), exc:
        add.retry(exc=exc, countdown=600, max_retries=3)

Vous l'aurez compris, l'important dans ce code réside dans :

add.retry(exc=exc, countdown=600, max_retries=3)

Après l'exception, le retry va se charger de réessayer de réaliser la tâche avec :

  • les mêmes paramètres, qu'il extrait de l'objet exception (exc) ;
  • un temps d'attente de 600 secondes (10 minutes) ;
  • un nombre d'essais égal à 3 (max_retries).

En conclusion

Dans le cas présent, en modifiant un peu le comportement par défaut de Celery, il est possible de créer un ensemble de tâches asynchrones tout en conservant un contrôle fort du workflow applicatif. En découpant intelligemment les fonctions, nous avons en plus des gains en terme de performances avoisinant les x10 par rapport aux solutions précédentes.

Nous espérons que ce tour d'horizon de Celery, autour d'un exemple de la vraie vie vous aura donné à vous aussi l'envie de vous jeter dans l'aventure et d'intégrer également Celery dans vos projets.

Actions sur le document