
* Pour une plus grande modularité, event a été refactorisé, ce qui a impliqué de réécrire le fonctionnement des services. * Maintenant, y a plus qu'à tester.
127 lines
5.8 KiB
Python
Executable file
127 lines
5.8 KiB
Python
Executable file
#!/bin/bash /usr/scripts/python.sh
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Trigger library, designed to be called on any host, it will
|
|
# retreive its appropriate modules based on the hostname. Based
|
|
# on the Crans Message Broker stuff, it adds specific methods designed
|
|
# to be used as a replacement for generate
|
|
#
|
|
# Author : Pierre-Elliott Bécue <becue@crans.org>
|
|
# License : GPLv3
|
|
# Date : 29/04/2014
|
|
|
|
import argparse
|
|
import cranslib.clogger as clogger
|
|
import cmb
|
|
import cPickle
|
|
import socket
|
|
import gestion.config.trigger as trigger_config
|
|
import gestion.affichage as affichage
|
|
import sys
|
|
from gestion.trigger.host import trigger
|
|
|
|
hostname = socket.gethostname().split(".")[0]
|
|
|
|
# Ce bloc contient le peu de "magie" de la librairie, on utilise les services listés dans config/trigger.py
|
|
# comme référence. Pour éviter toute redondance, la commande importe donc les services utiles suivant cette
|
|
# config. Leur import ne sert pas directemet, il permet juste de peupler la TriggerFactory contenue dans
|
|
# gestion/trigger/host.py.
|
|
# Il faut donc bien importer ces fichiers, mais ils ne sont pas utilisés directement ensuite.
|
|
import importlib
|
|
services = {}
|
|
for config_service in trigger_config.services[hostname]:
|
|
services[config_service] = importlib.import_module("gestion.trigger.services.%s" % (config_service,))
|
|
logger = clogger.CLogger("trigger", "info")
|
|
|
|
class EvenementListener(cmb.AsynchronousConsumer):
|
|
"""
|
|
Gestionnaire d'événement
|
|
"""
|
|
|
|
def on_message(self, channel, basic_deliver, properties, body):
|
|
"""Invoked by pika when a message is delivered from RabbitMQ. The
|
|
channel is passed for your convenience. The basic_deliver object that
|
|
is passed in carries the exchange, routing key, delivery tag and
|
|
a redelivered flag for the message. The properties passed in is an
|
|
instance of BasicProperties with the message properties and the body
|
|
is the message that was sent.
|
|
|
|
:param pika.channel.Channel channel: The channel object
|
|
:param pika.Spec.Basic.Deliver: basic_deliver method
|
|
:param pika.Spec.BasicProperties: properties
|
|
:param str|unicode body: The message body
|
|
|
|
"""
|
|
about = basic_deliver.routing_key.split(".")[1]
|
|
# Peut-être utile plus tard
|
|
#origin = properties.app_id
|
|
#message_id = properties.message_id
|
|
logger.info('Received message # %s from %s: %s',
|
|
basic_deliver.delivery_tag, properties.app_id, body)
|
|
body = cPickle.loads(body)
|
|
# On tente d'invoquer le trigger attendu, à l'aide de la méthode trigger
|
|
# about contient le nom de la fonction à appeler, body lui est filé en argument.
|
|
try:
|
|
if about in trigger_config.services[hostname]:
|
|
trigger(about).regen(body)
|
|
else:
|
|
raise AttributeError
|
|
except AttributeError:
|
|
logger.warning('No suitable trigger found for message # %s from %s: %s on host %s. Discarding it.',
|
|
basic_deliver.delivery_tag, properties.app_id, body, hostname)
|
|
self.acknowledge_message(basic_deliver.delivery_tag)
|
|
|
|
def run(self):
|
|
"""Run the example consumer by connecting to RabbitMQ and then
|
|
starting the IOLoop to block and allow the SelectConnection to operate.
|
|
|
|
"""
|
|
logger.info("""Crans Message Broker
|
|
+--------------------------------------------+
|
|
| Welcome on Trigger |
|
|
+--------------------------------------------+""")
|
|
self._connection = self.connect()
|
|
for service in trigger_config.services[hostname]:
|
|
self.add_queue("trigger-%s-%s" % (hostname, service), "trigger.%s" % (service,))
|
|
self._connection.ioloop.start()
|
|
|
|
def daemonize():
|
|
listener = EvenementListener(trigger_config.master, "trigger", "topic")
|
|
try:
|
|
listener.run()
|
|
except KeyboardInterrupt:
|
|
logger.warning("Caught SIGINT, will now go for shutdown.")
|
|
listener.stop()
|
|
|
|
if __name__ == '__main__':
|
|
# We use a parser to capture all possible arguments designed for one host
|
|
parser = argparse.ArgumentParser(description="Initier une régénération de services.", add_help=False)
|
|
parser.add_argument('-a', '--all', help="Régénération complète des services sur l'hôte %s." % (hostname,), action="store_true")
|
|
parser.add_argument('-d', '--daemon', help="Écouter sur civet en arrière plan.", action="store_true")
|
|
parser.add_argument('-h', '--help', help="Affiche ce message et quitte.", action="store_true")
|
|
# For each service supposingly managed by host, generate one parser option
|
|
# Deuxième petit morceau "magique" du code.
|
|
for arg_service in trigger_config.services[hostname]:
|
|
parser.add_argument('--%s' % (arg_service,), help="Force la régénération du service %s." % (arg_service,), action="store_true")
|
|
args = parser.parse_args()
|
|
|
|
if args.help:
|
|
parser.print_help()
|
|
sys.exit(0)
|
|
elif args.all:
|
|
# Regenerates all services availables, don't crash on nonexistant ones
|
|
for host_service in trigger_config.services[hostname]:
|
|
try:
|
|
print affichage.style(" (Ré)Génération du service %s" % (host_service,), "cyan")
|
|
trigger(host_service)(True)
|
|
except AttributeError:
|
|
print "No suitable trigger handle found for service %s on host %s" % (host_service, hostname)
|
|
elif args.daemon:
|
|
# Daemonize the trigger app, in order to listen and execute commands from civet.
|
|
daemonize()
|
|
else:
|
|
# If not all and not daemon, try all services one by one.
|
|
for arg_service in trigger_config.services[hostname]:
|
|
if getattr(args, arg_service, False) == True:
|
|
print affichage.style(" (Ré)Génération du service %s" % (arg_service,), "cyan")
|
|
trigger(arg_service)(True)
|