#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
#
# Trigger library, designed to send events messages.
#
# Author  : Pierre-Elliott Bécue
# License : GPLv3
# Date    : 10/03/2015

"""
This service (event) is designed to receive any modification done on the LDAP
database, and to compute a correct diff between the former and the later state
of an object in order to guess which services have to be updated.
"""

import importlib
import itertools
import traceback

# Trigger features
import gestion.config.trigger as trigger_config
from gestion.trigger.host import TriggerFactory, record_service
from gestion.trigger.producer import EventProducer

# Clogger
import cranslib.clogger as clogger

LOGGER = clogger.CLogger("trigger", "event", trigger_config.log_level, trigger_config.debug)

PRODUCER = EventProducer("trigger.civet")

# Import every configured parser module. A failing import is logged and
# skipped so that one broken parser does not prevent the module from loading.
SERVICES = []
for config_service in trigger_config.all_services:
    try:
        SERVICES.append(importlib.import_module("gestion.trigger.parsers.%s" % (config_service,)))
    except Exception:
        LOGGER.critical("Fatal : import of %r failed, see following traceback. %r", config_service, traceback.format_exc())


class EventList(list):
    """List of dicts which grows when one accesses an element out of range.

    Reading or writing an integer index at or beyond the current length first
    pads the list with empty dicts up to that index.
    """

    def __fill(self, index):
        """Appends empty dicts until index is a valid position."""
        # A slice cannot be meaningfully compared with a length: under
        # Python 2 the comparison is always true (endless padding), under
        # Python 3 it raises TypeError. Slices never need padding anyway.
        if isinstance(index, slice):
            return

        while len(self) <= index:
            self.append({})

    def __getitem__(self, index):
        """Gets the item after filling if needed"""
        self.__fill(index)
        return super(EventList, self).__getitem__(index)

    def __setitem__(self, index, value):
        """Sets the item after filling if needed"""
        self.__fill(index)
        return super(EventList, self).__setitem__(index, value)


class EventTracker(object):
    """Stores pending service messages produced by the event service.

    For each object id, event_chain holds an EventList of "waves": the dict at
    position pos maps a service name to the data to send to that service.
    Waves are consumed from position 0; a wave is dropped once all of its
    services have been acked.

    NOTE(review): the former documentation stated that the tracker mirrors its
    data in a file and offers a sanity check to reload it. Nothing in this
    file implements that; only the in-memory store below exists here.
    """

    # {ob_id: EventList([{service_name: service_data, ...}, ...])}
    event_chain = {}

    @classmethod
    def record_event_to_chain(cls, ob_id, pos, service_name, service_data):
        """Records service_data for service_name in wave pos of chain ob_id.

        ob_id is the unique identifier of the chain. The chain is created if
        needed, and padded with empty waves up to pos. If the service is
        already registered in that wave, the existing data is kept and the
        conflict is logged as critical.
        """
        # If no entry, we create an EventList.
        wave = cls.event_chain.setdefault(ob_id, EventList())[pos]

        # If service is already there, we are facing a double setting of
        # service, which is not normal.
        if service_name in wave:
            LOGGER.critical("[%r] Weird. event_chain[%r][%r][%r] set to %r, but asking me to set it to %r.", ob_id, ob_id, pos, service_name, wave[service_name], service_data)
        else:
            LOGGER.debug("[%r] Adding %r to EventTracker.event_chain[%r][%r][%r].", ob_id, service_data, ob_id, pos, service_name)
            wave[service_name] = service_data

    @classmethod
    def check_empty(cls, ob_id):
        """Checks if cls.event_chain[ob_id] is empty.

        Returns True when ob_id has no chain or an empty one (the empty chain
        is then removed), False otherwise.
        """
        if ob_id not in cls.event_chain:
            LOGGER.debug("[%r] EventTracker.cls_event_chain free of %r.", ob_id, ob_id)
            return True

        if not cls.event_chain[ob_id]:
            cls.event_chain.pop(ob_id)
            LOGGER.debug("[%r] EventTracker.cls_event_chain free of %r.", ob_id, ob_id)
            return True

        return False

    @classmethod
    def get_off_record(cls, ob_id):
        """Returns the first non-empty wave of ob_id as a list of messages.

        Each message is a tuple (ob_id, service_name, service_data). Leading
        empty waves are discarded. An empty list is returned when nothing is
        pending for ob_id.
        """
        if cls.check_empty(ob_id):
            return []

        chain = cls.event_chain[ob_id]

        # Drop leading empty waves (should not happen). The truth test on
        # chain comes first, so chain[0] never pads an empty list.
        while chain and not chain[0]:
            chain.pop(0)

        # We reached the end of the list: nothing to do, forget the chain.
        if not chain:
            cls.event_chain.pop(ob_id)
            return []

        return [
            (ob_id, service_name, service_data)
            for (service_name, service_data) in chain[0].items()
        ]

    @classmethod
    def ack(cls, ob_id, service_name):
        """Removes service_name from the current wave, since everything is ok.

        Returns True when other services of the current wave are still
        pending. Returns None when there was nothing to ack, or when this ack
        completed the wave (the wave is then dropped, and so is the chain if
        it became empty).
        """
        if cls.check_empty(ob_id):
            LOGGER.info("[%r] Ack for %r, but nothing to ack...", ob_id, service_name)
            return None

        chain = cls.event_chain[ob_id]
        current_wave = chain[0]

        if service_name not in current_wave:
            LOGGER.info("[%r] Ack for %r, but nothing in event_chain[%r][0]...", ob_id, service_name, ob_id)
            return None

        # Remove the service_name from the dict.
        current_wave.pop(service_name)

        # Services are still pending in this wave.
        if current_wave:
            return True

        # The dict is empty, we drop it...
        chain.pop(0)

        # ... and if the list is now empty, we drop it as well.
        if not chain:
            cls.event_chain.pop(ob_id)

        return None


def _as_list(value):
    """Returns a new list: a copy of value if it is a list, else [value]."""
    if isinstance(value, list):
        return list(value)
    return [value]


def _fold_case(value):
    """Returns value lowercased when it offers lower(), else value itself."""
    lower = getattr(value, "lower", None)
    if callable(lower):
        return lower()
    return value


def diff_o_matic(before, after):
    """Computes an exhaustive diff of two attribute dicts.

    before and after map attribute names to a value or a list of values.
    Either one may be None or empty (created or deleted object), but not both.

    Returns a dict {key: (lost_values, new_values)} which only contains the
    keys whose values differ.

    Raises ValueError when both before and after are empty.
    """
    if not before and not after:
        raise ValueError("diff_o_matic received %r as an argument, which is unusable." % ((before, after),))

    # A created (resp. deleted) object comes with before (resp. after) set to
    # None: handle it as an object without any attribute.
    before = before or {}
    after = after or {}

    diff = {}

    # set(dico) builds the set of the keys of dico
    for key in set(before).union(after):
        blist = _as_list(before[key]) if key in before else []
        alist = _as_list(after[key]) if key in after else []

        moins, plus = compare_lists(blist, alist)
        if moins or plus:
            diff[key] = (moins, plus)

    return diff


def compare_lists(list1, list2):
    """Compares two lists, case-insensitively.

    Returns two lists (moins, plus): the elements of list1 which have no
    counterpart in list2, and the elements of list2 which have no counterpart
    in list1. Each element is paired at most once, so duplicates are counted.
    Elements without a lower() method are compared as they are. The arguments
    are left untouched.
    """
    plus = list(list2)
    folded = [_fold_case(elem) for elem in plus]
    moins = []

    for elem in list1:
        try:
            ind = folded.index(_fold_case(elem))
        except ValueError:
            moins.append(elem)
            continue
        # Consume the counterpart so that it cannot be paired twice.
        plus.pop(ind)
        folded.pop(ind)

    return moins, plus


@record_service(ack=False)
def event(ob_id, before, after, more):
    """Translates an LDAP modification into messages for the services.

    ob_id identifies the modification, before is the former state of the
    object (a simple dict) and after its later state. more is a dict of
    additional (non-LDAP) data; it is only logged here. A new object has
    before set to None, a deleted one has after set to None.

    The messages built by the parsers are recorded in the EventTracker, then
    the first wave is sent.
    """

    LOGGER.info("[%r] Received message %r…", ob_id, (ob_id, before, after, more))

    # Hey, I'll follow you 'till your end.
    diff = diff_o_matic(before, after)

    # Some debug if needed.
    LOGGER.debug("[%r] in service event, diff is %r.", ob_id, diff)

    # Now, diff is a dict containing the attributes which have been modified.
    # diff['macAddress'] could look like (['aa:bb:cc:dd:ee:fg'], ['aa:bb:cc:dd:ee:ff']),
    # where the list on the left holds the former values of the attribute,
    # and the list on the right the latter values.

    # -*- Explain -*-
    # TriggerFactory.get_parser(key) yields the parser functions bound to an
    # attribute. The per-attribute results are chained, None entries are
    # dropped, and the set removes duplicates: a function registered for
    # several modified attributes is called only once. For instance,
    # [[f, g], [g, None], [None], [h]] leads to [f, g, h] (in any order).
    parser_lists = [TriggerFactory.get_parser(key) for key in diff]
    functions = list({
        function
        for function in itertools.chain.from_iterable(parser_lists)
        if function is not None
    })
    LOGGER.debug("[%r] in service event, functions are %r.", ob_id, functions)

    # Compute the whole list of messages. We remove None messages, which
    # should not occur... But, whatever.
    msgs_to_send = []
    for function in functions:
        msg = function(ob_id, (before, after), diff)
        if msg is not None:
            msgs_to_send.append(msg)
    LOGGER.debug("[%r] in service event, messages are %r.", ob_id, msgs_to_send)

    # Each message is indexed as (service_name, body, pos).
    for msg in msgs_to_send:
        service_name, body, pos = msg[0], msg[1], msg[2]
        LOGGER.info("[%r] Adding %r on the EventTracker", ob_id, (pos, service_name, body))
        EventTracker.record_event_to_chain(ob_id, pos, service_name, body)

    # Sends the first wave on the way.
    todo = EventTracker.get_off_record(ob_id)
    for msg in todo:
        LOGGER.info("Sending %r on the road \\o/", msg)
        # NOTE(review): a former comment here read "uncomment this when in
        # production", but the call below is active in this file.
        trigger_send(*msg)


def trigger_send(ob_id, routing_key, body):
    """Sends (ob_id, body) via civet/trigger with key trigger.<routing_key>."""
    PRODUCER.send_message("trigger.%s" % (routing_key,), (ob_id, body))