Implémentation d'un gestionnaire d'événements sommaire.
This commit is contained in:
parent
6c97d6998f
commit
201377528c
12 changed files with 411 additions and 239 deletions
|
@ -5,7 +5,7 @@
|
|||
#
|
||||
# Author : Pierre-Elliott Bécue <becue@crans.org>
|
||||
# License : GPLv3
|
||||
# Date : 18/05/2014
|
||||
# Date : 10/03/2015
|
||||
|
||||
"""
|
||||
This service (event) is designed to receive any modification done on LDAP
|
||||
|
@ -17,8 +17,6 @@ import importlib
|
|||
import itertools
|
||||
import traceback
|
||||
|
||||
import gestion.secrets_new as secrets
|
||||
|
||||
# Trigger features
|
||||
import gestion.config.trigger as trigger_config
|
||||
from gestion.trigger.host import TriggerFactory, record_service
|
||||
|
@ -27,26 +25,145 @@ from gestion.trigger.producer import EventProducer
|
|||
# Clogger
|
||||
import cranslib.clogger as clogger
|
||||
|
||||
# lc_ldap
|
||||
import lc_ldap.attributs
|
||||
LOGGER = clogger.CLogger("trigger", "event", trigger_config.log_level, trigger_config.debug)
|
||||
|
||||
logger = clogger.CLogger("trigger", "event", trigger_config.log_level, trigger_config.debug)
|
||||
PRODUCER = EventProducer("trigger.civet")
|
||||
|
||||
services = []
|
||||
SERVICES = []
|
||||
for config_service in trigger_config.all_services:
|
||||
try:
|
||||
services.append(importlib.import_module("gestion.trigger.parsers.%s" % (config_service,)))
|
||||
except Exception as e:
|
||||
logger.critical("Fatal : import of %s failed, see following traceback. %s", config_service, traceback.format_exc())
|
||||
SERVICES.append(importlib.import_module("gestion.trigger.parsers.%s" % (config_service,)))
|
||||
except Exception:
|
||||
LOGGER.critical("Fatal : import of %r failed, see following traceback. %r", config_service, traceback.format_exc())
|
||||
|
||||
def diff_o_matic(body=()):
|
||||
class EventList(list):
|
||||
"""List which is designed to grow up when one try to acces an element out of
|
||||
range"""
|
||||
|
||||
def __fill(self, index):
|
||||
"""Fills the intermediates indexes if needed"""
|
||||
while len(self) <= index:
|
||||
self.append({})
|
||||
|
||||
def __getitem__(self, index):
|
||||
"""Gets the item after filling if needed"""
|
||||
self.__fill(index)
|
||||
return super(EventList, self).__getitem__(index)
|
||||
|
||||
def __setitem__(self, index, value):
|
||||
"""Sets the item after filling if needed"""
|
||||
self.__fill(index)
|
||||
return super(EventList, self).__setitem__(index, value)
|
||||
|
||||
class EventTracker(object):
|
||||
"""Stores events actions from event service. It allows to track all services
|
||||
regeneration, and to chain services execution when needed. To avoid data loss
|
||||
during process, the EventTracker duplicates its own data in a file.
|
||||
|
||||
This file will be synced, but, by default, RAM data is considered as the
|
||||
current state of the factory. A sanity check method allows to guess if the
|
||||
file should be loaded to RAM."""
|
||||
|
||||
event_chain = {}
|
||||
|
||||
@classmethod
|
||||
def record_event_to_chain(cls, ob_id, pos, service_name, service_data):
|
||||
"""Records a chain of events. args contains a tuple which arguments
|
||||
is a list of dicts. ob_id is a unique identifier of the current chain.
|
||||
|
||||
Each dicts points to a message to send independently via trigger.
|
||||
|
||||
args should look like :
|
||||
([("dhcp", {'update':...}, ob_id), (...., ob_id)], [...])"""
|
||||
|
||||
# If no entry, we create an EventList.
|
||||
if ob_id not in cls.event_chain:
|
||||
cls.event_chain[ob_id] = EventList()
|
||||
|
||||
# If service is already there, we are facing a double setting of service, which is not
|
||||
# normal.
|
||||
if service_name in cls.event_chain[ob_id][pos]:
|
||||
LOGGER.critical("[%r] Weird. event_chain[%r][%r][%r] set to %r, but asking me to set it to %r.", ob_id, ob_id, pos, service_name, cls.event_chain[ob_id][pos][service_name], service_data)
|
||||
else:
|
||||
LOGGER.debug("[%r] Adding %r to EventTracker.event_chain[%r][%r][%r].", ob_id, service_data, ob_id, pos, service_name)
|
||||
cls.event_chain[ob_id][pos][service_name] = service_data
|
||||
|
||||
@classmethod
|
||||
def check_empty(cls, ob_id):
|
||||
"""Checks if cls.event_chain[ob_id] is empty"""
|
||||
if ob_id not in cls.event_chain:
|
||||
LOGGER.debug("[%r] EventTracker.cls_event_chain free of %r.", ob_id, ob_id)
|
||||
return True
|
||||
|
||||
if len(cls.event_chain[ob_id]) == 0:
|
||||
cls.event_chain.pop(ob_id)
|
||||
LOGGER.debug("[%r] EventTracker.cls_event_chain free of %r.", ob_id, ob_id)
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def get_off_record(cls, ob_id):
|
||||
"""Expedits a formatted record"""
|
||||
# We will pop items from event_chain[ob_id]
|
||||
# untill we have a non-empty dict.
|
||||
if cls.check_empty(ob_id):
|
||||
return []
|
||||
|
||||
dico = False
|
||||
while not dico:
|
||||
if len(cls.event_chain[ob_id]) > 0:
|
||||
dico = cls.event_chain[ob_id][0]
|
||||
# Should not happen.
|
||||
if not dico:
|
||||
cls.event_chain[ob_id].pop(0)
|
||||
else:
|
||||
# If we are at the end of the list
|
||||
dico = True
|
||||
|
||||
# then, we have nothing to do.
|
||||
if dico == True:
|
||||
return []
|
||||
|
||||
if isinstance(bool, dico):
|
||||
dico = {}
|
||||
|
||||
return [
|
||||
(ob_id, service_name, service_data)
|
||||
for (service_name, service_data) in dico.iteritems()
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def ack(cls, ob_id, service_name):
|
||||
"""Removes service_name from the event_chain, since
|
||||
everything is ok."""
|
||||
|
||||
if cls.check_empty(ob_id):
|
||||
LOGGER.info("[%r] Ack for %r, but nothing to ack...", ob_id, service_name)
|
||||
return None
|
||||
|
||||
if service_name not in cls.event_chain[ob_id][0]:
|
||||
LOGGER.info("[%r] Ack for %r, but nothing in event_chain[%r][0]...", ob_id, service_name, ob_id)
|
||||
return None
|
||||
|
||||
# Remove the service_name from the dict.
|
||||
cls.event_chain[ob_id][0].pop(service_name)
|
||||
|
||||
# If dict is empty, we drop it.
|
||||
if not cls.event_chain[ob_id][0]:
|
||||
cls.event_chain[ob_id].pop(0)
|
||||
return None
|
||||
|
||||
# If the list is empty, we drop it.
|
||||
if not cls.event_chain[ob_id]:
|
||||
cls.event_chain.pop(ob_id)
|
||||
return None
|
||||
|
||||
return True
|
||||
|
||||
def diff_o_matic(before, after):
|
||||
"""Fait un diff exhaustif des deux dicos"""
|
||||
|
||||
if not body:
|
||||
raise ValueError("diff_o_matic received %r as an argument, which is unusable." % (body,))
|
||||
|
||||
before = dict(body[1]) or {}
|
||||
after = dict(body[2]) or {}
|
||||
if not before and not after:
|
||||
raise ValueError("diff_o_matic received %r as an argument, which is unusable." % ((before, after),))
|
||||
|
||||
# set(dico) retourne un set de dico.keys()
|
||||
keys_pool = set(before).union(set(after))
|
||||
|
@ -96,25 +213,28 @@ def compare_lists(list1, list2):
|
|||
return moins, plus
|
||||
|
||||
|
||||
@record_service
|
||||
def event(body=()):
|
||||
@record_service(ack=False)
|
||||
def event(ob_id, before, after, more):
|
||||
"""When any event arrives on trigger-civet-event, this method is called
|
||||
and designed to transcript the body (ldap data) in something usable for
|
||||
the services. Afterwards, it sends these transcripts on the good way
|
||||
using routing_key.
|
||||
|
||||
body is a 5-tuple, containing timestamp, the former state of the object
|
||||
body is a 4-tuple, containing hash, the former state of the object
|
||||
(a simple dict), and the later state, a dict with additionnal (but
|
||||
non-LDAP) data and a dict of step indicators (an int). The data are
|
||||
non-binding-dependant.
|
||||
non-LDAP) data. The data are non-binding-dependant.
|
||||
|
||||
A new object has body[1] to None, a deleted one has body[2] to None.
|
||||
|
||||
"""
|
||||
|
||||
logger.info("Received message %r…", body)
|
||||
LOGGER.info("[%r] Received message %r…", ob_id, (ob_id, before, after, more))
|
||||
|
||||
diff = diff_o_matic(body)
|
||||
# Hey, I'll follow you 'till your end.
|
||||
diff = diff_o_matic(before, after)
|
||||
|
||||
# Some debug if needed.
|
||||
LOGGER.debug("[%r] in service event, diff is %r.", ob_id, diff)
|
||||
|
||||
# Now, diff is a dict containing attributes which has been modified.
|
||||
# diff['macAddress'] could look like (['aa:bb:cc:dd:ee:fg'], ['aa:bb:cc:dd:ee:ff']),
|
||||
|
@ -140,15 +260,23 @@ def event(body=()):
|
|||
|
||||
# Compute the whole list of messages. This returns a list of 2-tuples. We remove None messages, which
|
||||
# may occur, since there is chained-services.
|
||||
msg_to_send = [msg for msg in [function(body, diff) for function in functions] if msg is not None]
|
||||
msgs_to_send = [msg for msg in [function(ob_id, (before, after), diff) for function in functions] if msg is not None]
|
||||
LOGGER.debug("[%r] in service event, messages are %r.", ob_id, msgs_to_send)
|
||||
|
||||
for msg in msg_to_send:
|
||||
logger.info("Sending %r on the road \\o/", msg)
|
||||
for msg in msgs_to_send:
|
||||
service_name, body, pos = msg[0], msg[1], msg[2]
|
||||
LOGGER.info("[%r] Adding %r on the EventTracker", ob_id, (pos, service_name, body))
|
||||
EventTracker.record_event_to_chain(ob_id, pos, service_name, body)
|
||||
|
||||
# Sends the first wave on the way.
|
||||
todo = EventTracker.get_off_record(ob_id)
|
||||
for msg in todo:
|
||||
LOGGER.info("Sending %r on the road \\o/", msg)
|
||||
# XXX - uncomment this when in production
|
||||
trigger_send(*msg)
|
||||
|
||||
def trigger_send(routing_key, body, orig=None):
|
||||
sender = EventProducer("trigger.civet")
|
||||
if orig is not None:
|
||||
body = (body, orig)
|
||||
sender.send_message("trigger.%s" % (routing_key,), body)
|
||||
def trigger_send(ob_id, routing_key, body):
|
||||
"""Sends a message via civet/trigger"""
|
||||
|
||||
body = tuple([ob_id] + [elem for elem in body])
|
||||
PRODUCER.send_message("trigger.%s" % (routing_key,), body)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue