scripts/gestion/trigger/services/event.py

283 lines
9.9 KiB
Python

#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
#
# Trigger library, designed to send events messages.
#
# Author : Pierre-Elliott Bécue <becue@crans.org>
# License : GPLv3
# Date : 10/03/2015
"""
This service (event) is designed to receive every modification made to the LDAP
database, and to compute a correct diff between the former and the later state
of the object in order to work out which services have to be updated.
"""
import importlib
import itertools
import traceback
# Trigger features
import gestion.config.trigger as trigger_config
from gestion.trigger.host import TriggerFactory, record_service
from gestion.trigger.producer import EventProducer
# Clogger
import cranslib.clogger as clogger
# Module-wide logger, configured from the trigger configuration.
LOGGER = clogger.CLogger("trigger", "event", trigger_config.log_level, trigger_config.debug)

# Producer used by trigger_send() to emit messages.
PRODUCER = EventProducer("trigger.civet")

# Import the parser module of every configured service. A failing import is
# logged with its traceback and skipped, so the remaining services still load.
SERVICES = []
for config_service in trigger_config.all_services:
    try:
        parser_module = importlib.import_module("gestion.trigger.parsers.%s" % (config_service,))
    except Exception:
        LOGGER.critical("Fatal : import of %r failed, see following traceback. %r", config_service, traceback.format_exc())
    else:
        SERVICES.append(parser_module)
class EventList(list):
    """List that grows on demand when an index past its end is accessed.

    Reading or writing ``events[i]`` with an integer ``i >= len(events)`` first
    pads the list with fresh empty dicts up to position ``i``.  Negative
    indexes and slices never trigger padding and behave as on a plain list.
    """

    def __fill(self, index):
        """Pad the list with empty dicts so that position ``index`` exists.

        Only integer-like indexes are considered.  A slice (or any other
        non-integer object) is left to ``list`` itself: comparing it with
        ``len(self)`` would raise ``TypeError`` on Python 3 and could grow the
        list without bound under Python 2's mixed-type ordering.
        """
        to_position = getattr(index, "__index__", None)
        if to_position is None:
            return
        position = to_position()
        while len(self) <= position:
            # A new dict per slot: slots must never share the same object.
            self.append({})

    def __getitem__(self, index):
        """Return the item at ``index``, padding the list first if needed."""
        self.__fill(index)
        return super(EventList, self).__getitem__(index)

    def __setitem__(self, index, value):
        """Store ``value`` at ``index``, padding the list first if needed."""
        self.__fill(index)
        return super(EventList, self).__setitem__(index, value)
class EventTracker(object):
    """Tracks the service messages still pending for each object.

    ``event_chain`` is shared at class level.  It maps an ``ob_id`` to an
    EventList of "waves"; each wave is a dict ``{service_name: service_data}``.
    Only the first wave is handed out by get_off_record(); ack() removes
    services from it and drops the wave once it is empty, which exposes the
    next one.

    NOTE(review): earlier documentation of this class mentioned mirroring the
    data to a file; nothing in this class performs such persistence.
    """

    # ob_id -> EventList of {service_name: service_data} dicts, one per position.
    event_chain = {}

    @classmethod
    def record_event_to_chain(cls, ob_id, pos, service_name, service_data):
        """Register ``service_data`` for ``service_name`` at position ``pos``.

        ``ob_id`` identifies the chain; it is created on first use.  If the
        service is already registered at that position the existing data is
        kept and the conflict is logged as critical.
        """
        if ob_id not in cls.event_chain:
            cls.event_chain[ob_id] = EventList()

        # EventList pads itself, so any position can be addressed directly.
        wave = cls.event_chain[ob_id][pos]

        if service_name in wave:
            # Setting the same service twice at one position is not expected.
            LOGGER.critical("[%r] Weird. event_chain[%r][%r][%r] set to %r, but asking me to set it to %r.", ob_id, ob_id, pos, service_name, wave[service_name], service_data)
        else:
            LOGGER.debug("[%r] Adding %r to EventTracker.event_chain[%r][%r][%r].", ob_id, service_data, ob_id, pos, service_name)
            wave[service_name] = service_data

    @classmethod
    def check_empty(cls, ob_id):
        """Tell whether nothing is recorded for ``ob_id``.

        Returns True when ``ob_id`` is unknown or its chain is an empty list
        (the empty chain is removed on the way), False otherwise.
        """
        if ob_id in cls.event_chain:
            if cls.event_chain[ob_id]:
                return False
            cls.event_chain.pop(ob_id)
        LOGGER.debug("[%r] EventTracker.cls_event_chain free of %r.", ob_id, ob_id)
        return True

    @classmethod
    def get_off_record(cls, ob_id):
        """Return the messages of the first non-empty wave of ``ob_id``.

        The result is a list of ``(ob_id, service_name, service_data)`` tuples,
        or an empty list when nothing is pending.  Leading empty waves are
        discarded, and a chain left without any wave is removed entirely.
        """
        if cls.check_empty(ob_id):
            return []

        chain = cls.event_chain[ob_id]

        # Empty waves should not happen, but skip them if they do.
        while chain and not chain[0]:
            chain.pop(0)

        if not chain:
            # Only empty waves were stored: forget the chain altogether.
            cls.event_chain.pop(ob_id)
            return []

        return [
            (ob_id, service_name, service_data)
            for (service_name, service_data) in chain[0].items()
        ]

    @classmethod
    def ack(cls, ob_id, service_name):
        """Acknowledge ``service_name`` in the first wave of ``ob_id``.

        Returns True when the service was removed and other services are still
        pending in the same wave.  Returns None in every other case: nothing
        recorded for ``ob_id``, service absent from the first wave, or the wave
        became empty and was dropped.  When the dropped wave was the last one,
        the whole chain is removed from ``event_chain``.
        """
        if cls.check_empty(ob_id):
            LOGGER.info("[%r] Ack for %r, but nothing to ack...", ob_id, service_name)
            return None

        chain = cls.event_chain[ob_id]
        current_wave = chain[0]

        if service_name not in current_wave:
            LOGGER.info("[%r] Ack for %r, but nothing in event_chain[%r][0]...", ob_id, service_name, ob_id)
            return None

        current_wave.pop(service_name)
        if current_wave:
            return True

        # The wave is complete: drop it, and the chain too if it was the last.
        chain.pop(0)
        if not chain:
            cls.event_chain.pop(ob_id)
        return None
def _attribute_values(state, key):
    """Return the values stored under ``key`` in ``state`` as a new list."""
    if key not in state:
        return []
    value = state[key]
    if isinstance(value, list):
        return list(value)
    return [value]


def diff_o_matic(before, after):
    """Compute an exhaustive diff between two attribute dicts.

    ``before`` and ``after`` map attribute names to a value or a list of
    values.  Either one may be None or empty (object created or deleted); it
    is then treated as an empty dict.

    Returns a dict ``{key: (removed, added)}`` containing only the keys whose
    values differ, as computed by compare_lists().

    Raises ValueError when both states are empty or None.
    """
    if not before and not after:
        raise ValueError("diff_o_matic received %r as an argument, which is unusable." % ((before, after),))

    # A missing state (None) is diffed as if it had no attribute at all.
    before = before or {}
    after = after or {}

    diff = {}
    # Iterating a dict yields its keys, so this is the union of both key sets.
    for key in set(before).union(after):
        moins, plus = compare_lists(
            _attribute_values(before, key),
            _attribute_values(after, key),
        )
        if moins or plus:
            diff[key] = (moins, plus)
    return diff
def _case_insensitive_key(value):
    """Return ``value`` lower-cased when it supports it, unchanged otherwise."""
    lower = getattr(value, "lower", None)
    if callable(lower):
        return lower()
    return value


def compare_lists(list1, list2):
    """Compare two lists and return ``(removed, added)``.

    ``removed`` holds the elements of ``list1`` with no counterpart in
    ``list2``; ``added`` holds the elements of ``list2`` left unmatched.  Each
    element of ``list2`` can match at most one element of ``list1``.

    The comparison is case-insensitive on both sides for values that have a
    ``lower()`` method; other values are compared as they are.  The input
    lists are not modified.
    """
    added = list(list2)
    # Keys are kept in step with ``added`` so that a match found in one list
    # can be removed from the other at the same position.
    added_keys = [_case_insensitive_key(item) for item in added]

    removed = []
    for elem in list1:
        try:
            position = added_keys.index(_case_insensitive_key(elem))
        except ValueError:
            removed.append(elem)
            continue
        added.pop(position)
        added_keys.pop(position)
    return removed, added
@record_service(ack=False)
def event(ob_id, before, after, more):
    """Turn an object modification into service messages and send the first wave.

    ``ob_id`` identifies the modification, ``before`` and ``after`` are the
    former and later states of the object (simple dicts), and ``more`` carries
    additional data handed to the parsers as is.

    The states are diffed, the parsers registered for each modified attribute
    are called, the messages they return are recorded in the EventTracker,
    and whatever get_off_record() yields for ``ob_id`` is sent.
    """
    LOGGER.info("[%r] Received message %r", ob_id, (ob_id, before, after, more))

    # diff maps each modified attribute to (former values, latter values), e.g.
    # diff['macAddress'] == (['aa:bb:cc:dd:ee:fg'], ['aa:bb:cc:dd:ee:ff']).
    diff = diff_o_matic(before, after)
    LOGGER.debug("[%r] in service event, diff is %r.", ob_id, diff)

    # One group of parsers per modified attribute. The groups are flattened,
    # None entries are dropped and the set removes duplicates, so that each
    # parser runs once even when several of its attributes changed.
    parser_groups = [TriggerFactory.get_parser(key) for key in diff]
    functions = list(set(
        parser
        for parser in itertools.chain.from_iterable(parser_groups)
        if parser is not None
    ))
    LOGGER.debug("[%r] in service event, functions are %r.", ob_id, functions)

    # Run every parser and keep only the messages that are not None.
    msgs_to_send = []
    for function in functions:
        produced = function(ob_id, (before, after), diff, more)
        if produced is not None:
            msgs_to_send.append(produced)
    LOGGER.debug("[%r] in service event, messages are %r.", ob_id, msgs_to_send)

    # Each message starts with (service name, operations, position).
    for msg in msgs_to_send:
        service_name = msg[0]
        operations = msg[1]
        pos = msg[2]
        LOGGER.info("[%r] Adding %r on the EventTracker", ob_id, (pos, service_name, operations))
        EventTracker.record_event_to_chain(ob_id, pos, service_name, operations)

    # Send the first wave.
    for msg in EventTracker.get_off_record(ob_id):
        LOGGER.info("Sending %r on the road \\o/", msg)
        trigger_send(*msg)
def trigger_send(ob_id, routing_key, operations):
    """Send ``(ob_id, operations)`` through PRODUCER under ``trigger.<routing_key>``."""
    payload = (ob_id, operations)
    destination = "trigger.%s" % (routing_key,)
    PRODUCER.send_message(destination, payload)