Comme vous le savez si vous suivez le blog des développeurs Pilot, Cockpit, notre solution d'emailing utilise désormais Celery pour la gestion des tâches de fond.
Or s'il est un point où nous sommes particulièrement vigilants, c'est sur l'envoi des newsletters. En effet, notre solution d'emailling doit pouvoir gérer des volumes de l'ordre de 500.000 mails a minima.
Pour de tels volumes, Celery nous est d'une aide précieuse, mais bien entendu, notre système est ici adossé à une architecture système robuste comprenant :
Nous avons également des outils élaborés pour l'aide à la création de newsletter HTML puisque nous pouvons donner à nos clients un aperçu de l'affichage de leurs mails sur plus de 50 clients mails différents.
Aujourd'hui, et pour rester dans notre sujet, je vous propose de plonger un peu plus avant dans les rouages internes de notre machinerie grâce à un tour d'horizon de la préparation des mails avant envoi.
Lorsqu'un utilisateur estime sa newsletter prête à partir, il sélectionne un ensemble de tags incluant (c'est à dire que les contacts possédant le dit tag recevront la newsletter) et un ensemble de tags excluant (c'est à dire que les contacts possédant le dit tag ne recevront pas la newsletter, pour faire une black list par exemple). En plus de cela, nous avons mis en place un système de règles très strictes qui excluent d'office toutes les personnes qui se sont désinscrites d'une newsletter. Il est en effet rien de plus désagréable que de recevoir une newsletter de laquelle on s'est désabonné.
A partir de cette étape, nous avons une queryset qui n'a pas encore été évaluée. Elle peut être assez complexe, en fonction des tags qui ont été sélectionnés et des règles de désinscription, mais surtout elle peut, comme nous l'avons vu plus haut, contenir plusieurs centaines de milliers d'objets. L'évaluer en une seule fois c'est la meilleur chance de voir la consommation mémoire s’envoler vers des sommets. Nous utilisons donc un système d'offset pour "alléger" notre newsletter. Voici un exemple :
#nous comptons le nombres d'objets qs_count = queryset.count() #puis nous générons une liste de queryset de plus petite taille. #settings.NL_CHUNK_SIZE est une valeur que nous avons définie dans les settings. Par exemple 10.000 start = 0 chunk_list = [] while start <= nb: end = start + settings.NL_CHUNK_SIZE chunk_list.append(queryset[start:end]) start = end return chunk_list
Nous voici donc avec un liste de queryset contenant chacune 10.000 contacts.
Chacun de ces contacts doit recevoir une newsletter personnalisée. Avec par exemple, son nom et son prénom, ou un message spécial renseigné dans sa fiche Cockpit. Il faut également générer une version texte de cette newsletter afin de rester compatible avec ceux qui ont fait le choix de ne pas utiliser un client mail affichant le HTML.
Pour cette raison, nous allons pour chaque utilisateur générer un fichier texte comprenant le message complet, au format rfc822. Une fois cette génération terminée, nous pourrons envoyer l'ensemble des fichiers générés à Postfix.
Il pourrait sembler à première vue que la meilleur façon de faire soit l'usage des subtasks :
@task def generate_newsletters(newsletter, users): for user in users: generate_nl.subtask(newsletter,user) call_postfix_for_send()
Malheureusement, ce serait bien mal connaître la manière dont sont gérées les tâches Celery. Le code ci-dessus va lancer l'intégralité des subtask puis va lancer call_postfix_for_send() mais sans attendre le retour des generate_nl. Bref, avec un peu de malchance (par exemple, un problème de disque plein ou un redémarrage du serveur de base de données), les premières newsletters seront parties mais pas les suivantes, et il sera très difficile de ne recommencer l'envoi que des messages n'étant pas déjà partis.
Il faut donc faire en sorte que la dernière action attende que toutes les subtask soient terminées pour se lancer. Heureusement, Celery nous offre un outil qui rentre parfaitement dans nos cordes, si je puis dire : les Chords
Les Chords sont apparues dans la version 2.3 de celery. Elles sont donc pleinement utilisables avec la version que nous vous fournissons dans notre dépôt. Une Chord est une tâche qui va attendre que toutes les tâches d'un même taskset se terminent avant de se lancer. Bref, exactement ce dont nous avons besoin. Un exemple pour y voir plus clair : voici deux tâches Celery tout ce qu'il y a de plus simple. La première calcule la somme de deux nombres et la seconde calcule la somme d'un ensemble de nombres :
from celery.task import task @task def add(x, y): return x + y @task def tsum(numbers): return sum(numbers)
Voyons maintenant comment utiliser une chord pour calculer la somme des résultats de plusieurs "add" à la suite. Il faut évidement que tous les calculs soient réalisés avant de pouvoir calculer la somme globale. C'est pourquoi nous allons utiliser une Chord, de la façon suivante :
from celery.task import chord chord(add.subtask((i, i)) for i in xrange(100))(tsum.subtask()).get()
Hum... comme ceci ce n'est peut-être pas très clair, voyons comment découper cette fonction pour la rendre plus lisible.
Tout d'abord, vous allez avoir besoin d'un callback. Un callback, c'est la fonction qui va être appelée une fois que toutes les autres auront terminé. En l'occurrence il s'agit de notre fonction de calcul de la somme totale :
callback = tsum.subtask()
Il nous faut ensuite un header qui correspond à une liste de tâches à exécuter, en bref, un TaskSet :
header = [add.subtask((i, i)) for i in xrange(100)]
L'ensemble une fois mixé donne :
result = chord(header)(callback)
Et en effet, dans les tests que nous avons effectués, toutes les tâches présentes dans la liste header ont bien été exécutées lorsque le callback est appelé.
Il reste seulement une contrainte de taille. Il est absolument capital pour nos clients (et donc pour nous) qu'aucune newsletter ne soit envoyée si tous les fichiers n'ont pas été générés. Nous ne voulons pas, par exemple que la moitié de nos contacts reçoivent leur newsletter le lundi et que les autres, à cause d'une erreur qui n'aura pas été corrigée rapidement, reçoivent leur newsletter plusieurs jours plus tard. Or, s'il est un problème avec les chords telles qu'elles sont implémentées aujourd'hui, c'est qu'elles ne vérifient pas la réussite ou l'échec des tâches présentent dans le header avant de lancer le callback. Il suffit que les tâches soient terminées, en réussite ou en échec pour que le callback soit exécuté.
À l'heure où nous écrivons ses lignes, il n'existe pas de paramètres optionnels permettant de tenir compte des résultats des TaskSet dans Celery.
Chez Pilot Systems, nous avons donc écrit une chord qui se préoccupe de la TaskSet et qui va échouer en cas d'erreur de l'une des sous-tâches. Plus exactement, nous avons réécrit la fonction _unlock_chord de Celery pour gérer ce cas. Cette nouvelle fonction s'écrit comme suit :
from celery import current_app from celery.result import TaskSetResult from celery.task.sets import TaskSet, subtask @current_app.task(name="pilot.chord_unlock", max_retries=None) def _unlock_chord(setid, callback, interval=1, propagate=False, max_retries=None): try: result = TaskSetResult.restore(setid) if not result: # Well, seems the taskset was either cancelled or already # completed, give up return if result.ready(): if result.successful(): subtask(callback).delay(result.join(propagate=propagate)) result.delete() else: _unlock_chord.retry(countdown=interval, max_retries=max_retries) except (Exception), exc: _unlock_chord.retry(exc=exc, countdown=600, max_retries=3)
Ensuite, il a fallu expliquer à Celery d'utiliser notre fonction au lieu de la sienne, pour se faire, nous définissons un nouveau back-end dans lequel nous surchargeons la méthode on_chord_apply :
from djcelery.backends.database import DatabaseBackend class PilotChordBackend(DatabaseBackend): def on_chord_apply(self, setid, body, *args, **kwargs): from celery.registry import tasks tasks["pilot.chord_unlock"].apply_async((setid, body, ), kwargs, countdown=1)
Et enfin, dans la configuration, lui dire d'utiliser notre propre back-end :
# # Use our own Celery backend # CELERY_RESULT_BACKEND = 'pilot' from celery.backends import BACKEND_ALIASES BACKEND_ALIASES['pilot'] = 'common.pchord.PilotChordBackend'
Maintenant que la question des erreurs est réglée, il reste un soucis. Il se peut que la génération du fichier de la newsletter échoue à un moment donné à cause d'erreurs indépendantes du code. Par exemple, pas assez de mémoire au moment de la génération, ou un problème de communication avec la base de données, bref, les raisons peuvent être multiples. Il faut donc trouver un moyen de mettre en place la reprise sur erreur. Heureusement, Celery nous offre un outil très utile avec le paramètre retry.
Le retry est un mécanisme qui va permettre de relancer une tâche ayant échouée. Ce mécanisme est assez souple et permet de définir le temps d'attente entre chaque essai et le nombre d'essais avant d'abandonner définitivement.
Un exemple de code pour mieux en comprendre le fonctionnement :
@task def add(x,y): try: do something stupid except (Exception), exc: add.retry(exc=exc, countdown=600, max_retries=3)
Vous l'aurez compris, l'important dans ce code réside dans :
add.retry(exc=exc, countdown=600, max_retries=3)
Après l'exception, le retry va se charger de réessayer de réaliser la tâche avec :
Dans le cas présent, en modifiant un peu le comportement par défaut de Celery, il est possible de créer un ensemble de tâches asynchrones tout en conservant un contrôle fort du workflow applicatif. En découpant intelligemment les fonctions, nous avons en plus des gains en terme de performances avoisinant les x10 par rapport aux solutions précédentes.
Nous espérons que ce tour d'horizon de Celery, autour d'un exemple de la vraie vie vous aura donné à vous aussi l'envie de vous jeter dans l'aventure et d'intégrer également Celery dans vos projets.
Actions sur le document