On sépare le producteur de messages des services.
This commit is contained in:
parent
3a628e9f53
commit
7d51242493
2 changed files with 77 additions and 48 deletions
63
gestion/trigger/producer.py
Normal file
63
gestion/trigger/producer.py
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# Event producer for trigger lib
|
||||||
|
#
|
||||||
|
# Author : Pierre-Elliott Bécue <becue@crans.org>
|
||||||
|
# License : GPLv3
|
||||||
|
# Date : 18/05/2014
|
||||||
|
|
||||||
|
"""
|
||||||
|
This is the message producer. It's standalone lib.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import cmb
|
||||||
|
import cPickle
|
||||||
|
import pika
|
||||||
|
|
||||||
|
# Clogger
|
||||||
|
import cranslib.clogger as clogger
|
||||||
|
|
||||||
|
# Trigger features
|
||||||
|
import gestion.config.trigger as trigger_config
|
||||||
|
|
||||||
|
logger = clogger.CLogger("trigger", "event", trigger_config.log_level, trigger_config.debug)
|
||||||
|
|
||||||
|
class EventProducer(cmb.BasicProducer):
|
||||||
|
"""
|
||||||
|
EventProducer tracker
|
||||||
|
"""
|
||||||
|
def __init__(self, app_id):
|
||||||
|
"""Extended
|
||||||
|
|
||||||
|
"""
|
||||||
|
logger.info("Starting trigger EventProducer program for app %s…", app_id)
|
||||||
|
trigger_password = secrets.get('rabbitmq_trigger_password')
|
||||||
|
credentials = pika.PlainCredentials(trigger_config.user, trigger_password)
|
||||||
|
super(EventProducer, self).__init__(url=trigger_config.master, exchange_name="trigger", app_id=app_id, port=trigger_config.port, credentials=credentials, ssl=trigger_config.ssl)
|
||||||
|
self._connection = self.connect()
|
||||||
|
self.get_chan()
|
||||||
|
|
||||||
|
def send_message(self, routing_key, body):
|
||||||
|
"""Sends basic message with app_id and body
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
logger.info("Sending message %s with routing_key %s.", body, routing_key)
|
||||||
|
body = cPickle.dumps(body)
|
||||||
|
self._channel.basic_publish(exchange=self._exchange_name,
|
||||||
|
routing_key=routing_key,
|
||||||
|
body=body,
|
||||||
|
properties=pika.BasicProperties(
|
||||||
|
delivery_mode=2,
|
||||||
|
app_id=self._app_id,
|
||||||
|
))
|
||||||
|
except:
|
||||||
|
print "Failure in trigger.event"
|
||||||
|
raise
|
||||||
|
|
||||||
|
def announce(self, body):
|
||||||
|
"""Feature to send message without giving routing_key
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.send_message("trigger.event", body)
|
|
@ -13,9 +13,6 @@ database, and to make a correct diff between former and later object in order
|
||||||
to guess which services has to be updated.
|
to guess which services has to be updated.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import cmb
|
|
||||||
import cPickle
|
|
||||||
import pika
|
|
||||||
import importlib
|
import importlib
|
||||||
import itertools
|
import itertools
|
||||||
import traceback
|
import traceback
|
||||||
|
@ -24,7 +21,8 @@ import gestion.secrets_new as secrets
|
||||||
|
|
||||||
# Trigger features
|
# Trigger features
|
||||||
import gestion.config.trigger as trigger_config
|
import gestion.config.trigger as trigger_config
|
||||||
from gestion.trigger.host import TriggerFactory, record_service, record_parser
|
from gestion.trigger.host import TriggerFactory, record_service
|
||||||
|
from gestion.trigger.producer import EventProducer
|
||||||
|
|
||||||
# Clogger
|
# Clogger
|
||||||
import cranslib.clogger as clogger
|
import cranslib.clogger as clogger
|
||||||
|
@ -41,45 +39,6 @@ for config_service in trigger_config.all_services:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical("Fatal : import of %s failed, see following traceback. %s", config_service, traceback.format_exc())
|
logger.critical("Fatal : import of %s failed, see following traceback. %s", config_service, traceback.format_exc())
|
||||||
|
|
||||||
class EventProducer(cmb.BasicProducer):
|
|
||||||
"""
|
|
||||||
EventProducer tracker
|
|
||||||
"""
|
|
||||||
def __init__(self, app_id):
|
|
||||||
"""Extended
|
|
||||||
|
|
||||||
"""
|
|
||||||
logger.info("Starting trigger EventProducer program for app %s…", app_id)
|
|
||||||
trigger_password = secrets.get('rabbitmq_trigger_password')
|
|
||||||
credentials = pika.PlainCredentials(trigger_config.user, trigger_password)
|
|
||||||
super(EventProducer, self).__init__(url=trigger_config.master, exchange_name="trigger", app_id=app_id, port=trigger_config.port, credentials=credentials, ssl=trigger_config.ssl)
|
|
||||||
self._connection = self.connect()
|
|
||||||
self.get_chan()
|
|
||||||
|
|
||||||
def send_message(self, routing_key, body):
|
|
||||||
"""Sends basic message with app_id and body
|
|
||||||
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
logger.info("Sending message %s with routing_key %s.", body, routing_key)
|
|
||||||
body = cPickle.dumps(body)
|
|
||||||
self._channel.basic_publish(exchange=self._exchange_name,
|
|
||||||
routing_key=routing_key,
|
|
||||||
body=body,
|
|
||||||
properties=pika.BasicProperties(
|
|
||||||
delivery_mode=2,
|
|
||||||
app_id=self._app_id,
|
|
||||||
))
|
|
||||||
except:
|
|
||||||
print "Failure in trigger.event"
|
|
||||||
raise
|
|
||||||
|
|
||||||
def announce(self, body):
|
|
||||||
"""Feature to send message without giving routing_key
|
|
||||||
|
|
||||||
"""
|
|
||||||
self.send_message("trigger.event", body)
|
|
||||||
|
|
||||||
def diff_o_matic(body=()):
|
def diff_o_matic(body=()):
|
||||||
"""Fait un diff exhaustif des deux dicos"""
|
"""Fait un diff exhaustif des deux dicos"""
|
||||||
|
|
||||||
|
@ -144,8 +103,10 @@ def event(body=()):
|
||||||
the services. Afterwards, it sends these transcripts on the good way
|
the services. Afterwards, it sends these transcripts on the good way
|
||||||
using routing_key.
|
using routing_key.
|
||||||
|
|
||||||
body is a 3-tuple, containing LDAP dn, the former state of the object
|
body is a 5-tuple, containing timestamp, the former state of the object
|
||||||
(a simple dict), and the later state. The data are non-binding-dependant.
|
(a simple dict), and the later state, a dict with additionnal (but
|
||||||
|
non-LDAP) data and a dict of step indicators (an int). The data are
|
||||||
|
non-binding-dependant.
|
||||||
|
|
||||||
A new object has body[1] to None, a deleted one has body[2] to None.
|
A new object has body[1] to None, a deleted one has body[2] to None.
|
||||||
|
|
||||||
|
@ -176,13 +137,18 @@ def event(body=()):
|
||||||
#In [16]: b
|
#In [16]: b
|
||||||
#Out[16]: [('7', 3), (5, 6), ('lol', 'lal'), (3, 'lol')]
|
#Out[16]: [('7', 3), (5, 6), ('lol', 'lal'), (3, 'lol')]
|
||||||
functions = list(set([function for function in itertools.chain(*[TriggerFactory.get_parser(key) for key in diff]) if function is not None]))
|
functions = list(set([function for function in itertools.chain(*[TriggerFactory.get_parser(key) for key in diff]) if function is not None]))
|
||||||
msg_to_send = [function(body, diff) for function in functions]
|
|
||||||
|
# Compute the whole list of messages. This returns a list of 2-tuples. We remove None messages, which
|
||||||
|
# may occur, since there is chained-services.
|
||||||
|
msg_to_send = [msg for msg in [function(body, diff) for function in functions] if msg is not None]
|
||||||
|
|
||||||
for msg in msg_to_send:
|
for msg in msg_to_send:
|
||||||
logger.info("Sending %r on the road \\o/", msg)
|
logger.info("Sending %r on the road \\o/", msg)
|
||||||
# XXX - uncomment this when in production
|
# XXX - uncomment this when in production
|
||||||
trigger_send(*msg)
|
trigger_send(*msg)
|
||||||
|
|
||||||
def trigger_send(routing_key, body):
|
def trigger_send(routing_key, body, orig=None):
|
||||||
sender = EventProducer("civet")
|
sender = EventProducer("trigger.civet")
|
||||||
|
if orig is not None:
|
||||||
|
body = (body, orig)
|
||||||
sender.send_message("trigger.%s" % (routing_key,), body)
|
sender.send_message("trigger.%s" % (routing_key,), body)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue