diff --git a/gestion/config/trigger.py b/gestion/config/trigger.py index 38b370d8..5f545b0a 100644 --- a/gestion/config/trigger.py +++ b/gestion/config/trigger.py @@ -19,17 +19,17 @@ ssl = True # useradd : Envoie le mail de bienvenue, et crée le home # userdel : Détruit le home, déconnecte l'utilisateur sur zamok, détruit les indexes dovecot, désinscrit l'adresse crans des mailing listes associées services = { - 'civet' : ["event"], + 'civet' : ["event", "ack"], 'dhcp' : ["dhcp"], 'dyson' : ["autostatus"], 'isc' : ["dhcp"], 'komaz' : ["firewall", "secours"], - 'owl' : ["userdel"], - 'redisdead' : ["mailman", "modif_ldap", "solde", "userdel", "secours"], + 'owl' : ["users"], + 'redisdead' : ["mailman", "modif_ldap", "solde", "users", "secours"], 'sable' : ["dns"], 'titanic' : ["secours"], - 'zamok' : ["userdel"], - 'zbee' : ["useradd", "userdel"], + 'zamok' : ["users"], + 'zbee' : ["users"], } # XXX - Uncomment this when in prod diff --git a/gestion/trigger/host.py b/gestion/trigger/host.py index 7df75e66..1fdb99b9 100644 --- a/gestion/trigger/host.py +++ b/gestion/trigger/host.py @@ -9,8 +9,22 @@ # Author : Pierre-Elliott Bécue # License : GPLv3 # Date : 28/04/2014 +"""This module provides host functions for trigger, such as the TriggerFactory which +stores parsers and services metadata. +""" import collections +import functools + +import gestion.config.trigger as trigger_config +from gestion.trigger.producer import EventProducer + +# Clogger +import cranslib.clogger as clogger + +LOGGER = clogger.CLogger("trigger", "host.py/ack", trigger_config.log_level, trigger_config.debug) + +PRODUCER = EventProducer("trigger.civet") class TriggerFactory(object): """Factory containing which function is part of the trigger set @@ -22,38 +36,96 @@ class TriggerFactory(object): @classmethod def register_service(cls, key, value): + """Stores the appropriate service in the factory""" cls._services[key] = value @classmethod def get_service(cls, key): + """Retrieves the appropriate service""" return cls._services.get(key, None) @classmethod def get_services(cls): + """Retrieves the list of all services""" return cls._services.values() @classmethod def register_parser(cls, keys, parser): + """Stores the attributes to watch and the function""" for key in keys: cls._parsers[key].append(parser) @classmethod def get_parser(cls, keyword): + """Restitutes the parser using keywords""" return cls._parsers[keyword] -def record_service(func): +def record_service(ack=True): """Records in the triggerfactory the function The function provided are services to regen """ - TriggerFactory.register_service(func.func_name, func) + def enhance_func(func): + """Creates an enhanced function which tests if ack is True and + creates an ack if it's the case.""" + @functools.wraps(func) + def enhanced_func(*args, **kwargs): + """Dummy""" + # The first arg is ob_id, execpt if kwargs. + if args: + __ob_id = args[0] + else: + __ob_id = kwargs['ob_id'] + + # The function does not return. + func(*args, **kwargs) + + LOGGER.debug("[%r] Ran %r on (%r, %r)", __ob_id, func.func_name, args, kwargs, ) + + if ack: + # We send directly with routing key trigger.ack on the way. + # Thus, ack service does not need any parser. + routing_key = "ack" + body = (__ob_id, func.func_name) + LOGGER.debug("[%r] Ack %r.", __ob_id, body) + PRODUCER.send_message("trigger.%s" % (routing_key,), body) + TriggerFactory.register_service(func.func_name, enhanced_func) + return enhanced_func + return enhance_func def trigger_service(what): + """Calls the appropriate service""" return TriggerFactory.get_service(what) def record_parser(*args): + """Stores the function in TriggerFactory, using args as + keys for the dict""" + def find_parser(func): - TriggerFactory.register_parser(args, func) - return func + """Adds the chaining_pos at the end of the return of functions.""" + @functools.wraps(func) + def enhanced_func(*args, **kwargs): + """dummy""" + __ob_id = args[0] + ret = func(*args, **kwargs) + LOGGER.debug("[%r] In record_parser.find_parser, ran %r(%r, %r). Got %r.", __ob_id, func.func_name, args, kwargs, ret) + if ret is not None: + ret = [elem for elem in ret] + [getattr(func, "chaining_pos", 0)] + LOGGER.debug("[%r] In record_parser.find_parser, for %r got chaining_pos %r", __ob_id, func.func_name, ret[-1]) + return ret + TriggerFactory.register_parser(args, enhanced_func) + return enhanced_func + return find_parser + +def chaining(pos): + """Allows chaining of operations, by adding a position marker + on the function.""" + + def add_pos(func): + """Adds the chaining_pos variable to func""" + setattr(func, "chaining_pos", pos) + return func + + return add_pos diff --git a/gestion/trigger/parsers/dhcp.py b/gestion/trigger/parsers/dhcp.py index 56f74f8c..cfc82626 100644 --- a/gestion/trigger/parsers/dhcp.py +++ b/gestion/trigger/parsers/dhcp.py @@ -16,13 +16,14 @@ import lc_ldap.attributs from gestion.trigger.host import record_parser @record_parser(lc_ldap.attributs.macAddress.ldap_name, lc_ldap.attributs.ipHostNumber.ldap_name) -def send_mac_ip(body, diff): +def send_mac_ip(ob_id, body, diff): """Computes mac_ip data to send from body and diff - """ - macs = tuple([body[i].get(lc_ldap.attributs.macAddress.ldap_name, [''])[0] for i in xrange(1, 3)]) - ips = tuple([body[i].get(lc_ldap.attributs.ipHostNumber.ldap_name, [''])[0] for i in xrange(1, 3)]) - hostnames = tuple([body[i].get(lc_ldap.attributs.host.ldap_name, [''])[0] for i in xrange(1, 3)]) + The dict contains lists of tuples, so we can iterate on them + in the service.""" + macs = tuple([body[i].get(lc_ldap.attributs.macAddress.ldap_name, [''])[0] for i in xrange(0, 2)]) + ips = tuple([body[i].get(lc_ldap.attributs.ipHostNumber.ldap_name, [''])[0] for i in xrange(0, 2)]) + hostnames = tuple([body[i].get(lc_ldap.attributs.host.ldap_name, [''])[0] for i in xrange(0, 2)]) # Régénération du DHCP : if not macs[0]: diff --git a/gestion/trigger/parsers/firewall.py b/gestion/trigger/parsers/firewall.py index 3e51f906..bf47c310 100644 --- a/gestion/trigger/parsers/firewall.py +++ b/gestion/trigger/parsers/firewall.py @@ -14,22 +14,24 @@ import lc_ldap.attributs from gestion.trigger.host import record_parser @record_parser(lc_ldap.attributs.macAddress.ldap_name, lc_ldap.attributs.ipHostNumber.ldap_name) -def send_mac_ip(body, diff): +def send_mac_ip(ob_id, body, diff): """Computes mac_ip data to send from body and diff + Body is a couple of two dicts (before, after) + """ - macs = tuple([body[i].get(lc_ldap.attributs.macAddress.ldap_name, [''])[0] for i in xrange(1, 3)]) - ips = tuple([body[i].get(lc_ldap.attributs.ipHostNumber.ldap_name, [''])[0] for i in xrange(1, 3)]) + macs = tuple([body[i].get(lc_ldap.attributs.macAddress.ldap_name, [''])[0] for i in xrange(0, 2)]) + ips = tuple([body[i].get(lc_ldap.attributs.ipHostNumber.ldap_name, [''])[0] for i in xrange(0, 2)]) # Mise à jour du parefeu mac_ip if not macs[0]: # Création d'une nouvelle machine. - fw = {'add': [(macs[1], ips[1])]} + fw_dict = {'add': [(macs[1], ips[1])]} elif not macs[1]: # Destruction d'une machine. - fw = {'delete': [(macs[0], ips[0])]} + fw_dict = {'delete': [(macs[0], ips[0])]} else: # Mise à jour. - fw = {'update': [(macs[0], ips[0], macs[1], ips[1])]} - return ("firewall", ("mac_ip", fw)) + fw_dict = {'update': [(macs[0], ips[0], macs[1], ips[1])]} + return ("firewall", ("mac_ip", fw_dict)) diff --git a/gestion/trigger/producer.py b/gestion/trigger/producer.py index df09489e..c9796faa 100644 --- a/gestion/trigger/producer.py +++ b/gestion/trigger/producer.py @@ -21,7 +21,7 @@ import cranslib.clogger as clogger # Trigger features import gestion.config.trigger as trigger_config -logger = clogger.CLogger("trigger", "event", trigger_config.log_level, trigger_config.debug) +logger = clogger.CLogger("trigger", "EventProducer", trigger_config.log_level, trigger_config.debug) class EventProducer(cmb.BasicProducer): """ diff --git a/gestion/trigger/readme.fr b/gestion/trigger/readme.fr index 6e4f5ce7..b6483bf0 100644 --- a/gestion/trigger/readme.fr +++ b/gestion/trigger/readme.fr @@ -1,11 +1,9 @@ Auteur : PEB -Date : 14/07/2014 +Date : 09/03/2015 Licence : GPLv3 -Documentation succincte de trigger -================================== - -Tous les fichiers sont renseignés depuis /usr/scripts. +What the fuck is happening? +=========================== Trigger est une sorte de librairie de remplacement de generate et des services dans la base LDAP, qui fonctionnent avec bien trop de délai. @@ -13,6 +11,18 @@ dans la base LDAP, qui fonctionnent avec bien trop de délai. Trigger est le fruit d'une longue et intelligente (quelle modestie) réflexion, et donc nous allons ici décrire son fonctionnement. +Mise à jour LDAP : the fuck is happening? +========================================= + +Le binding envoit un tuple contenant en première entrée un hash, en deuxième entrée +un dico contenant les attributs avant modif par le binding, en troisième entrée un +dico contenant les attributs après modif, en quatrième entrée des données additionnelles +(inchangées durant tout le processing). + +Documentation succincte de trigger +================================== + +Tous les fichiers sont renseignés depuis /usr/scripts. * gestion/trigger/trigger.py est un fichier python qui importe un consumer de la librairie cmb. Il marche de manière asynchrone, c'est-à-dire qu'il attend et traîte les messages un par un. Dans gestion/config/trigger.py, il y a la liste @@ -21,10 +31,11 @@ et donc nous allons ici décrire son fonctionnement. qu'il doit importer. Par exemple, sur l'hôte dhcp, le seul service présent est dhcp, et donc trigger va aller chercher gestion/trigger/service/dhcp.py, et travailler avec. - * gestion/trigger/trigger.py importe une méthode trigger depuis - gestion/trigger/host.py. Cette méthode permet d'aller puiser dans une factory - portant le nom TriggerFactory les références vers les services utiles. Cela - permet ensuite de les régénérer à la volée. + * gestion/trigger/trigger.py importe des services, qui sont dans le dossier + services, et eux importent une méthode depuis gestion/trigger/host.py, qui leur + permet d'enregistrer des triggers. Cette méthode permet d'aller puiser dans une + factory portant le nom TriggerFactory les références vers les services utiles. + Cela permet ensuite de les régénérer à la volée. * Le dossier gestion/trigger/services contient la liste des services existants pour trigger. Le fonctionnement des services sera détaillé ci-après. @@ -32,56 +43,56 @@ et donc nous allons ici décrire son fonctionnement. Fonctionnement des services =========================== -"Un service est une classe qui ne sera jamais instanciée" +Un service est un fichier dans le dossier gestion/trigger/services. Il contient +une fonction décorée avec record_service. C'est une fonction qui sera appelée quand +trigger recevra une demande sur un serveur fournissant ledit service. -Un service est la donnée dans un fichier d'une classe portant le nom du fichier -(et donc du service). La casse dans le nom de la classe n'importe pas. Cette -classe hérite de BasicService, une classe définie dans -gestion/trigger/services/service.py. Cette classe s'appuie sur la métaclasse -MetaService pour se construire, ce qui permet d'établir un certain nombre de -liens entre les méthodes d'une classe représentant un service et des attributs -de lc_ldap que l'on souhaite monitorer. La métaclasse et l'ensemble des liens -susmentionnés n'ont d'intérêt que pour la partie "transcription des modifs de la -base LDAP dans un langage compréhensible par les services". - -Enfin, tout service contient une méthode regen prévue pour régénérer ledit -service. - -Les services peuvent ensuite contenir autant de méthodes que souhaitées, dans la -mesure où se sont des méthodes de classe ou statiques. - -La variable faisant le lien entre les attributs ldap à monitorer et les -fonctions à appeler pour transcrire les changements s'appelle changes_trigger. -C'est un dictionnaire dont les clefs sont le nom des attributs ldap à -surveiller, et les valeurs des tuples contenant les noms des fonctions à -appeler en cas de changement. - -Ces fonctions devront toujours avoir le prototype suivant : - @classmethod - def toto(cls, body, diff): -où body et diff sont gérés et fournis tels quels par le service event. body est -un 3-tuple contenant le dn de l'objet ldap modifié, la liste des clefs avant -modification, et celle après. diff est un dictionnaire de différences calculé -entre body[1] et body[2]. +Pour que civet sache si un service doit être régénéré, et donc qu'il lui envoie +un message, il faut définir un parser. Ces parsers sont contenus dans +gestion/trigger/parsers/, et portent le nom du service associé. Ils contiennent +au moins une fonction décorée avec record_parser (dont les arguments sont des +attributs ldap à surveiller). Quand civet reçoit des modifs des bindings, il regarde +pour chaque attribut ayant changé s'ils sont surveillés par des parsers, et le cas +échéant demande la régénération des services associés. Ajouter un nouveau service ========================== Pour ajouter un service, il faut créer un fichier adapté dans trigger/services/, -puis, définir une classe héritant de BasicService, et respecter quelques règles -primordiales. +et un dans trigger/parsers/. Il faut écrire des fonctions adaptées (le nom est libre), +par exemple, pour un parser : -Premièrement, ce service sera importé sur chaque machine où il est configuré -pour fonctionner, et sur civet dans event.py. Pensez donc une fois le tout -configuré à relancer trigger sur civet, et à vérifier que ça marche. La variable -de configuration debug dans gestion/config/trigger.py est là pour aider. Parmi -les choses importantes, l'idéal est d'avoir des dépendances les plus paresseuses -possibles d'un point de vue évaluation. Ainsi, civet qui ne fait qu'importer le -fichier et utiliser les fonctions d'analyse listées dans changes_trigger peut -éviter de jouer avec ce qui ne le concerne pas. +{{{ +@record_parser(lc_ldap.attributs.macAddress.ldap_name, lc_ldap.attributs.ipHostNumber.ldap_name) +def send_mac_ip(body, diff): +}}} -Ensuite, il faut absolument une méthode regen, et définir changes_trigger. (un -dict vide convient) +body est le message reçu par civet sans transformation. diff est le diff calculé +à la volée. Le nom de la fonction n'est pas important. Le décorateur prend les +noms d'attributs à surveiller en paramètre. La fonction doit retourner un tuple +dont le premier élément est le nom du service à régénérer (par exemple, "dhcp"), +et le second les choses que le service devra lire et gérer pour se régénérer. + +Pour un service, voici un exemple : + +{{{ +@record_service +def dhcp(body=None): +}}} + +body contient le "body" construit dans un parseur. La fonction est décorée, et +son nom est stocké dans la TriggerFactory. Comme souligné précédemment, le nom +de la fonction est important, au même titre que le nom des fichiers dans +trigger/parsers et triggers/services. + +Il faut ensuite référencer le service dans config/trigger.py pour les serveurs +où il est important, et relancer trigger sur ces machines. Lors des tests, il ne +faut pas hésiter à passer trigger en debug dans le fichier config/trigger.py. + +Parmi les choses importantes, l'idéal est d'avoir des dépendances les plus +paresseuses possibles d'un point de vue évaluation. Ainsi, civet qui ne fait +qu'importer le fichier et utiliser les fonctions d'analyse listées dans +changes_trigger peut éviter de jouer avec ce qui ne le concerne pas. Enfin, si vous avez des questions, posez-les avant, pas après. @@ -94,11 +105,10 @@ trigger-*-nomduservice. Un service spécial ================== -civet est un hôte spécial, qui gère un service spécial : le transcripteur. Le -transcripteur est le service event, dans gestion/trigger/services/event.py, -qui reçoit des messages sur la queue trigger-civet-event. C'est lui qui, -fonction des messages reçus, les répartis tous vers les autres queues avec -clef de routage idoine. +Le service event est celui qui utilise les parseurs pour savoir quels services +doivent être régénérés. Quand il reçoit le body, il fait un calcul des différences +entre body[1] et body[2] (les deux dicos), et fournit ces différences aux parseurs, +qui lui rendent des messages à envoyer. L'intérêt est d'assurer une indépendance maximale entre binding ldap et la librairie trigger : le binding doit juste envoyer avec clef de routage diff --git a/gestion/trigger/services/ack.py b/gestion/trigger/services/ack.py new file mode 100644 index 00000000..f10a17ab --- /dev/null +++ b/gestion/trigger/services/ack.py @@ -0,0 +1,50 @@ +#!/bin/bash /usr/scripts/python.sh +# -*- coding: utf-8 -*- +# +# Trigger library, designed to send events messages. +# +# Author : Pierre-Elliott Bécue +# License : GPLv3 +# Date : 10/03/2015 + +""" +This service (event) is designed to receive any modification done on LDAP +database, and to make a correct diff between former and later object in order +to guess which services has to be updated. +""" + +# Trigger features +import gestion.config.trigger as trigger_config +from gestion.trigger.host import record_service +from gestion.trigger.services.event import EventTracker, trigger_send # really useful EventList ? + +# Clogger +import cranslib.clogger as clogger + +logger = clogger.CLogger("trigger", "ack", trigger_config.log_level, trigger_config.debug) + +@record_service(ack=False) +def ack(ob_id, service_name): + """Ack when something has been done. + + Removes the acked thing from + """ + + logger.info("Received message %r…", (ob_id, service_name)) + + todo = EventTracker.ack(ob_id, service_name) + + # if todo is None, then we have finished a list, or emptied + # EventTracker's content. + if todo is None: + todo = EventTracker.get_off_record(ob_id) + logger.info("Emptied one list in the chain %r. Trying to continue. Got %r", ob_id, todo) + + if todo: + for msg in todo: + logger.info("Sending %r on the road \\o/", msg) + # XXX - uncomment this when in production + trigger_send(*msg) + else: + logger.info("Aaaaaand, nothing.") + diff --git a/gestion/trigger/services/dhcp.py b/gestion/trigger/services/dhcp.py index 249e31b4..00683e66 100644 --- a/gestion/trigger/services/dhcp.py +++ b/gestion/trigger/services/dhcp.py @@ -42,7 +42,7 @@ else: ldap_conn = None @record_service -def dhcp(body=None): +def dhcp(ob_id, body=None): """Regenerates dhcp service taking body into account. """ diff --git a/gestion/trigger/services/event.py b/gestion/trigger/services/event.py index 0c8113b8..33fc979b 100644 --- a/gestion/trigger/services/event.py +++ b/gestion/trigger/services/event.py @@ -5,7 +5,7 @@ # # Author : Pierre-Elliott Bécue # License : GPLv3 -# Date : 18/05/2014 +# Date : 10/03/2015 """ This service (event) is designed to receive any modification done on LDAP @@ -17,8 +17,6 @@ import importlib import itertools import traceback -import gestion.secrets_new as secrets - # Trigger features import gestion.config.trigger as trigger_config from gestion.trigger.host import TriggerFactory, record_service @@ -27,26 +25,145 @@ from gestion.trigger.producer import EventProducer # Clogger import cranslib.clogger as clogger -# lc_ldap -import lc_ldap.attributs +LOGGER = clogger.CLogger("trigger", "event", trigger_config.log_level, trigger_config.debug) -logger = clogger.CLogger("trigger", "event", trigger_config.log_level, trigger_config.debug) +PRODUCER = EventProducer("trigger.civet") -services = [] +SERVICES = [] for config_service in trigger_config.all_services: try: - services.append(importlib.import_module("gestion.trigger.parsers.%s" % (config_service,))) - except Exception as e: - logger.critical("Fatal : import of %s failed, see following traceback. %s", config_service, traceback.format_exc()) + SERVICES.append(importlib.import_module("gestion.trigger.parsers.%s" % (config_service,))) + except Exception: + LOGGER.critical("Fatal : import of %r failed, see following traceback. %r", config_service, traceback.format_exc()) -def diff_o_matic(body=()): +class EventList(list): + """List which is designed to grow up when one try to acces an element out of + range""" + + def __fill(self, index): + """Fills the intermediates indexes if needed""" + while len(self) <= index: + self.append({}) + + def __getitem__(self, index): + """Gets the item after filling if needed""" + self.__fill(index) + return super(EventList, self).__getitem__(index) + + def __setitem__(self, index, value): + """Sets the item after filling if needed""" + self.__fill(index) + return super(EventList, self).__setitem__(index, value) + +class EventTracker(object): + """Stores events actions from event service. It allows to track all services + regeneration, and to chain services execution when needed. To avoid data loss + during process, the EventTracker duplicates its own data in a file. + + This file will be synced, but, by default, RAM data is considered as the + current state of the factory. A sanity check method allows to guess if the + file should be loaded to RAM.""" + + event_chain = {} + + @classmethod + def record_event_to_chain(cls, ob_id, pos, service_name, service_data): + """Records a chain of events. args contains a tuple which arguments + is a list of dicts. ob_id is a unique identifier of the current chain. + + Each dicts points to a message to send independently via trigger. + + args should look like : + ([("dhcp", {'update':...}, ob_id), (...., ob_id)], [...])""" + + # If no entry, we create an EventList. + if ob_id not in cls.event_chain: + cls.event_chain[ob_id] = EventList() + + # If service is already there, we are facing a double setting of service, which is not + # normal. + if service_name in cls.event_chain[ob_id][pos]: + LOGGER.critical("[%r] Weird. event_chain[%r][%r][%r] set to %r, but asking me to set it to %r.", ob_id, ob_id, pos, service_name, cls.event_chain[ob_id][pos][service_name], service_data) + else: + LOGGER.debug("[%r] Adding %r to EventTracker.event_chain[%r][%r][%r].", ob_id, service_data, ob_id, pos, service_name) + cls.event_chain[ob_id][pos][service_name] = service_data + + @classmethod + def check_empty(cls, ob_id): + """Checks if cls.event_chain[ob_id] is empty""" + if ob_id not in cls.event_chain: + LOGGER.debug("[%r] EventTracker.cls_event_chain free of %r.", ob_id, ob_id) + return True + + if len(cls.event_chain[ob_id]) == 0: + cls.event_chain.pop(ob_id) + LOGGER.debug("[%r] EventTracker.cls_event_chain free of %r.", ob_id, ob_id) + return True + + @classmethod + def get_off_record(cls, ob_id): + """Expedits a formatted record""" + # We will pop items from event_chain[ob_id] + # untill we have a non-empty dict. + if cls.check_empty(ob_id): + return [] + + dico = False + while not dico: + if len(cls.event_chain[ob_id]) > 0: + dico = cls.event_chain[ob_id][0] + # Should not happen. + if not dico: + cls.event_chain[ob_id].pop(0) + else: + # If we are at the end of the list + dico = True + + # then, we have nothing to do. + if dico == True: + return [] + + if isinstance(bool, dico): + dico = {} + + return [ + (ob_id, service_name, service_data) + for (service_name, service_data) in dico.iteritems() + ] + + @classmethod + def ack(cls, ob_id, service_name): + """Removes service_name from the event_chain, since + everything is ok.""" + + if cls.check_empty(ob_id): + LOGGER.info("[%r] Ack for %r, but nothing to ack...", ob_id, service_name) + return None + + if service_name not in cls.event_chain[ob_id][0]: + LOGGER.info("[%r] Ack for %r, but nothing in event_chain[%r][0]...", ob_id, service_name, ob_id) + return None + + # Remove the service_name from the dict. + cls.event_chain[ob_id][0].pop(service_name) + + # If dict is empty, we drop it. + if not cls.event_chain[ob_id][0]: + cls.event_chain[ob_id].pop(0) + return None + + # If the list is empty, we drop it. + if not cls.event_chain[ob_id]: + cls.event_chain.pop(ob_id) + return None + + return True + +def diff_o_matic(before, after): """Fait un diff exhaustif des deux dicos""" - if not body: - raise ValueError("diff_o_matic received %r as an argument, which is unusable." % (body,)) - - before = dict(body[1]) or {} - after = dict(body[2]) or {} + if not before and not after: + raise ValueError("diff_o_matic received %r as an argument, which is unusable." % ((before, after),)) # set(dico) retourne un set de dico.keys() keys_pool = set(before).union(set(after)) @@ -96,25 +213,28 @@ def compare_lists(list1, list2): return moins, plus -@record_service -def event(body=()): +@record_service(ack=False) +def event(ob_id, before, after, more): """When any event arrives on trigger-civet-event, this method is called and designed to transcript the body (ldap data) in something usable for the services. Afterwards, it sends these transcripts on the good way using routing_key. - body is a 5-tuple, containing timestamp, the former state of the object + body is a 4-tuple, containing hash, the former state of the object (a simple dict), and the later state, a dict with additionnal (but - non-LDAP) data and a dict of step indicators (an int). The data are - non-binding-dependant. + non-LDAP) data. The data are non-binding-dependant. A new object has body[1] to None, a deleted one has body[2] to None. """ - logger.info("Received message %r…", body) + LOGGER.info("[%r] Received message %r…", ob_id, (ob_id, before, after, more)) - diff = diff_o_matic(body) + # Hey, I'll follow you 'till your end. + diff = diff_o_matic(before, after) + + # Some debug if needed. + LOGGER.debug("[%r] in service event, diff is %r.", ob_id, diff) # Now, diff is a dict containing attributes which has been modified. # diff['macAddress'] could look like (['aa:bb:cc:dd:ee:fg'], ['aa:bb:cc:dd:ee:ff']), @@ -140,15 +260,23 @@ def event(body=()): # Compute the whole list of messages. This returns a list of 2-tuples. We remove None messages, which # may occur, since there is chained-services. - msg_to_send = [msg for msg in [function(body, diff) for function in functions] if msg is not None] + msgs_to_send = [msg for msg in [function(ob_id, (before, after), diff) for function in functions] if msg is not None] + LOGGER.debug("[%r] in service event, messages are %r.", ob_id, msgs_to_send) - for msg in msg_to_send: - logger.info("Sending %r on the road \\o/", msg) + for msg in msgs_to_send: + service_name, body, pos = msg[0], msg[1], msg[2] + LOGGER.info("[%r] Adding %r on the EventTracker", ob_id, (pos, service_name, body)) + EventTracker.record_event_to_chain(ob_id, pos, service_name, body) + + # Sends the first wave on the way. + todo = EventTracker.get_off_record(ob_id) + for msg in todo: + LOGGER.info("Sending %r on the road \\o/", msg) # XXX - uncomment this when in production trigger_send(*msg) -def trigger_send(routing_key, body, orig=None): - sender = EventProducer("trigger.civet") - if orig is not None: - body = (body, orig) - sender.send_message("trigger.%s" % (routing_key,), body) +def trigger_send(ob_id, routing_key, body): + """Sends a message via civet/trigger""" + + body = tuple([ob_id] + [elem for elem in body]) + PRODUCER.send_message("trigger.%s" % (routing_key,), body) diff --git a/gestion/trigger/services/firewall.py b/gestion/trigger/services/firewall.py index bf20ad37..033e7563 100644 --- a/gestion/trigger/services/firewall.py +++ b/gestion/trigger/services/firewall.py @@ -42,7 +42,7 @@ def fwrecord(fun): FwFactory.register(fun.func_name, fun) @record_service -def firewall(body=()): +def firewall(ob_id, body=()): """Regens the specific service """ diff --git a/gestion/trigger/services/service.py b/gestion/trigger/services/service.py deleted file mode 100644 index b84a5997..00000000 --- a/gestion/trigger/services/service.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/bin/bash /usr/scripts/python.sh -# -*- coding: utf-8 -*- -# -# This module is NOT used anymore (will be buried soon). - -""" -This module provides a basic service class to other services. It should *NOT* -be referenced in configuration of trigger. - -It is not used anymore. -""" - -import collections - -import cranslib.clogger as clogger -import gestion.config.trigger as trigger_config -from gestion.trigger.host import TriggerFactory - -logger = clogger.CLogger("trigger", "service", "debug", trigger_config.debug) - -class MetaService(type): - """Metaclass designed to handle all services. - - """ - - def __new__(mcs, cname, cpar, cattrs): - """Method producing the new class itself - At first, I wanted to put the changes_trigger modification in __new__, - using direct modification of cattrs['changes_trigger'] by pointing the - required methods (classmethods). The problem was that these methods were - bound at the return of type.__new__, for a reason I could not exactly - explain. - - I found a workaround using __init__, so the point would be to remove - __new__, and directly use type.__new__, but this comment seems useful, - so __new__ will survive. - - """ - return super(MetaService, mcs).__new__(mcs, cname, cpar, cattrs) - - def __init__(cls, cname, cpar, cattrs): - """Used to register the generated classes in TriggerFactory, and modify the behavior of - changes_trigger by pointing functions instead of their names. This allows to cancel any - positional requirement in class definition. - - Do NEVER return something in __init__ function. - - """ - if not cname == "BasicService": - TriggerFactory.register(cname.lower(), cls) - changes_trigger = collections.defaultdict(list) - # I love getattr - text_changes_trigger = getattr(cls, "changes_trigger", {}) - for (ldap_attr_name, funcs_name) in text_changes_trigger.items(): - for func_name in funcs_name: - # I really love getattr. - get = getattr(cls, func_name, None) - if get is None: - logger.critical("Fatal, bad function (%s) reference in %s.", func_name, cname) - continue - changes_trigger[ldap_attr_name].append(get) - setattr(cls, "changes_trigger", changes_trigger) - super(MetaService, cls).__init__(cname, cpar, cattrs) - -class BasicService(object): - """Basic service handler. Other services should inherit fron this one. - - """ - - __metaclass__ = MetaService - - changes_trigger = {} - - @classmethod - def get_changes(cls, body, diff): - """Looks for changes and creates messages to send back - - """ - # list of all messages to send. - msg_list = [] - - # lists all functions to call - func_list = set() - for (attrib, functions) in cls.changes_trigger.iteritems(): - if attrib in diff: - func_list.update(functions) - for function in func_list: - msg_list.append(function(body, diff)) - return msg_list - - @classmethod - def regen(cls, body): - """This method is referenced to avoid uncaught exceptions - - """ - pass diff --git a/gestion/trigger/trigger.py b/gestion/trigger/trigger.py index 5e7a6f65..a1140a3a 100755 --- a/gestion/trigger/trigger.py +++ b/gestion/trigger/trigger.py @@ -8,7 +8,8 @@ # # Author : Pierre-Elliott Bécue # License : GPLv3 -# Date : 29/04/2014 +# Date : 10/03/2015 +"""Main program for trigger library""" import argparse import cPickle @@ -24,21 +25,21 @@ from gestion.trigger.host import trigger_service import cranslib.clogger as clogger import cmb -hostname = socket.gethostname().split(".")[0] -logger = clogger.CLogger("trigger", "trigger", trigger_config.log_level, trigger_config.debug) +HOSTNAME = socket.gethostname().split(".")[0] +LOGGER = clogger.CLogger("trigger", "trigger", trigger_config.log_level, trigger_config.debug) # Ce bloc contient le peu de "magie" de la librairie, on utilise les services listés dans config/trigger.py # comme référence. Pour éviter toute redondance, la commande importe donc les services utiles suivant cette -# config. Leur import ne sert pas directemet, il permet juste de peupler la TriggerFactory contenue dans +# config. Leur import ne sert pas directement, il permet juste de peupler la TriggerFactory contenue dans # gestion/trigger/host.py. # Il faut donc bien importer ces fichiers, mais ils ne sont pas utilisés directement ensuite. import importlib -services = {} -for config_service in trigger_config.services[hostname]: +SERVICES = {} +for config_service in trigger_config.services[HOSTNAME]: try: - services[config_service] = importlib.import_module("gestion.trigger.services.%s" % (config_service,)) + SERVICES[config_service] = importlib.import_module("gestion.trigger.services.%s" % (config_service,)) except Exception as e: - logger.critical("Fatal : import of %s failed, see following traceback. %s", config_service, traceback.format_exc()) + LOGGER.critical("Fatal : import of %s failed, see following traceback. %s", config_service, traceback.format_exc()) class EvenementListener(cmb.AsynchronousConsumer): """ @@ -64,18 +65,20 @@ class EvenementListener(cmb.AsynchronousConsumer): #origin = properties.app_id #message_id = properties.message_id body = cPickle.loads(body) - logger.info('Received message # %s from %s: %s', + LOGGER.info('Received message # %s from %s: %s', basic_deliver.delivery_tag, properties.app_id, body) + # On tente d'invoquer le trigger attendu, à l'aide de la méthode trigger # about contient le nom de la fonction à appeler, body lui est filé en argument. try: - if about in trigger_config.services[hostname]: - trigger_service(about)(body) + if about in trigger_config.services[HOSTNAME]: + trigger_service(about)(*body) else: raise AttributeError except AttributeError: - logger.warning('No suitable trigger found for message # %s from %s: %s on host %s. Discarding it.', - basic_deliver.delivery_tag, properties.app_id, body, hostname) + LOGGER.warning('No suitable trigger found for message # %s from %s: %s on host %s. Discarding it.', + basic_deliver.delivery_tag, properties.app_id, body, HOSTNAME) + self.acknowledge_message(basic_deliver.delivery_tag) def run(self): @@ -83,54 +86,56 @@ class EvenementListener(cmb.AsynchronousConsumer): starting the IOLoop to block and allow the SelectConnection to operate. """ - logger.info("""Crans Message Broker + LOGGER.info("""Crans Message Broker +--------------------------------------------+ | Welcome on Trigger | +--------------------------------------------+""") self._connection = self.connect() - for service in trigger_config.services[hostname]: - self.add_queue("trigger-%s-%s" % (hostname, service), "trigger.%s" % (service,)) + for service in trigger_config.services[HOSTNAME]: + self.add_queue("trigger-%s-%s" % (HOSTNAME, service), "trigger.%s" % (service,)) self._connection.ioloop.start() def daemonize(): + """Runs the script in "background".""" trigger_password = secrets.get('rabbitmq_trigger_password') credentials = pika.PlainCredentials(trigger_config.user, trigger_password) listener = EvenementListener(url=trigger_config.master, exchange_name="trigger", exchange_type="topic", port=trigger_config.port, credentials=credentials, ssl=trigger_config.ssl) try: listener.run() except KeyboardInterrupt: - logger.warning("Caught SIGINT, will now go for shutdown.") + LOGGER.warning("Caught SIGINT, will now go for shutdown.") listener.stop() if __name__ == '__main__': # We use a parser to capture all possible arguments designed for one host - parser = argparse.ArgumentParser(description="Initier une régénération de services.", add_help=False) - parser.add_argument('-a', '--all', help="Régénération complète des services sur l'hôte %s." % (hostname,), action="store_true") - parser.add_argument('-d', '--daemon', help="Écouter sur civet en arrière plan.", action="store_true") - parser.add_argument('-h', '--help', help="Affiche ce message et quitte.", action="store_true") + PARSER = argparse.ArgumentParser(description="Initier une régénération de services.", add_help=False) + PARSER.add_argument('-a', '--all', help="Régénération complète des services sur l'hôte %s." % (HOSTNAME,), action="store_true") + PARSER.add_argument('-d', '--daemon', help="Écouter en arrière plan.", action="store_true") + PARSER.add_argument('-h', '--help', help="Affiche ce message et quitte.", action="store_true") + # For each service supposingly managed by host, generate one parser option # Deuxième petit morceau "magique" du code. - for arg_service in trigger_config.services[hostname]: - parser.add_argument('--%s' % (arg_service,), help="Force la régénération du service %s." % (arg_service,), action="store_true") - args = parser.parse_args() + for arg_service in trigger_config.services[HOSTNAME]: + PARSER.add_argument('--%s' % (arg_service,), help="Force la régénération du service %s." % (arg_service,), action="store_true") + ARGS = PARSER.parse_args() - if args.help: - parser.print_help() + if ARGS.help: + PARSER.print_help() sys.exit(0) - elif args.all: + elif ARGS.all: # Regenerates all services availables, don't crash on nonexistant ones - for host_service in trigger_config.services[hostname]: + for host_service in trigger_config.services[HOSTNAME]: try: print affichage.style(" (Ré)Génération du service %s" % (host_service,), "cyan") trigger_service(host_service)(True) except AttributeError: - print "No suitable trigger handle found for service %s on host %s" % (host_service, hostname) - elif args.daemon: + print "No suitable trigger handle found for service %s on host %s" % (host_service, HOSTNAME) + elif ARGS.daemon: # Daemonize the trigger app, in order to listen and execute commands from civet. daemonize() else: # If not all and not daemon, try all services one by one. - for arg_service in trigger_config.services[hostname]: - if getattr(args, arg_service, False) == True: + for arg_service in trigger_config.services[HOSTNAME]: + if getattr(ARGS, arg_service, False) == True: print affichage.style(" (Ré)Génération du service %s" % (arg_service,), "cyan") trigger_service(arg_service)(True)