[trigger] On commence un début de gestionnaire d'événements.
* À terme, il sera ajouté aux bindings, dans l'objectif de gérer les envois de modifications sans que ceux-ci n'aient à implémenter la moindre autre chose qu'un producer standard qui balance des diff d'objets.
This commit is contained in:
parent
9c81aa2a23
commit
9cac0c2531
6 changed files with 219 additions and 0 deletions
|
@ -1 +1,2 @@
|
||||||
from consumer import AsynchronousConsumer
|
from consumer import AsynchronousConsumer
|
||||||
|
from producer import BasicProducer
|
||||||
|
|
79
cmb/producer.py
Normal file
79
cmb/producer.py
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
#!/bin/bash /usr/scripts/python.sh
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# This code is a Crans production, based on pika documentation examples
|
||||||
|
# and on the RabbitMQ tutorials.
|
||||||
|
#
|
||||||
|
# This code is placed under the terms of GNU General Public License v3.
|
||||||
|
#
|
||||||
|
# Author : Pierre-Elliott Bécue <becue@crans.org>
|
||||||
|
# Date : 18/05/2014
|
||||||
|
# License : GPLv3
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import pika
|
||||||
|
import cranslib.clogger as clogger
|
||||||
|
|
||||||
|
logger = clogger.CLogger(sys.argv[0].split("/")[-1].replace(".py", ""), "info")
|
||||||
|
|
||||||
|
class BasicProducer(object):
|
||||||
|
"""
|
||||||
|
This is CMB basic producer, it doesn't have to be asynchronous or anything,
|
||||||
|
it's just a basic object that sends messages.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, url, exchange_name, app_id):
|
||||||
|
"""Init
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
self._connection = None
|
||||||
|
self._channel = None
|
||||||
|
self._exchange_name = exchange_name
|
||||||
|
self._app_id = app_id
|
||||||
|
self._url = url
|
||||||
|
logger.info("Initializing with app_id %s" % (self._app_id,))
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
"""Opens a basic connection which handles almost anything we need.
|
||||||
|
|
||||||
|
"""
|
||||||
|
logger.info("Connecting to %s…" % (self._url))
|
||||||
|
return pika.BlockingConnection(pika.ConnectionParameters(self._url))
|
||||||
|
|
||||||
|
def get_chan(self):
|
||||||
|
"""Creates a channel and reopens connection if needed."""
|
||||||
|
try:
|
||||||
|
logger.info("Opening channel…")
|
||||||
|
self._channel = self._connection.channel()
|
||||||
|
logger.info("Channel active.")
|
||||||
|
except AttributeError:
|
||||||
|
logger.warning("Connection no longer working, reconnecting…")
|
||||||
|
self._connection = self.connect()
|
||||||
|
self.get_chan()
|
||||||
|
|
||||||
|
def send_message(self, routing_key, body):
|
||||||
|
"""Sends basic message with app_id and body
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
logger.info("Sending message %s with routing_key %s." % (body, routing_key))
|
||||||
|
self._channel.basic_publish(exchange=self._exchange_name,
|
||||||
|
routing_key=routing_key,
|
||||||
|
body=body,
|
||||||
|
properties=pika.BasicProperties(
|
||||||
|
delivery_mode=2,
|
||||||
|
app_id=self._app_id,
|
||||||
|
))
|
||||||
|
except:
|
||||||
|
print "Failure on cmb.producer"
|
||||||
|
raise
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""Closes the connection
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
self._channel.close()
|
||||||
|
self._connection.close()
|
50
gestion/trigger/event.py
Normal file
50
gestion/trigger/event.py
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
#!/bin/bash /usr/scripts/python.sh
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# Trigger library, designed to send events messages.
|
||||||
|
#
|
||||||
|
# Author : Pierre-Elliott Bécue <becue@crans.org>
|
||||||
|
# License : GPLv3
|
||||||
|
# Date : 18/05/2014
|
||||||
|
|
||||||
|
import cmb
|
||||||
|
import cPickle
|
||||||
|
import gestion.config.trigger as trigger_config
|
||||||
|
import cranslib.clogger as clogger
|
||||||
|
import pika
|
||||||
|
|
||||||
|
logger = clogger.CLogger("trigger", "info")
|
||||||
|
|
||||||
|
class Event(cmb.BasicProducer):
|
||||||
|
"""
|
||||||
|
Event tracker
|
||||||
|
"""
|
||||||
|
def __init__(self, app_id):
|
||||||
|
"""Extended
|
||||||
|
|
||||||
|
"""
|
||||||
|
logger.info("Starting trigger Event program…")
|
||||||
|
super(Event, self).__init__(trigger_config.master, 'trigger.event', app_id)
|
||||||
|
self._connection = self.connect()
|
||||||
|
self.get_chan()
|
||||||
|
|
||||||
|
def send_message(self, routing_key, body):
|
||||||
|
"""Sends basic message with app_id and body
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
logger.info("Sending message %s with routing_key %s.", body, routing_key)
|
||||||
|
body = cPickle.dumps(body)
|
||||||
|
self._channel.basic_publish(exchange=self._exchange_name,
|
||||||
|
routing_key=routing_key,
|
||||||
|
body=body,
|
||||||
|
properties=pika.BasicProperties(
|
||||||
|
delivery_mode=2,
|
||||||
|
app_id=self._app_id,
|
||||||
|
))
|
||||||
|
except:
|
||||||
|
print "Failure in trigger.event"
|
||||||
|
raise
|
||||||
|
|
||||||
|
def announce(self, body):
|
||||||
|
self.send_message("trigger.announce", body)
|
5
gestion/trigger/hosts/civet.py
Normal file
5
gestion/trigger/hosts/civet.py
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
#!/bin/bash /usr/scripts/python.sh
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from gestion.trigger.civet import civet
|
||||||
|
from gestion.trigger.host import trigger
|
4
gestion/trigger/readme
Normal file
4
gestion/trigger/readme
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
Documentation succincte de trigger
|
||||||
|
==================================
|
||||||
|
|
||||||
|
|
80
gestion/trigger/transcriptor.py
Normal file
80
gestion/trigger/transcriptor.py
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
#!/bin/bash /usr/scripts/python.sh
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# Author : Pierre-Elliott Bécue <becue@crans.org>
|
||||||
|
# License : GPLv3
|
||||||
|
# Date : 29/04/2014
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import cranslib.clogger as clogger
|
||||||
|
import cmb
|
||||||
|
import cPickle
|
||||||
|
import socket
|
||||||
|
import gestion.config.trigger as trigger_config
|
||||||
|
import gestion.affichage as affichage
|
||||||
|
import sys
|
||||||
|
|
||||||
|
logger = clogger.CLogger("transcriptor", "info")
|
||||||
|
|
||||||
|
def process_event(body):
|
||||||
|
"""
|
||||||
|
Makes the transcription between the ldap modlist generated
|
||||||
|
by lc_ldap and sent to the event queue and the message one
|
||||||
|
have to send to the services.
|
||||||
|
|
||||||
|
Example :
|
||||||
|
{}
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
class EvenementListener(cmb.AsynchronousConsumer):
|
||||||
|
"""
|
||||||
|
Gestionnaire d'événement
|
||||||
|
"""
|
||||||
|
|
||||||
|
def on_message(self, channel, basic_deliver, properties, body):
|
||||||
|
"""Invoked by pika when a message is delivered from RabbitMQ. The
|
||||||
|
channel is passed for your convenience. The basic_deliver object that
|
||||||
|
is passed in carries the exchange, routing key, delivery tag and
|
||||||
|
a redelivered flag for the message. The properties passed in is an
|
||||||
|
instance of BasicProperties with the message properties and the body
|
||||||
|
is the message that was sent.
|
||||||
|
|
||||||
|
:param pika.channel.Channel channel: The channel object
|
||||||
|
:param pika.Spec.Basic.Deliver: basic_deliver method
|
||||||
|
:param pika.Spec.BasicProperties: properties
|
||||||
|
:param str|unicode body: The message body
|
||||||
|
|
||||||
|
"""
|
||||||
|
origin = properties.app_id
|
||||||
|
message_id = properties.message_id
|
||||||
|
logger.info('Received message # %s from %s: %s',
|
||||||
|
basic_deliver.delivery_tag, properties.app_id, body)
|
||||||
|
body = cPickle.loads(body)
|
||||||
|
process_event(body)
|
||||||
|
self.acknowledge_message(basic_deliver.delivery_tag)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
"""Run the example consumer by connecting to RabbitMQ and then
|
||||||
|
starting the IOLoop to block and allow the SelectConnection to operate.
|
||||||
|
|
||||||
|
"""
|
||||||
|
logger.info("""Crans Message Broker
|
||||||
|
+--------------------------------------------+
|
||||||
|
| Welcome on Transcriptor |
|
||||||
|
+--------------------------------------------+""")
|
||||||
|
self._connection = self.connect()
|
||||||
|
self.add_queue("trigger-event", "trigger.event")
|
||||||
|
self._connection.ioloop.start()
|
||||||
|
|
||||||
|
def daemonize():
|
||||||
|
listener = EvenementListener(trigger_config.master, "trigger", "topic")
|
||||||
|
try:
|
||||||
|
listener.run()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.warning("Caught SIGINT, will now go for shutdown.")
|
||||||
|
listener.stop()
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# Daemonize the trigger app, in order to listen and execute commands from civet.
|
||||||
|
daemonize()
|
Loading…
Add table
Add a link
Reference in a new issue