Un event qui traîne depuis trop longtemps en RAM est détruit.

This commit is contained in:
Pierre-Elliott Bécue 2015-09-23 17:33:21 +02:00
parent de8832a3f9
commit 5271e51f30
2 changed files with 33 additions and 10 deletions

View file

@ -16,6 +16,12 @@ user = "trigger"
port = 5671 port = 5671
ssl = True ssl = True
# TTL en secondes pour les messages en attente.
# Une suite d'opérations a faire a un ob_id, qui est un hash.
# Quand cette suite traîne depuis trop longtemps en attente sans que rien
# ne se passe, on la jette.
MSG_TTL = 3600
# Liste des services associés aux hôtes # Liste des services associés aux hôtes
# useradd : Envoie le mail de bienvenue, et crée le home # useradd : Envoie le mail de bienvenue, et crée le home
# userdel : Détruit le home, déconnecte l'utilisateur sur zamok, détruit les indexes dovecot, désinscrit l'adresse crans des mailing listes associées # userdel : Détruit le home, déconnecte l'utilisateur sur zamok, détruit les indexes dovecot, désinscrit l'adresse crans des mailing listes associées

View file

@ -1,21 +1,22 @@
#!/bin/bash /usr/scripts/python.sh #!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
# Trigger library, designed to send events messages. # event.py, le service maître hébergé sur le serveur RabbitMQ
# #
# Author : Pierre-Elliott Bécue <becue@crans.org> # Author : Pierre-Elliott Bécue <becue@crans.org>
# License : GPLv3 # License : GPLv3
# Date : 10/03/2015 # Date : 10/03/2015
""" """
This service (event) is designed to receive any modification done on LDAP Quand une modification est effectuée sur la base LDAP, elle est envoyée
database, and to make a correct diff between former and later object in order au serveur RabbitMQ maître, qui la traite grâce à ce service. Il calcule
to guess which services has to be updated. un diff entre les anciennes et les nouvelles données, et crée des messages
à envoyer aux autres serveurs, qui les traiteront également.
""" """
import importlib import importlib
import itertools import itertools
import traceback import traceback
import time
# Trigger features # Trigger features
import gestion.config.trigger as trigger_config import gestion.config.trigger as trigger_config
@ -37,21 +38,23 @@ for config_service in trigger_config.all_services:
LOGGER.critical("Fatal : import of %r failed, see following traceback. %r", config_service, traceback.format_exc()) LOGGER.critical("Fatal : import of %r failed, see following traceback. %r", config_service, traceback.format_exc())
class EventList(list): class EventList(list):
"""List which is designed to grow up when one try to acces an element out of """Une liste tolérante aux accès "out of range".
range""" Elle remplit les données intermédiaires n'existant pas avec des
dict vides.
"""
def __fill(self, index): def __fill(self, index):
"""Fills the intermediates indexes if needed""" """Remplit les données intermédiaires entre len(self) et index."""
while len(self) <= index: while len(self) <= index:
self.append({}) self.append({})
def __getitem__(self, index): def __getitem__(self, index):
"""Gets the item after filling if needed""" """On remplit si besoin, puis on retourne l'item."""
self.__fill(index) self.__fill(index)
return super(EventList, self).__getitem__(index) return super(EventList, self).__getitem__(index)
def __setitem__(self, index, value): def __setitem__(self, index, value):
"""Sets the item after filling if needed""" """On remplit si besoin, puis on affecte l'item voulu."""
self.__fill(index) self.__fill(index)
return super(EventList, self).__setitem__(index, value) return super(EventList, self).__setitem__(index, value)
@ -65,6 +68,7 @@ class EventTracker(object):
file should be loaded to RAM.""" file should be loaded to RAM."""
event_chain = {} event_chain = {}
event_times = {}
@classmethod @classmethod
def record_event_to_chain(cls, ob_id, pos, service_name, service_data): def record_event_to_chain(cls, ob_id, pos, service_name, service_data):
@ -87,6 +91,7 @@ class EventTracker(object):
else: else:
LOGGER.debug("[%r] Adding %r to EventTracker.event_chain[%r][%r][%r].", ob_id, service_data, ob_id, pos, service_name) LOGGER.debug("[%r] Adding %r to EventTracker.event_chain[%r][%r][%r].", ob_id, service_data, ob_id, pos, service_name)
cls.event_chain[ob_id][pos][service_name] = service_data cls.event_chain[ob_id][pos][service_name] = service_data
cls.event_times[ob_id] = time.time()
@classmethod @classmethod
def check_empty(cls, ob_id): def check_empty(cls, ob_id):
@ -100,6 +105,15 @@ class EventTracker(object):
LOGGER.debug("[%r] EventTracker.cls_event_chain free of %r.", ob_id, ob_id) LOGGER.debug("[%r] EventTracker.cls_event_chain free of %r.", ob_id, ob_id)
return True return True
@classmethod
def flush_expired_records(cls):
"""Flushes all records older than trigger_config.MSG_TTL seconds"""
for ob_id in cls.event_chain:
if time.time() - cls.event_times[ob_id] > trigger_config.MSG_TTL:
cls.event_times.pop(ob_id)
cls.event_chain.pop(ob_id)
@classmethod @classmethod
def get_off_record(cls, ob_id): def get_off_record(cls, ob_id):
"""Expedits a formatted record""" """Expedits a formatted record"""
@ -108,6 +122,9 @@ class EventTracker(object):
if cls.check_empty(ob_id): if cls.check_empty(ob_id):
return [] return []
# We purge every expired record
cls.flush_expired_records()
dico = False dico = False
while not dico: while not dico:
if len(cls.event_chain[ob_id]) > 0: if len(cls.event_chain[ob_id]) > 0: