Cockpit, la solution d'emaling de Pilot Systems propose des fonctionnalité avancées qui peuvent demander beaucoup de temps de calcul. Dans le cadre d'une application web, il est hors de question de ne pas donner de retour utilisateur dans les secondes qui suivent sa requête. Nous avons donc cherché une solution permettant de lancer ces taches en asynchrone. Notre choix s'est vite porté sur Celery
Celery est un gestionnaire de tâches distribuées. C'est à dire qu'il permet de lancer des tâches de manière concurrente et asynchrone. Il se base sur un système de messages, de tâches et de "worker".
Le cas d'utilisation classique est le suivant : votre utilisateur demande à votre application une action particulièrement longue. Vous envoyez un message à Celery indiquant la tâche qu'il doit exécuter et vous renvoyez une réponse à votre utilisateur indiquant que la tâche est en cours de traitement.
Plus tard, vous pourrez recontacter votre utilisateur via AJAX ou en lui envoyant un mail lui indiquant que la tâche est terminée.
L'architecture de Celery s'appuie principalement un serveur de file de messages respectant le protocole AMPQ. chez Pilot Systems, nous avons choisis d'utiliser RabbitMQ car il est robuste, libre et performant. Ce serveur de file de messages va faire le lien entre Celery et les workers.
Les workers sont des processus python. Ils sont à l'écoute de RabbitMQ et exécutent les tâches qu'il leur fournit.
Lorsque tous les workers sont occupés, RabbitMQ attend que l'un des workers se libère pour lui donner la prochaine tâche. Une fois que l'un des worker a terminé son travail, il en informe RabbitMQ qui à son tour en informe Celery. Ce mode de fonctionnement est très performant et, vous allez le voir, vous offre également une très grande souplesse de configuration.
RabbitMQ
L'installation de Celery se fait en plusieurs étapes. En premier lieu, il vous faut installer RabbitMQ. Sur un système tel que Debian, cela est aussi simple que :
$ apt-get install rabbitmq-server
Comme RabbitMQ fonctionne sur le principe d'un serveur, il vous faut créer un hôte virtuel par projet et un utilisateur. Bien entendu vous pouvez ne créer qu'un seul vhost pour tous vos projets et un seul utilisateur a qui vous donnez tous les droits, mais pour des raisons évidentes, je vous conseil de faire comme nous faisons sur Cockpit : un vhost par projet et un utilisateur par vhost. Ainsi toutes vos instances seront isolées les unes des autres.
Cockpit est une application en SAAS qui compte aujourd'hui plus d'une centaine d'instances. Il n'était donc pas possible ou envisageable de créer les vhost et les utilisateurs "à la main". Nous avons donc réalisé un script de migration que nous avons intégré à notre processus de création d'instances.
Si vous souhaitez tester l'installation, ou si vous souhaitez installer RabbitMQ pour une seule instance, vous pouvez faire, sous l'utilisateur root :
rabbitmqctl add_user myuser mypassword rabbitmqctl add_vhost myvhost rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
remplacez bien entendu "myuser" et "mypassword" par le nom de l'utilisateur souhaité et le mypassword par le mot de passe souhaité.
Celery
Maintenant que RabbitMQ est installé, il vous faut encore installer Celery proprement dit. Pour cela, nous avons créé un paquet Debian pour Debian stable backporté depuis Debian testing. Ce paquet est disponible sur notre dépôt Debian. En utilisant ce paquet, un simple :
apt-get install python-celery
suffit.
Dans le cas où comme nous vous souhaitez utiliser Celery au sein d'un projet Django, vous allez avoir besoin de l'application "django-celery". C'est cette application qui vous permettra, depuis votre code Django, de communiquer avec Celery. Soit en lui envoyant des tâches, soit en récupérant le résultat des tâches précédemment envoyées.
La encore, nous avons fait un paquet Debian pour Debian stable qui devrais vous simplifier grandement la vie.
apt-get install python-django-celery
Maintenant, vous devez configurer votre projet Django pour lui permettre d'utiliser Celery.
Il faut donc installer django-celery sur l'instance. Pour cela, ajouter dans les "INSTALLED_APPS" des settings :
"djcelery"
puis:
import djcelery
djcelery.setup_loader()
puis, vous devez modifier le script wsgi en y ajoutant:
import os
os.environ["CELERY_LOADER"]= "django"
Enfin, vous devez indiquer à Celery quel serveur RabbitMQ utiliser. Pour se faire, dans les settings, ajoutez:
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "<user>"
BROKER_PASSWORD = "<password>"
BROKER_VHOST = "<vhost>"
Django-Celery va enregistrer ses résultats dans votre base de données. Pour créer les tables dont vous avez besoin vous devrez faire :
python manage.py syncdb
Pour vérifier que votre installation et votre configuration est correcte, vous pouvez tout d'abord vérifier le status de RAbbitMQ avec la commande :
rabittmqctl status
Pour vérifier le bon fonctionnement de Celery, vous pouvez utiliser la commande :
python manage.py celeryd -l INFO
Celery devrait alors vous indiquer qu'il communique correctement avec RabbitMQ.
Au lancement de la commande :
python manage.py celeryd
django-celery va aller chercher dans chacunes de vos applications un fichier tasks.py sensé contenir toutes les tâches que vous allez lancer via Celery. Les tâches sont des fonctions python et elles on accès à tout le framework Django. Pour définir une tache, il vous faut importer le décorateur "task" depuis Celery. Votre première tache pourrait donc être écrite comme ceci :
#myapp/tasks.py from celery.decorators import task @task def add(x,y): return x+y
redémarrez votre script Celery avec la commande :
python manage.py celeryd
puis dans un terminal :
python manage.py shell >>> from myapp import tasks >>> tasks.add(4,5) 9
Ici, le fonctionnement est absolument classique. Votre tache viens d'être lancée et Python a attendu que la tache soit exécutée pour vous renvoyer le résultat. Seulement ce n'est certainement pas ce qui vous intéresse et il est beaucoup plus intéressant de lancer la tache en asynchrone. pour cela vous devez écrire :
>>> task.add().delay(4,5)
cette fois, le résultat ne va pas vous être retourné car Celery va vous rendre la main dès qu'il aura transmis le message à RabbitMQ. Vous pourrez en revanche récupérer le résultat plus tard :
>>> job = task.add().delay(4,5) >>> job.status SUCCESS >>> job.result 9
De nombreuses autres options sont disponibles avec Celery. Vous pouvez par exemple définir un rate limit par tâche afin de ne pas surcharger votre serveur avec des tâches répétées par erreur, réessayer votre tâche en cas d'erreur et spécifier le nombre de fois qu'une tâche doit être réessayée, vous pouvez créer des sous-tâches et des callbacks, vous pouvez même utiliser Celery pour réaliser des tâches à intervalle régulier et bien d'autres choses encore. Pour en savoir plus, vous pouvez consulter la documentation en ligne de Celery sur readthedocs .
La question qui se pose quand il s'agit d'utiliser Celery dans un projet Django est celle du retour utilisateur. Comment indiquer à l'utilisateur qu'une tâche est bien lancée ? Qu'elle est terminée ?
Chez Pilot Systems, et dans le cadre de notre projet Cockpit, nous avons défini deux type de tâches :
Pour ces deux types de tâches, nous utilisons le système de message de Django. Ainsi, nos utilisateurs vont dans un premier temps recevoir un message sur leur écran leur indiquant que la tâche demandée a bien été enregistrée et qu'ils recevront de nos nouvelles dans peu de temps. Ensuite, toujours via le système de messages de Django, nous leurs affichons à l'écran que leur tâche s'est bien réalisée.
Pour des tâches plus longues, nous envoyons un mail à l'utilisateur dans lequel nous lui fournissons les informations dont il a besoin pour récupérer le résultat de sa tâche.
Ce dernier point est particulièrement vital pour nous. En effet, Cockpit est un système de mailing et de contacts qui traite un très grand volume de données et certaines tâches peuvent prendre plusieurs heures. Grâce à l'intégration de celery, l'utilisateur n'est pas bloqué quand une tache est en cours.
Lorsque nous savons que la tache demandée est relativement courte, nous implémentons le code suivant dans notre vue :
from django.contrib import messages from myapp.tasks import ma_tache ma_tache().delay() messages.add_message(request, messages.INFO, "La tâche x a démarré") return render(request,'template',context)
L'utilisateur va donc avoir immédiatement un message l'informant que la tâche est en cours. Notez au passage que nous utilisons "render" qui permet de retourner l'objet request au template. Sans quoi le message ne s'affichera pas.
Là encore il faut envoyer un message à l'utilisateur pour lui indiquer que la tache qui a demandé quelque temps plus tôt est terminée. C'est un peu plus délicat car la tâche Celery n'a pas accès à la requête de l'utilisateur (puisqu'elle s'exécute de façon asynchrone).
Nous avons pris le parti de contourner ce problème en implémentant un middleware dans notre application. Ce middleware va s'appliquer pour chaque requête et va afficher un message pour chaque tâche terminée à l'utilisateur. Nous avons, lors du développement de ce middleware, constaté que Django faisait une boucle de redirection lorsque l'utilisateur n'étais pas authentifié, ce que nous avons corrigé. Nous avons également fait en sorte que ce middleware fonctionne même si la valeur de retour de la tache est vide. Cela ne devrait pas se produire mais on est jamais trop prudent. Voici ce middleware dans son intégralité :
from djcelery.models import TaskMeta from django.contrib import messages class CeleryMessageMiddleware(object): def process_request(self,request): if request.user.is_authenticated(): tasks = TaskMeta.objects.filter(status = "SUCCESS") for task in tasks: task_result = "None" if task.result != None: task_result = task.result messages.add_message(request, messages.INFO,u"Tâche terminée :"+task_result) task.delete()
Lorsque les tâches sont plus longues ou que le résultat doit être présenté à l'utilisateur, cette méthode arrive à ses limites. Par exemple, sur Cockpit, nous proposons des statistiques détaillées sur un envoie de newsletter et la possibilité de télécharger ces statistiques dans un fichier csv. Ce fichier peut contenir 500.000 entrées. Dans un cas comme celui ci, il est impossible de répondre immédiatement à l'utilisateur.
De plus cela pose de graves problèmes de performances car si vous essayez de charger 500.000 objets Python en mémoire, vous risquez de vous exposer a quelques déconvenues...
La solution que nous avons adoptée est donc d'écrire séquentiellement dans un fichier. Ainsi, à chaque ligne, la mémoire est libérée.
Il faut ensuite renvoyer une réponse à l'utilisateur. Là encore, la solution que nous avons adoptée est pensée pour les performances. Puisque nous écrivons dans un fichier, nous rendons disponible ce fichier dans notre dossier media, accessible à l'utilisateur de Cockpit. L'url du fichier est générée dynamiquement (en contenant un hachage cryptographique) pour ne pas permettre à un tiers d'avoir accès à des informations qui peuvent être confidentielles. Ensuite, nous envoyons un mail à l'utilisateur avec le lien vers ses résultats. Voyons comment cela se passe dans la pratique.
À la fin d'une tâche nous écrivons :
send_mail('Les statistiques de la newsletter %s'%newsletter.subject, """Récupérez les statistiques de la newsletter %s que vous avez demandées à cette adresse : %s """%(newsletter.subject,url), settings.DEFAULT_FROM_EMAIL, [user.email], fail_silently=False)
Celery vous permet également de lancer des tâches périodiques. Contrairement aux tâches que nous avons vu jusqu'ici, les taches périodiques vont se lancer d'elles même à intervalle régulier. Il s'agit d'une alternative, plus flexible et "haut niveau", au bon vieux cron.
Attention néanmoins a lancer Celery avec l'option -B. Dans le cas contraire, Celery ne sera pas à l'écoute des tâches périodiques.
Celery vous propose deux syntaxes différentes pour les tâches périodiques :
@periodic_task(run_every=crontab(minute=2) def votre_tache(params): etc...Cette tache va donc être lancée toutes les heures à la seconde minute
@periodic_task(run_every=timedelta(minutes=2) def votre_tache(params): etc...
va se lancer toutes les 2 minutes.
Une fois votre code testé et validé, il reste à la passer en production.
Pour cela, plusieurs étapes sont à prévoir :
Actions sur le document