diff --git a/cmb/__init__.py b/cmb/__init__.py index 119d52ee..f9729d4d 100644 --- a/cmb/__init__.py +++ b/cmb/__init__.py @@ -1 +1,2 @@ from consumer import AsynchronousConsumer +from producer import BasicProducer diff --git a/cmb/producer.py b/cmb/producer.py new file mode 100644 index 00000000..d0cffcff --- /dev/null +++ b/cmb/producer.py @@ -0,0 +1,79 @@ +#!/bin/bash /usr/scripts/python.sh +# -*- coding: utf-8 -*- +# +# This code is a Crans production, based on pika documentation examples +# and on the RabbitMQ tutorials. +# +# This code is placed under the terms of GNU General Public License v3. +# +# Author : Pierre-Elliott Bécue +# Date : 18/05/2014 +# License : GPLv3 + +import sys +import pika +import cranslib.clogger as clogger + +logger = clogger.CLogger(sys.argv[0].split("/")[-1].replace(".py", ""), "info") + +class BasicProducer(object): + """ + This is CMB basic producer, it doesn't have to be asynchronous or anything, + it's just a basic object that sends messages. + + """ + + def __init__(self, url, exchange_name, app_id): + """Init + + """ + + self._connection = None + self._channel = None + self._exchange_name = exchange_name + self._app_id = app_id + self._url = url + logger.info("Initializing with app_id %s" % (self._app_id,)) + + def connect(self): + """Opens a basic connection which handles almost anything we need. + + """ + logger.info("Connecting to %s…" % (self._url)) + return pika.BlockingConnection(pika.ConnectionParameters(self._url)) + + def get_chan(self): + """Creates a channel and reopens connection if needed.""" + try: + logger.info("Opening channel…") + self._channel = self._connection.channel() + logger.info("Channel active.") + except AttributeError: + logger.warning("Connection no longer working, reconnecting…") + self._connection = self.connect() + self.get_chan() + + def send_message(self, routing_key, body): + """Sends basic message with app_id and body + + """ + try: + logger.info("Sending message %s with routing_key %s." % (body, routing_key)) + self._channel.basic_publish(exchange=self._exchange_name, + routing_key=routing_key, + body=body, + properties=pika.BasicProperties( + delivery_mode=2, + app_id=self._app_id, + )) + except: + print "Failure on cmb.producer" + raise + + def close(self): + """Closes the connection + + """ + + self._channel.close() + self._connection.close() diff --git a/gestion/trigger/event.py b/gestion/trigger/event.py new file mode 100644 index 00000000..8405d7d3 --- /dev/null +++ b/gestion/trigger/event.py @@ -0,0 +1,50 @@ +#!/bin/bash /usr/scripts/python.sh +# -*- coding: utf-8 -*- +# +# Trigger library, designed to send events messages. +# +# Author : Pierre-Elliott Bécue +# License : GPLv3 +# Date : 18/05/2014 + +import cmb +import cPickle +import gestion.config.trigger as trigger_config +import cranslib.clogger as clogger +import pika + +logger = clogger.CLogger("trigger", "info") + +class Event(cmb.BasicProducer): + """ + Event tracker + """ + def __init__(self, app_id): + """Extended + + """ + logger.info("Starting trigger Event program…") + super(Event, self).__init__(trigger_config.master, 'trigger.event', app_id) + self._connection = self.connect() + self.get_chan() + + def send_message(self, routing_key, body): + """Sends basic message with app_id and body + + """ + try: + logger.info("Sending message %s with routing_key %s.", body, routing_key) + body = cPickle.dumps(body) + self._channel.basic_publish(exchange=self._exchange_name, + routing_key=routing_key, + body=body, + properties=pika.BasicProperties( + delivery_mode=2, + app_id=self._app_id, + )) + except: + print "Failure in trigger.event" + raise + + def announce(self, body): + self.send_message("trigger.announce", body) diff --git a/gestion/trigger/hosts/civet.py b/gestion/trigger/hosts/civet.py new file mode 100644 index 00000000..736b2b7e --- /dev/null +++ b/gestion/trigger/hosts/civet.py @@ -0,0 +1,5 @@ +#!/bin/bash /usr/scripts/python.sh +# -*- coding: utf-8 -*- + +from gestion.trigger.civet import civet +from gestion.trigger.host import trigger diff --git a/gestion/trigger/readme b/gestion/trigger/readme new file mode 100644 index 00000000..0ff44577 --- /dev/null +++ b/gestion/trigger/readme @@ -0,0 +1,4 @@ +Documentation succincte de trigger +================================== + + diff --git a/gestion/trigger/transcriptor.py b/gestion/trigger/transcriptor.py new file mode 100644 index 00000000..f049f4e9 --- /dev/null +++ b/gestion/trigger/transcriptor.py @@ -0,0 +1,80 @@ +#!/bin/bash /usr/scripts/python.sh +# -*- coding: utf-8 -*- +# +# Author : Pierre-Elliott Bécue +# License : GPLv3 +# Date : 29/04/2014 + +import argparse +import cranslib.clogger as clogger +import cmb +import cPickle +import socket +import gestion.config.trigger as trigger_config +import gestion.affichage as affichage +import sys + +logger = clogger.CLogger("transcriptor", "info") + +def process_event(body): + """ + Makes the transcription between the ldap modlist generated + by lc_ldap and sent to the event queue and the message one + have to send to the services. + + Example : + {} + + """ + +class EvenementListener(cmb.AsynchronousConsumer): + """ + Gestionnaire d'événement + """ + + def on_message(self, channel, basic_deliver, properties, body): + """Invoked by pika when a message is delivered from RabbitMQ. The + channel is passed for your convenience. The basic_deliver object that + is passed in carries the exchange, routing key, delivery tag and + a redelivered flag for the message. The properties passed in is an + instance of BasicProperties with the message properties and the body + is the message that was sent. + + :param pika.channel.Channel channel: The channel object + :param pika.Spec.Basic.Deliver: basic_deliver method + :param pika.Spec.BasicProperties: properties + :param str|unicode body: The message body + + """ + origin = properties.app_id + message_id = properties.message_id + logger.info('Received message # %s from %s: %s', + basic_deliver.delivery_tag, properties.app_id, body) + body = cPickle.loads(body) + process_event(body) + self.acknowledge_message(basic_deliver.delivery_tag) + + def run(self): + """Run the example consumer by connecting to RabbitMQ and then + starting the IOLoop to block and allow the SelectConnection to operate. + + """ + logger.info("""Crans Message Broker ++--------------------------------------------+ +| Welcome on Transcriptor | ++--------------------------------------------+""") + self._connection = self.connect() + self.add_queue("trigger-event", "trigger.event") + self._connection.ioloop.start() + +def daemonize(): + listener = EvenementListener(trigger_config.master, "trigger", "topic") + try: + listener.run() + except KeyboardInterrupt: + logger.warning("Caught SIGINT, will now go for shutdown.") + listener.stop() + +if __name__ == '__main__': + # Daemonize the trigger app, in order to listen and execute commands from civet. + daemonize()