#!/bin/bash /usr/scripts/python.sh # -*- coding: utf-8 -*- # # Trigger library, designed to send events messages. # # Author : Pierre-Elliott Bécue # License : GPLv3 # Date : 18/05/2014 """ This service (event) is designed to receive any modification done on LDAP database, and to make a correct diff between former and later object in order to guess which services has to be updated. """ import cmb import cPickle import pika import importlib import itertools import traceback import gestion.secrets_new as secrets # Trigger features import gestion.config.trigger as trigger_config from gestion.trigger.host import TriggerFactory from gestion.trigger.services.service import BasicService # Clogger import cranslib.clogger as clogger # lc_ldap import lc_ldap.attributs logger = clogger.CLogger("trigger", "event", "debug", trigger_config.debug) services = [] for config_service in trigger_config.all_services: try: services.append(importlib.import_module("gestion.trigger.services.%s" % (config_service,))) except Exception as e: logger.critical("Fatal : import of %s failed, see following traceback. %s", config_service, traceback.format_exc()) class EventProducer(cmb.BasicProducer): """ EventProducer tracker """ def __init__(self, app_id): """Extended """ logger.info("Starting trigger EventProducer program for app %s…", app_id) trigger_password = secrets.get('rabbitmq_trigger_password') credentials = pika.PlainCredentials(trigger_config.user, trigger_password) super(EventProducer, self).__init__(url=trigger_config.master, exchange_name="trigger", app_id=app_id, port=trigger_config.port, credentials=credentials, ssl=trigger_config.ssl) self._connection = self.connect() self.get_chan() def send_message(self, routing_key, body): """Sends basic message with app_id and body """ try: logger.info("Sending message %s with routing_key %s.", body, routing_key) body = cPickle.dumps(body) self._channel.basic_publish(exchange=self._exchange_name, routing_key=routing_key, body=body, properties=pika.BasicProperties( delivery_mode=2, app_id=self._app_id, )) except: print "Failure in trigger.event" raise def announce(self, body): """Feature to send message without giving routing_key """ self.send_message("trigger.event", body) def diff_o_matic(body=()): """Fait un diff exhaustif des deux dicos""" if not body: raise ValueError("diff_o_matic received %r as an argument, which is unusable." % (body,)) before = dict(body[1]) or {} after = dict(body[2]) or {} # set(dico) retourne un set de dico.keys() keys_pool = set(before).union(set(after)) diff = {} for key in keys_pool: if before.has_key(key): if not isinstance(before[key], list): blist = [before[key]] else: blist = list(before[key]) else: blist = [] if after.has_key(key): if not isinstance(after[key], list): alist = [after[key]] else: alist = list(after[key]) else: alist = [] moins, plus = compare_lists(blist, alist) if moins != [] or plus != []: diff[key] = (moins, plus) return diff def compare_lists(list1, list2): """Compare deux listes, retourne deux listes, une avec les données perdues, et une avec les données apparues Insensible à la casse. """ moins, plus = [], [] for elem in [] + list1: try: ind = list2.index(elem.lower()) except ValueError: moins.append(elem) continue list1.remove(elem) list2.pop(ind) plus = plus + list2 return moins, plus class Event(BasicService): """Event service class. It extends BasicService, but should not implement any change trigger, since it's this service which is designed to call change triggers of other services. """ @classmethod def get_changes(cls, body, diff): """Compute changes from diff""" return [None] @classmethod def regen(cls, body=()): """When any event arrives on trigger-civet-event, this method is called and designed to transcript the body (ldap data) in something usable for the services. Afterwards, it sends these transcripts on the good way using routing_key. body is a 3-tuple, containing LDAP dn, the former state of the object (a simple dict), and the later state. The data are non-binding-dependant. A new object has body[1] to None, a deleted one has body[2] to None. """ logger.info("Received message %r…", body) diff = diff_o_matic(body) # Now, diff is a dict containing attributes which has been modified. # diff['macAddress'] could look like (['aa:bb:cc:dd:ee:fg'], ['aa:bb:cc:dd:ee:ff']), # where the list on the left is the former value of attributes, and the list on the # right the latter values. # -*- Explain -*- #In [11]: import itertools # #In [12]: a = [[(3, 'lol'), ('7', 3)], [(5, 6), None], [None], [('lol', 'lal')]] # #In [13]: a #Out[13]: [[(3, 'lol'), ('7', 3)], [(5, 6), None], [None], [('lol', 'lal')]] # #In [14]: list(set([message for message in itertools.chain(*a)])) #Out[14]: [('7', 3), (5, 6), None, ('lol', 'lal'), (3, 'lol')] # Only one None from a, since [None, x, y, None] is equivalent for itertools to [x, y] # #In [15]: b = list(set([message for message in itertools.chain(*a) if message is not None])) # #In [16]: b #Out[16]: [('7', 3), (5, 6), ('lol', 'lal'), (3, 'lol')] msg_to_send = [message for message in itertools.chain(*[service.get_changes(body, diff) for service in TriggerFactory.get_services()]) if message is not None] for msg in msg_to_send: logger.info("Sending %r on the road \\o/", msg) # XXX - uncomment this when in production # trigger_send(*msg) def trigger_send(routing_key, body): sender = EventProducer("civet") sender.send_message("trigger.%s" % (routing_key,), body)