diff --git a/gestion/trigger/producer.py b/gestion/trigger/producer.py new file mode 100644 index 00000000..df09489e --- /dev/null +++ b/gestion/trigger/producer.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Event producer for trigger lib +# +# Author : Pierre-Elliott Bécue +# License : GPLv3 +# Date : 18/05/2014 + +""" +This is the message producer. It's standalone lib. +""" + +import cmb +import cPickle +import pika + +# Clogger +import cranslib.clogger as clogger + +# Trigger features +import gestion.config.trigger as trigger_config + +logger = clogger.CLogger("trigger", "event", trigger_config.log_level, trigger_config.debug) + +class EventProducer(cmb.BasicProducer): + """ + EventProducer tracker + """ + def __init__(self, app_id): + """Extended + + """ + logger.info("Starting trigger EventProducer program for app %s…", app_id) + trigger_password = secrets.get('rabbitmq_trigger_password') + credentials = pika.PlainCredentials(trigger_config.user, trigger_password) + super(EventProducer, self).__init__(url=trigger_config.master, exchange_name="trigger", app_id=app_id, port=trigger_config.port, credentials=credentials, ssl=trigger_config.ssl) + self._connection = self.connect() + self.get_chan() + + def send_message(self, routing_key, body): + """Sends basic message with app_id and body + + """ + try: + logger.info("Sending message %s with routing_key %s.", body, routing_key) + body = cPickle.dumps(body) + self._channel.basic_publish(exchange=self._exchange_name, + routing_key=routing_key, + body=body, + properties=pika.BasicProperties( + delivery_mode=2, + app_id=self._app_id, + )) + except: + print "Failure in trigger.event" + raise + + def announce(self, body): + """Feature to send message without giving routing_key + + """ + self.send_message("trigger.event", body) diff --git a/gestion/trigger/services/event.py b/gestion/trigger/services/event.py index da20320b..0c8113b8 100644 --- a/gestion/trigger/services/event.py +++ b/gestion/trigger/services/event.py @@ -13,9 +13,6 @@ database, and to make a correct diff between former and later object in order to guess which services has to be updated. """ -import cmb -import cPickle -import pika import importlib import itertools import traceback @@ -24,7 +21,8 @@ import gestion.secrets_new as secrets # Trigger features import gestion.config.trigger as trigger_config -from gestion.trigger.host import TriggerFactory, record_service, record_parser +from gestion.trigger.host import TriggerFactory, record_service +from gestion.trigger.producer import EventProducer # Clogger import cranslib.clogger as clogger @@ -41,45 +39,6 @@ for config_service in trigger_config.all_services: except Exception as e: logger.critical("Fatal : import of %s failed, see following traceback. %s", config_service, traceback.format_exc()) -class EventProducer(cmb.BasicProducer): - """ - EventProducer tracker - """ - def __init__(self, app_id): - """Extended - - """ - logger.info("Starting trigger EventProducer program for app %s…", app_id) - trigger_password = secrets.get('rabbitmq_trigger_password') - credentials = pika.PlainCredentials(trigger_config.user, trigger_password) - super(EventProducer, self).__init__(url=trigger_config.master, exchange_name="trigger", app_id=app_id, port=trigger_config.port, credentials=credentials, ssl=trigger_config.ssl) - self._connection = self.connect() - self.get_chan() - - def send_message(self, routing_key, body): - """Sends basic message with app_id and body - - """ - try: - logger.info("Sending message %s with routing_key %s.", body, routing_key) - body = cPickle.dumps(body) - self._channel.basic_publish(exchange=self._exchange_name, - routing_key=routing_key, - body=body, - properties=pika.BasicProperties( - delivery_mode=2, - app_id=self._app_id, - )) - except: - print "Failure in trigger.event" - raise - - def announce(self, body): - """Feature to send message without giving routing_key - - """ - self.send_message("trigger.event", body) - def diff_o_matic(body=()): """Fait un diff exhaustif des deux dicos""" @@ -144,8 +103,10 @@ def event(body=()): the services. Afterwards, it sends these transcripts on the good way using routing_key. - body is a 3-tuple, containing LDAP dn, the former state of the object - (a simple dict), and the later state. The data are non-binding-dependant. + body is a 5-tuple, containing timestamp, the former state of the object + (a simple dict), and the later state, a dict with additionnal (but + non-LDAP) data and a dict of step indicators (an int). The data are + non-binding-dependant. A new object has body[1] to None, a deleted one has body[2] to None. @@ -176,13 +137,18 @@ def event(body=()): #In [16]: b #Out[16]: [('7', 3), (5, 6), ('lol', 'lal'), (3, 'lol')] functions = list(set([function for function in itertools.chain(*[TriggerFactory.get_parser(key) for key in diff]) if function is not None])) - msg_to_send = [function(body, diff) for function in functions] + + # Compute the whole list of messages. This returns a list of 2-tuples. We remove None messages, which + # may occur, since there is chained-services. + msg_to_send = [msg for msg in [function(body, diff) for function in functions] if msg is not None] for msg in msg_to_send: logger.info("Sending %r on the road \\o/", msg) # XXX - uncomment this when in production trigger_send(*msg) -def trigger_send(routing_key, body): - sender = EventProducer("civet") +def trigger_send(routing_key, body, orig=None): + sender = EventProducer("trigger.civet") + if orig is not None: + body = (body, orig) sender.send_message("trigger.%s" % (routing_key,), body)