scripts/gestion/dialog/lc.py

472 lines
22 KiB
Python

#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
u"""
Copyright (C) Valentin Samir
Licence : GPLv3
"""
import sys
import time
import ldap
import traceback
if '/usr/scripts' not in sys.path:
sys.path.append('/usr/scripts')
from pythondialog import Dialog
from pythondialog import error as DialogError
from gestion.affich_tools import get_screen_size, coul
import lc_ldap.shortcuts
import lc_ldap.objets as objets
import lc_ldap.attributs as attributs
import lc_ldap.printing as printing
import CPS
from CPS import TailCall, tailcaller, Continue, TailCaller
class Dialog(CPS.Dialog):
def __init__(self, debug_enable=False, ldap_test=False, custom_user=None):
super(Dialog, self).__init__(debug_enable=debug_enable)
# On initialise le moteur de rendu en spécifiant qu'on va faire du dialog
printing.template(dialog=True)
self.ldap_test = ldap_test
self.custom_user = custom_user
self.check_ldap()
def has_right(self, liste, obj=None):
"""Vérifie que l'un des droits de l'utilisateur courant est inclus dans list"""
if not isinstance(liste, list):
liste = [liste]
if obj:
droits = obj.rights()
else:
droits = self.conn.droits
for d in liste:
if d in droits:
return True
return False
def _connected_as(self):
ret = u"Vous êtes connecté en tant que %s" % self.conn.current_login
if self.ldap_test:
ret += u" sur la base de test"
return ret
def check_ldap(self):
"""Se connecte à la base ldap et vérifie les droits de l'utilisateur courant"""
self.check_ldap_last = time.time()
# S'il y a --test dans les argument, on utilise la base de test
if self.ldap_test:
self.conn = lc_ldap.shortcuts.lc_ldap_test()
else:
# On ouvre une connexion lc_ldap
self.conn = lc_ldap.shortcuts.lc_ldap_admin()
# On vérifie que l'utilisateur système existe dans ldap (pour la gestion des droits)
luser=self.conn.search(u'(&(uid=%s)(objectClass=cransAccount))' % self.conn.current_login)
if not luser:
sys.stderr.write("L'utilisateur %s n'existe pas dans la base de donnée" % self.conn.current_login)
sys.exit(1)
self.conn.droits = [str(d) for d in luser[0]['droits']]
self.conn.dn = luser[0].dn
# Si un nom d'utilisateur est donné sur la ligne de commande
# et qu'on a les droits nounou, on l'utilise
if self.custom_user and attributs.nounou in self.conn.droits:
luser=self.conn.search(u'(&(uid=%s)(objectClass=cransAccount))' % self.custom_user)
if luser:
self.conn.current_login = self.custom_user
self.conn.droits = [str(d) for d in luser[0]['droits']]
self.conn.dn = luser[0].dn
a = attributs
allowed_right = [a.cableur, a.tresorier, a.bureau, a.nounou, a.imprimeur]
for droit in allowed_right:
if droit in self.conn.droits:
break
else:
sys.stderr.write(
u"%s ne possède aucun des droits :\n * %s\nnécessaire à utiliser ce programme\n" % (
self.conn.current_login,
'\n * '.join(allowed_right)
)
)
sys.exit(1)
@property
def dialog(self):
"""
Renvois l'objet dialog. De plus renouvelle régulièrement la connexion à la base ldap
"""
# Tous les self.timeout, on refraichie la connexion ldap
if time.time() - self.check_ldap_last > self.timeout:
self.check_ldap()
return super(Dialog, self).dialog
def edit_boolean_attributs(self, obj, attribs, title, update_obj, cont, values={}):
"""
Permet d'éditer des attributs booléen de l'objet obj listé dans attribs.
update_obj est le nom du paramètre à mettre à jour dans cont pour passer l'objet modifié
"""
# Dictionnaire décrivant quelle est la valeur booléenne à donner à l'absence de l'attribut
missing = {
'default' : False, # par défaut, on dit que c'est False
attributs.dnsIpv6 : True # pour dnsIpv6, c'est True
}
choices = [(a.ldap_name, a.legend, 1 if values.get(a.ldap_name, obj[a.ldap_name][0] if obj[a.ldap_name] else missing.get(a, missing['default'])) else 0) for a in attribs]
def box():
return self.dialog.checklist("Activier ou désactiver les propriétés suivantes",
height=0,
width=0,
timeout=self.timeout,
list_height=7,
choices=choices,
title=title)
def todo(values, obj, attribs, cont):
# On met à jour chaque attribut si sa valeur à changé
with self.conn.search(dn=obj.dn, scope=0, mode='rw')[0] as obj:
for a in attribs:
if obj[a.ldap_name] and obj[a.ldap_name][0] != values[a.ldap_name]:
obj[a.ldap_name]=values[a.ldap_name]
elif not obj[a.ldap_name] and missing.get(a, missing['default']) != values[a.ldap_name]:
obj[a.ldap_name]=values[a.ldap_name]
obj.validate_changes()
obj.history_gen()
obj.save()
# On s'en va en mettant à jour dans la continuation la valeur de obj
raise Continue(cont(**{update_obj:obj}))
(code, output) = self.handle_dialog(cont, box)
# On transforme la liste des cases dialog cochée en dictionnnaire
values = dict((a.ldap_name, a.ldap_name in output) for a in attribs)
# Une continuation que l'on suivra si quelque chose se passe mal
retry_cont = TailCall(self.edit_boolean_attributs, obj=obj, update_obj=update_obj, attribs=attribs, title=title, cont=cont, values=values)
return self.handle_dialog_result(
code=code,
output=output,
cancel_cont=cont,
error_cont=retry_cont,
codes_todo=[([self.dialog.DIALOG_OK], todo, [values, obj, attribs, cont])]
)
def edit_attributs(self, obj, attr, title, update_obj, cont, tag=None, values=None):
"""
Permet d'éditer la liste d'attribut attr de l'objet obj.
update_obj est le nom du paramètre à mettre à jour dans cont pour passer l'objet modifié
"""
# Il n'y a pas inputmenu dans la lib dialog, du coup, on traite les arguments à la main.
# Ça reste relativement acceptable comme on utilise la fonction de la lib pour appeler dialog
def box(values, default_tag):
cmd = ['--inputmenu', "Édition de l'attribut %s :" % attr, "0", "0", "20"]
index=0
for value in values:
cmd.extend([str(index), str(value)])
index+=1
cmd.extend(['new', ''])
(code, output) = self.dialog._perform(*(cmd,), timeout=self.timeout, title=title, default_item=str(default_tag))
if code == self.dialog.DIALOG_EXTRA:
output = output.split(' ', 2)[1:]
else:
output = ''
return (code, output)
def todo_extra(output, values, retry_cont):
tag, value = output
if tag == 'new':
if value:
values.append(value)
elif value == '':
values.pop(int(tag))
else:
values[int(tag)] = value
raise Continue(retry_cont(values=values, tag=tag))
def todo(obj, values, cont):
with self.conn.search(dn=obj.dn, scope=0, mode='rw')[0] as obj:
obj[attr] = [unicode(value, 'utf-8') for value in values]
obj.validate_changes()
obj.history_gen()
obj.save()
raise Continue(cont(**{update_obj:obj}))
if values is None:
values = [str(a) for a in obj[attr]]
retry_cont = TailCall(self.edit_attributs, obj=obj, attr=attr, title=title, update_obj=update_obj, cont=cont, values=values)
(code, output) = self.handle_dialog(cont, box, values, tag)
return self.handle_dialog_result(
code=code,
output=output,
cancel_cont=cont,
error_cont=retry_cont,
codes_todo=[
([self.dialog.DIALOG_OK], todo, [obj, values, cont]),
([self.dialog.DIALOG_EXTRA], todo_extra, [output, values, retry_cont]),
]
)
def search(self, objectClassS, title, values={}, cont=None, disable_field=[]):
"""
Rechercher des adhérents ou des machines dans la base ldap
retourne le tuple (code de retour dialog, valeurs entrée par l'utilisateur, liste d'objets trouvés)
La fonction est découpé en trois partie :
* affichage dialog et récupération du résultat
* construction de filtres de recherche ldap et recherches ldap
* filtre sur les résultats des recherches ldap
"""
select_dict = {
#label attribut ldap search for substring param dialog: line col valeur input-line icol len max-chars
'Nom' : {'ldap':'nom', 'sub':True, 'params' : [ 1, 1, values.get('nom', ""), 1, 13, 20, 20]},
'Prénom' : {'ldap':'prenom', 'sub':True, 'params' : [ 2, 1, values.get('prenom', ""), 2, 13, 20, 20]},
'Téléphone' : {'ldap':'tel', 'sub':True, 'params' : [ 3, 1, values.get('tel', ""), 3, 13, 10, 00]},
'Chambre' : {'ldap':'chbre','sub':True, 'params' : [ 4, 1, values.get('chbre',""), 4, 13, 05, 00]},
'aid' : {'ldap' : 'aid', 'sub':False, 'params' : [ 5, 1, values.get('aid',""), 5, 13, 05, 05]},
'mail' : {'ldap' : 'mail', 'sub':True, 'params' : [ 6, 1, values.get('mail',""), 6, 13, 40, 00]},
# seconde colone
'Machine' : {'ldap' : '*', 'sub':True, 'params' : [1, 35, "", 1, 43, 0, 0]},
'Host' : {'ldap' : 'host', 'sub':True, 'params' : [2, 37, values.get('host',""), 2, 43, 17, 17]},
'Mac' : {'ldap' : 'macAddress', 'sub':False, 'params' : [3, 37, values.get('macAddress',""), 3, 43, 17, 17]},
'IP' : {'ldap' : 'ipHostNumber', 'sub':False,'params' : [4, 37, values.get('ipHostNumber',""), 4, 43, 15, 15]},
'mid' : {'ldap' : 'mid', 'sub':False, 'params' : [5, 37, values.get('mid',""), 5, 43, 5, 5]},
}
# On a besoin de l'ordre pour récupérer les valeurs ensuite
select_adherent = ['Nom', 'Prénom', 'Téléphone', 'Chambre', 'aid', 'mail']
select_machine = ['Host', 'Mac', 'IP', 'mid']
if 'club' in objectClassS and not 'adherent' in objectClassS:
select_dict['cid']=select_dict['aid']
select_dict['cid']['ldap']='cid'
select_dict['cid']['params'][2]=values.get('cid', "")
select_adherent[select_adherent.index('aid')]='cid'
def box():
# On met les argument à dialog à la main ici, sinon, c'est difficile de choisir comment mettre une seconde colone
cmd = ["--mixedform", "Entrez vos paramètres de recherche", '0', '0', '0']
for key in select_adherent:
cmd.extend(['%s :' % key] + [str(e) for e in select_dict[key]['params']] + ['2' if key in disable_field else '0'])
cmd.extend(['Machine :'] + [str(e) for e in select_dict['Machine']['params']] + ['2'])
for key in select_machine:
cmd.extend(['%s :' % key] + [str(e) for e in select_dict[key]['params']] + ['2' if key in disable_field else '0'])
cmd.extend(["Les champs vides sont ignorés.", '7', '1', "", '0', '0', '0', '0', '2' ])
# On utilise quand même la fonction de la bibliothèques pour passer les arguments
(code, output) = self.dialog._perform(*(cmd,), timeout=self.timeout, title=title, backtitle="Entrez vos paramètres de recherche")
if output:
return (code, output.split('\n')[:-1])
else: # empty selection
return (code, [])
(code, dialog_values) = self.handle_dialog(cont, box)
# Si il a appuyé sur annuler ou sur escape, on saute sur la continuation
if code in (self.dialog.DIALOG_CANCEL, self.dialog.DIALOG_ESC):
raise Continue(cont)
else:
# Transformation de la liste des valeures entrée en dictionnnaire
dialog_values = dict(zip(select_adherent + select_machine, dialog_values))
ldap_values = dict([(select_dict[key]['ldap'], value) for key, value in dialog_values.items()])
# Construction des filtres ldap pour les adhérents et les machines
filter_adherent = []
filter_machine = []
for (key, value) in dialog_values.items():
if value:
if key in select_adherent:
filter_adherent.append((u"(%s=*%s*)" if select_dict[key]['sub'] and not '*' in value else u"(%s=%s)") % (select_dict[key]['ldap'], unicode(value, 'utf-8')))
elif key in select_machine:
filter_machine.append((u"(%s=*%s*)" if select_dict[key]['sub'] and not '*' in value else u"(%s=%s)") % (select_dict[key]['ldap'], unicode(value, 'utf-8')))
if filter_adherent:
filter_adherent=u"(&%s)" % "".join(filter_adherent)
if filter_machine:
filter_machine=u"(&%s)" % "".join(filter_machine)
# Récupération des adhérents et des machines
adherents=self.conn.search(filter_adherent) if filter_adherent else []
machines=self.conn.search(filter_machine) if filter_machine else []
# Filtrage des machines en fonction des adhérents
if filter_adherent:
if filter_machine:
# Si on filtre sur des adhérent et des machines, on calcule l'intersection
adherents_dn = set([a.dn for a in adherents])
machines_f = [m for m in machines if m.parent_dn in adherents_dn]
else:
# Sinon on filtre seulement sur les adhérents, récupère les machines des adhérents trouvés
machines_f = [m for a in adherents for m in a.machines()]
else:
# Sinon si on filtre seulement sur des machines
machines_f = machines
# Filtrage des adhérents en fonction des machines
if filter_machine:
if filter_adherent:
# Si on filtre sur des adhérents et des machines, on calcule l'intersection
machines_dn = set([m.parent_dn for m in machines])
adherents_f = [a for a in adherents if a.dn in machines_dn]
else:
# Sinon on récupères les proprios des machines trouvées
adherents_f = [m.proprio() for m in machines]
else:
# Sinon si on filtre seulement sur des adhérents
adherents_f = adherents
# On filtre sur les objectClassS
return ldap_values, [ o for objectClass in objectClassS for o in machines_f+adherents_f if objectClass in o['objectClass'] ]
@tailcaller
def select_one(self, items, title, text="Que souhaitez vous faire ?", default_item=None, cont=None):
"""Fait selectionner un item parmis une liste d'items à l'utisateur"""
def box(items, default_item):
choices=[]
olist={}
count = 0
# On sépare les item d'items en fonction de leur type
for o in items:
olist[o.__class__] = olist.get(o.__class__, []) + [o]
classes = olist.keys()
classes.sort()
default_tag = items.index(default_item) if default_item in items else default_item
# On se débrouille pour faire corresponde l'ordre d'affichache des objets
# et leur ordre dans la liste items. On donne la largeur de l'affichage à la main
# pour prendre en compte la largeur du widget dialog
del items[:] # On vide la liste pour la modifier en place
items_id = {}
(line, col) = get_screen_size()
for c in classes:
items.extend(olist[c])
items_s = printing.sprint_list(olist[c], col-20).encode('utf-8').split('\n')
choices.append(("", str(items_s[0])))
next=1
if items_s[next:]:
choices.append(("", str(items_s[next])))
next+=1
for i in items_s[next:]:
if i: # on zap les lignes vides
choices.append((str(count), str(i)))
count+=1
# On laisse une ligne vide pour séparer les listes d'objets de type différent
choices.append(("", ""))
return self.dialog.menu(
text,
width=0,
height=0,
menu_height=0,
timeout=self.timeout,
item_help=0,
default_item=str(default_tag),
title=title,
scrollbar=True,
choices=choices,
colors=True)
def todo(tag, items, title, cont, retry_cont):
# Si l'utilisateur n'a pas choisis une ligne correspondant à quelque chose
if not tag:
self.dialog.msgbox("Merci de choisir l'un des item de la liste ou d'annuler", timeout=self.timeout, title="Sélection", width=0, height=0)
raise Continue(retry_cont)
# Sinon on retourne l'item choisis
elif self.confirm_item(items[int(tag)], title):
return items[int(tag)]
else:
raise Continue(cont)
(code, tag) = self.handle_dialog(cont, box, items, default_item)
retry_cont = TailCall(self.select_one, items=items, title=title, default_item=tag, cont=cont)
return self.handle_dialog_result(
code=code,
output=tag,
cancel_cont=cont,
error_cont=retry_cont,
codes_todo=[([self.dialog.DIALOG_OK], todo, [tag, items, title, cont, retry_cont])]
)
def confirm_item(self, item, title, defaultno=False, text='', text_bottom="", **params):
"""Affiche un item et demande si c'est bien celui là que l'on veux (supprimer, éditer, créer,...)"""
return self.dialog.yesno(
text + printing.sprint(item, **params) + text_bottom,
no_collapse=True,
colors=True,
no_mouse=True,
timeout=self.timeout,
title=title,
defaultno=defaultno,
width=0, height=0,
backtitle="Appuyez sur MAJ pour selectionner du texte"
) == self.dialog.DIALOG_OK
def display_item(self, item, title, **params):
"""Affiche un item"""
return self.dialog.msgbox(
printing.sprint(item, **params),
no_collapse=True,
colors=True,
timeout=self.timeout,
title=title,
width=0, height=0,
backtitle="Appuyez sur MAJ pour selectionner du texte"
)
# On a besoin du décorateur ici car select va retourner un item après avoir
# possblement traiter plusieurs tailcall
@tailcaller
def select(self, objectClassS, title, values={}, cont=None, disable_field=[]):
"""Permet de choisir un objet adhérent ou machine dans la base ldap"""
try:
# On fait effectuer une recherche à l'utilisateur
values, items = self.search(objectClassS, title, values, cont=cont, disable_field=disable_field)
# S'il n'y a pas de résultas, on recommence
if not items:
self.dialog.msgbox("Aucun Résultat", timeout=self.timeout, title="Recherche", width=0, height=0)
raise Continue(TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont))
# S'il y a plusieurs résultats
elif len(items)>1:
# On en fait choisir un, si c'est une continuation qui est renvoyé, elle est gérée par select
return self.select_one(items, title, cont=TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont))
# S'il y a exactement 1 résultat à la recherche, on fait confirmer son choix à l'utilisateur
elif len(items) == 1:
item=items[0]
# On fait confirmer son choix à l'utilisateur
if self.confirm_item(item, title):
return item
else:
raise Continue(TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont))
except self.error_to_raise:
raise
except Exception as e:
self.dialog.msgbox(traceback.format_exc() if self.debug_enable else "%r" % e, timeout=self.timeout, title="Erreur rencontrée", width=0, height=0)
raise Continue(TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont))
@TailCaller
def main(gc, cont=None):
"""
Fonction principale gérant l'appel au menu principal,
le timeout des écrans dialog et les reconnexion a ldap en cas de perte de la connexion
"""
while True:
try:
# tant que le timeout est atteint on revient au menu principal
gc.menu_principal()
except DialogError as e:
# Si l'erreur n'est pas due à un timeout, on la propage
if time.time() - gc.dialog_last_access < gc.timeout:
raise
# Si on perd la connexion à ldap, on en ouvre une nouvelle
except ldap.SERVER_DOWN:
if gc.dialog.pause(title="Erreur", duration=10, height=9, width=50, text="La connection au serveur ldap à été perdue.\nTentative de reconnexion en cours…") == gc.dialog.DIALOG_OK:
gc.check_ldap()
else:
return