#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
#
# Trigger library, designed to be called on any host, it will
# retrieve its appropriate modules based on the hostname. Based
# on the Crans Message Broker stuff, it adds specific methods designed
# to be used as a replacement for generate
#
# Author  : Pierre-Elliott Bécue
# License : GPLv3
# Date    : 10/03/2015
"""Main program for trigger library"""

import argparse
import cPickle
import importlib
import socket
import traceback
import sys

import pika

import gestion.secrets_new as secrets
import gestion.config.trigger as trigger_config
import gestion.affichage as affichage
from gestion.trigger.host import trigger_service
import cranslib.clogger as clogger
import cmb

# Short host name (everything before the first dot); it is the key used to
# look up this host's services in trigger_config.services.
HOSTNAME = socket.gethostname().split(".")[0]
LOGGER = clogger.CLogger("trigger", "trigger", trigger_config.log_level, trigger_config.debug)

# This block holds the little bit of "magic" of the library: the services
# listed in config/trigger.py are used as the reference. To avoid any
# redundancy, the command imports the relevant service modules according to
# that config. The imported modules are not used directly afterwards; importing
# them only serves to populate the TriggerFactory held in
# gestion/trigger/host.py.
SERVICES = {}
for config_service in trigger_config.services[HOSTNAME]:
    try:
        SERVICES[config_service] = importlib.import_module(
            "gestion.trigger.services.%s" % (config_service,)
        )
    except Exception:
        # A broken service module must not prevent the other services from
        # loading, so the failure is logged and the loop carries on.
        LOGGER.critical(
            "Fatal : import of %s failed, see following traceback. %s",
            config_service,
            traceback.format_exc(),
        )


class EvenementListener(cmb.AsynchronousConsumer):
    """Event handler: consumes trigger messages and runs the matching service."""

    def on_message(self, channel, basic_deliver, properties, body):
        """Invoked by pika when a message is delivered from RabbitMQ.

        The channel is passed for your convenience. The basic_deliver object
        that is passed in carries the exchange, routing key, delivery tag and
        a redelivered flag for the message. The properties passed in is an
        instance of BasicProperties with the message properties and the body
        is the message that was sent.

        The second dot-separated component of the routing key names the
        service to trigger; the unpickled body is unpacked as its positional
        arguments. The message is acknowledged in every case, including when
        it is malformed or no suitable trigger exists.

        :param pika.channel.Channel channel: The channel object
        :param pika.Spec.Basic.Deliver: basic_deliver method
        :param pika.Spec.BasicProperties: properties
        :param str|unicode body: The message body

        """
        delivery_tag = basic_deliver.delivery_tag

        # Might be useful later:
        #   origin = properties.app_id
        #   message_id = properties.message_id

        # NOTE(security): unpickling can execute arbitrary code. This is only
        # acceptable as long as every publisher on the broker is trusted.
        try:
            about = basic_deliver.routing_key.split(".")[1]
            body = cPickle.loads(body)
        except Exception:
            # A routing key without a service part or an undecodable payload
            # would otherwise escape the callback before the message is
            # acknowledged. Log it and discard it instead.
            LOGGER.warning(
                'Malformed message # %s from %s (routing key %r). Discarding it. (traceback: %r)',
                delivery_tag,
                properties.app_id,
                basic_deliver.routing_key,
                traceback.format_exc(),
            )
            self.acknowledge_message(delivery_tag)
            return

        LOGGER.info('Received message # %s from %s: %s', delivery_tag, properties.app_id, body)

        # Try to invoke the expected trigger: "about" holds the name of the
        # service to call and "body" is handed over as its arguments.
        try:
            if about in trigger_config.services[HOSTNAME]:
                trigger_service(about)(*body)
            else:
                raise AttributeError
        except (AttributeError, TypeError):
            LOGGER.warning(
                'No suitable trigger found for message # %s from %s: %s on host %s. '
                'Discarding it. (traceback: %r) (args: %r)',
                delivery_tag,
                properties.app_id,
                body,
                HOSTNAME,
                traceback.format_exc(),
                body,
            )

        self.acknowledge_message(delivery_tag)

    def run(self):
        """Run the example consumer by connecting to RabbitMQ and then
        starting the IOLoop to block and allow the SelectConnection to operate.

        One queue named ``trigger-<host>-<service>`` is registered with the
        routing key ``trigger.<service>`` for every service configured for
        this host.

        """
        LOGGER.info("""Crans Message Broker
+--------------------------------------------+
|             Welcome on Trigger             |
+--------------------------------------------+""")
        self._connection = self.connect()
        for service in trigger_config.services[HOSTNAME]:
            self.add_queue("trigger-%s-%s" % (HOSTNAME, service), "trigger.%s" % (service,))
        self._connection.ioloop.start()


def daemonize():
    """Listen for trigger messages until interrupted.

    Builds an EvenementListener from the trigger configuration and the
    RabbitMQ password stored in the secrets, then runs it. The call blocks in
    the listener's IOLoop; on KeyboardInterrupt the listener is stopped.
    """
    trigger_password = secrets.get('rabbitmq_trigger_password')
    credentials = pika.PlainCredentials(trigger_config.user, trigger_password)
    listener = EvenementListener(
        url=trigger_config.master,
        exchange_name="trigger",
        exchange_type="topic",
        port=trigger_config.port,
        credentials=credentials,
        ssl=trigger_config.ssl,
    )
    try:
        listener.run()
    except KeyboardInterrupt:
        LOGGER.warning("Caught SIGINT, will now go for shutdown.")
        listener.stop()


def _build_parser():
    """Return the command-line parser, with one flag per service of this host."""
    # add_help=False: the help option is declared by hand so that its
    # description matches the language of the other options.
    parser = argparse.ArgumentParser(description="Initier une régénération de services.", add_help=False)
    parser.add_argument('-a', '--all', help="Régénération complète des services sur l'hôte %s." % (HOSTNAME,), action="store_true")
    parser.add_argument('-d', '--daemon', help="Écouter en arrière plan.", action="store_true")
    parser.add_argument('-h', '--help', help="Affiche ce message et quitte.", action="store_true")

    # Second small "magic" part of the code: for each service supposedly
    # managed by this host, generate one parser option.
    for arg_service in trigger_config.services[HOSTNAME]:
        # dest is given explicitly: argparse would otherwise turn dashes of the
        # service name into underscores, and the later lookup by service name
        # would silently miss the flag.
        parser.add_argument(
            '--%s' % (arg_service,),
            dest=arg_service,
            help="Force la régénération du service %s." % (arg_service,),
            action="store_true",
        )
    return parser


def _regenerate(service):
    """Announce and run a full regeneration of one service."""
    print(affichage.style(" (Ré)Génération du service %s" % (service,), "cyan"))
    trigger_service(service)('full_regeneration', True)


def main():
    """Parse the command line and regenerate services or start listening."""
    parser = _build_parser()
    args = parser.parse_args()

    if args.help:
        parser.print_help()
        sys.exit(0)
    elif args.all:
        # Regenerate every available service; do not crash on nonexistent ones.
        for host_service in trigger_config.services[HOSTNAME]:
            try:
                _regenerate(host_service)
            except (AttributeError, TypeError):
                print("No suitable trigger handle found for service %s on host %s. Perhaps True is not accepted as a keyword." % (host_service, HOSTNAME))
    elif args.daemon:
        # Listen for and execute the commands published on the message broker.
        daemonize()
    else:
        # Neither --all nor --daemon: regenerate only the requested services.
        for arg_service in trigger_config.services[HOSTNAME]:
            if getattr(args, arg_service, False):
                _regenerate(arg_service)


if __name__ == '__main__':
    main()