scripts/gestion/trigger/trigger.py

141 lines
6.6 KiB
Python
Executable file

#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
#
# Trigger library, designed to be called on any host; it will
# retrieve its appropriate modules based on the hostname. Based
# on the Crans Message Broker stuff, it adds specific methods designed
# to be used as a replacement for generate
#
# Author : Pierre-Elliott Bécue <becue@crans.org>
# License : GPLv3
# Date : 10/03/2015
"""Main program for trigger library"""
import argparse
import cPickle
import socket
import traceback
import sys
import pika
import gestion.secrets_new as secrets
import gestion.config.trigger as trigger_config
import gestion.affichage as affichage
from gestion.trigger.host import trigger_service
import cranslib.clogger as clogger
import cmb
# Short host name: everything before the first dot of the system hostname.
HOSTNAME = socket.gethostname().split(".")[0]
LOGGER = clogger.CLogger("trigger", "trigger", trigger_config.log_level, trigger_config.debug)
# This block holds the little "magic" of the library: the services listed in config/trigger.py
# are used as the reference. To avoid any redundancy, the command imports the relevant services
# according to that config. Importing them is not useful by itself; it only populates the
# TriggerFactory found in gestion/trigger/host.py.
# These modules must therefore be imported, even though they are not used directly afterwards.
import importlib
# Maps each successfully imported service name to its module object.
SERVICES = {}
for config_service in trigger_config.services[HOSTNAME]:
    try:
        SERVICES[config_service] = importlib.import_module("gestion.trigger.services.%s" % (config_service,))
    except Exception:
        # Deliberately broad: a service module that fails to import is logged
        # with its traceback and skipped, and the loop carries on with the
        # remaining services.
        LOGGER.critical("Fatal : import of %s failed, see following traceback. %s", config_service, traceback.format_exc())
class EvenementListener(cmb.AsynchronousConsumer):
    """Event handler.

    Consumes messages delivered by the broker and dispatches each one to
    the trigger service named in its routing key.
    """

    def on_message(self, channel, basic_deliver, properties, body):
        """Invoked by pika when a message is delivered from RabbitMQ.

        The channel is passed for your convenience. The basic_deliver object
        that is passed in carries the exchange, routing key, delivery tag and
        a redelivered flag for the message. The properties passed in is an
        instance of BasicProperties with the message properties and the body
        is the message that was sent.

        The message is acknowledged both when the trigger ran and when it was
        discarded because no suitable trigger was found. Any exception other
        than AttributeError/TypeError propagates before the acknowledgement.

        :param pika.channel.Channel channel: The channel object
        :param pika.Spec.Basic.Deliver: basic_deliver method
        :param pika.Spec.BasicProperties: properties
        :param str|unicode body: The message body (a pickled argument sequence)
        """
        # The second component of the routing key names the service to call.
        about = basic_deliver.routing_key.split(".")[1]

        # Might be useful later
        #origin = properties.app_id
        #message_id = properties.message_id

        # SECURITY NOTE(review): unpickling runs arbitrary code if the payload
        # is attacker-controlled. This is only acceptable as long as every
        # publisher able to reach this consumer is trusted -- confirm.
        body = cPickle.loads(body)
        LOGGER.info('Received message # %s from %s: %s',
                    basic_deliver.delivery_tag, properties.app_id, body)

        # Try to invoke the expected trigger: `about` holds the name of the
        # service to call, and the unpickled body is unpacked as its arguments.
        try:
            if about not in trigger_config.services[HOSTNAME]:
                # Not a service configured for this host: handled by the same
                # "discard" path as a failing call below.
                raise AttributeError
            trigger_service(about)(*body)
        except (AttributeError, TypeError):
            LOGGER.warning('No suitable trigger found for message # %s from %s: %s on host %s. Discarding it. (traceback: %r) (args: %r)',
                           basic_deliver.delivery_tag, properties.app_id, body, HOSTNAME, traceback.format_exc(), body)

        self.acknowledge_message(basic_deliver.delivery_tag)

    def run(self):
        """Connect to RabbitMQ, register the host's queues and start the IOLoop.

        One queue named "trigger-<host>-<service>" is added with the routing
        key "trigger.<service>" for every service configured for this host.
        Starting the IOLoop blocks and lets the connection operate.
        """
        LOGGER.info("""Crans Message Broker
+--------------------------------------------+
| Welcome on Trigger |
+--------------------------------------------+""")
        self._connection = self.connect()
        for service in trigger_config.services[HOSTNAME]:
            self.add_queue("trigger-%s-%s" % (HOSTNAME, service), "trigger.%s" % (service,))
        self._connection.ioloop.start()
def daemonize():
    """Build the trigger listener and run it until interrupted.

    The RabbitMQ password is read from the secrets store, and the listener
    is created on the "trigger" topic exchange with the connection settings
    from the trigger configuration. On KeyboardInterrupt a warning is logged
    and the listener is stopped.
    """
    password = secrets.get('rabbitmq_trigger_password')
    creds = pika.PlainCredentials(trigger_config.user, password)
    consumer = EvenementListener(
        url=trigger_config.master,
        exchange_name="trigger",
        exchange_type="topic",
        port=trigger_config.port,
        credentials=creds,
        ssl=trigger_config.ssl,
    )

    try:
        consumer.run()
    except KeyboardInterrupt:
        LOGGER.warning("Caught SIGINT, will now go for shutdown.")
        consumer.stop()
if __name__ == '__main__':
    # Command-line entry point: regenerate every service (-a), listen for
    # broker messages (-d), or regenerate only the services whose flag is given.
    # We use a parser to capture all possible arguments designed for one host
    PARSER = argparse.ArgumentParser(description="Initier une régénération de services.", add_help=False)
    PARSER.add_argument('-a', '--all', help="Régénération complète des services sur l'hôte %s." % (HOSTNAME,), action="store_true")
    PARSER.add_argument('-d', '--daemon', help="Écouter en arrière plan.", action="store_true")
    # Help is handled by hand (add_help=False above) so its text can be customised.
    PARSER.add_argument('-h', '--help', help="Affiche ce message et quitte.", action="store_true")

    # For each service supposedly managed by this host, generate one parser option.
    # Second small "magic" piece of the code.
    for arg_service in trigger_config.services[HOSTNAME]:
        # dest is given explicitly: argparse would otherwise derive it from the
        # option string and replace "-" with "_", so the getattr() lookup below
        # would silently miss any service whose name contains a dash.
        PARSER.add_argument('--%s' % (arg_service,), dest=arg_service, help="Force la régénération du service %s." % (arg_service,), action="store_true")

    ARGS = PARSER.parse_args()

    if ARGS.help:
        PARSER.print_help()
        sys.exit(0)
    elif ARGS.all:
        # Regenerate all available services; don't crash on nonexistent ones.
        for host_service in trigger_config.services[HOSTNAME]:
            try:
                print(affichage.style("  (Ré)Génération du service %s" % (host_service,), "cyan"))
                trigger_service(host_service)('full_regeneration', True)
            except (AttributeError, TypeError):
                print("No suitable trigger handle found for service %s on host %s. Perhaps True is not accepted as a keyword." % (host_service, HOSTNAME))
    elif ARGS.daemon:
        # Daemonize the trigger app, in order to listen and execute commands from civet.
        daemonize()
    else:
        # Neither --all nor --daemon: regenerate only the services whose flag was set.
        for arg_service in trigger_config.services[HOSTNAME]:
            if getattr(ARGS, arg_service, False):
                print(affichage.style("  (Ré)Génération du service %s" % (arg_service,), "cyan"))
                trigger_service(arg_service)('full_regeneration', True)