479 lines
23 KiB
Python
479 lines
23 KiB
Python
#!/bin/bash /usr/scripts/python.sh
|
|
# -*- coding: utf-8 -*-
|
|
|
|
u"""
|
|
Copyright (C) Valentin Samir
|
|
Licence : GPLv3
|
|
|
|
"""
|
|
import os
|
|
import sys
|
|
import time
|
|
import ldap
|
|
import traceback
|
|
import locale
|
|
if '/usr/scripts' not in sys.path:
|
|
sys.path.append('/usr/scripts')
|
|
from pythondialog import Dialog
|
|
from pythondialog import error as DialogError
|
|
|
|
from gestion.affich_tools import get_screen_size, coul
|
|
|
|
import lc_ldap.shortcuts
|
|
import lc_ldap.objets as objets
|
|
import lc_ldap.attributs as attributs
|
|
import lc_ldap.printing as printing
|
|
|
|
import CPS
|
|
from CPS import TailCall, tailcaller, Continue, TailCaller
|
|
|
|
class Dialog(CPS.Dialog):
|
|
def __init__(self, debug_enable=False, ldap_test=False, custom_user=None):
|
|
super(Dialog, self).__init__()
|
|
# On initialise le moteur de rendu en spécifiant qu'on va faire du dialog
|
|
printing.template(dialog=True)
|
|
self.ldap_test = ldap_test
|
|
if custom_user:
|
|
custom_user = custom_user.decode(locale.getdefaultlocale()[1] or "ascii")
|
|
self.custom_user = custom_user
|
|
self.check_ldap()
|
|
login = self.conn.current_login
|
|
dialogrc='/home/%s/.dialogrc' % login
|
|
super(Dialog, self).__init__(debug_enable=debug_enable, dialogrc=dialogrc)
|
|
|
|
def has_right(self, liste, obj=None):
|
|
"""Vérifie que l'un des droits de l'utilisateur courant est inclus dans list"""
|
|
if not isinstance(liste, list):
|
|
liste = [liste]
|
|
if obj:
|
|
droits = obj.rights()
|
|
else:
|
|
droits = self.conn.droits
|
|
for d in liste:
|
|
if d in droits:
|
|
return True
|
|
return False
|
|
|
|
def _connected_as(self):
|
|
ret = u"Vous êtes connecté en tant que %s" % self.conn.current_login
|
|
if self.ldap_test:
|
|
ret += u" sur la base de test"
|
|
return ret
|
|
|
|
def check_ldap(self):
|
|
"""Se connecte à la base ldap et vérifie les droits de l'utilisateur courant"""
|
|
self.check_ldap_last = time.time()
|
|
|
|
# S'il y a --test dans les argument, on utilise la base de test
|
|
if self.ldap_test:
|
|
self.conn = lc_ldap.shortcuts.lc_ldap_test()
|
|
else:
|
|
# On ouvre une connexion lc_ldap
|
|
self.conn = lc_ldap.shortcuts.lc_ldap_admin()
|
|
|
|
# On vérifie que l'utilisateur système existe dans ldap (pour la gestion des droits)
|
|
luser=self.conn.search(u'(&(uid=%s)(objectClass=cransAccount))' % self.conn.current_login)
|
|
if not luser:
|
|
sys.stderr.write("L'utilisateur %s n'existe pas dans la base de donnée" % self.conn.current_login)
|
|
sys.exit(1)
|
|
self.conn.droits = [str(d) for d in luser[0]['droits']]
|
|
self.conn.dn = luser[0].dn
|
|
|
|
# Si un nom d'utilisateur est donné sur la ligne de commande
|
|
# et qu'on a les droits nounou, on l'utilise
|
|
if self.custom_user and attributs.nounou in self.conn.droits:
|
|
luser=self.conn.search(u'(&(uid=%s)(objectClass=cransAccount))' % self.custom_user)
|
|
if luser:
|
|
self.conn.current_login = self.custom_user
|
|
self.conn.droits = [str(d) for d in luser[0]['droits']]
|
|
self.conn.dn = luser[0].dn
|
|
a = attributs
|
|
allowed_right = [a.cableur, a.tresorier, a.bureau, a.nounou, a.imprimeur]
|
|
for droit in allowed_right:
|
|
if droit in self.conn.droits:
|
|
break
|
|
else:
|
|
sys.stderr.write(
|
|
u"%s ne possède aucun des droits :\n * %s\nnécessaire à utiliser ce programme\n" % (
|
|
self.conn.current_login,
|
|
'\n * '.join(allowed_right)
|
|
)
|
|
)
|
|
sys.exit(1)
|
|
|
|
@property
|
|
def dialog(self):
|
|
"""
|
|
Renvois l'objet dialog. De plus renouvelle régulièrement la connexion à la base ldap
|
|
"""
|
|
# Tous les self.timeout, on refraichie la connexion ldap
|
|
if time.time() - self.check_ldap_last > self.timeout:
|
|
self.check_ldap()
|
|
return super(Dialog, self).dialog
|
|
|
|
|
|
|
|
def edit_boolean_attributs(self, obj, attribs, title, update_obj, cont, values={}):
|
|
"""
|
|
Permet d'éditer des attributs booléen de l'objet obj listé dans attribs.
|
|
update_obj est le nom du paramètre à mettre à jour dans cont pour passer l'objet modifié
|
|
"""
|
|
# Dictionnaire décrivant quelle est la valeur booléenne à donner à l'absence de l'attribut
|
|
missing = {
|
|
'default' : False, # par défaut, on dit que c'est False
|
|
attributs.dnsIpv6 : True # pour dnsIpv6, c'est True
|
|
}
|
|
choices = [(a.ldap_name, a.legend, 1 if values.get(a.ldap_name, obj[a.ldap_name][0] if obj[a.ldap_name] else missing.get(a, missing['default'])) else 0) for a in attribs]
|
|
def box():
|
|
return self.dialog.checklist("Activier ou désactiver les propriétés suivantes",
|
|
height=0,
|
|
width=0,
|
|
timeout=self.timeout,
|
|
list_height=7,
|
|
choices=choices,
|
|
title=title)
|
|
|
|
def todo(values, obj, attribs, cont):
|
|
# On met à jour chaque attribut si sa valeur à changé
|
|
with self.conn.search(dn=obj.dn, scope=0, mode='rw')[0] as obj:
|
|
for a in attribs:
|
|
if obj[a.ldap_name] and obj[a.ldap_name][0] != values[a.ldap_name]:
|
|
obj[a.ldap_name]=values[a.ldap_name]
|
|
elif not obj[a.ldap_name] and missing.get(a, missing['default']) != values[a.ldap_name]:
|
|
obj[a.ldap_name]=values[a.ldap_name]
|
|
obj.validate_changes()
|
|
obj.history_gen()
|
|
obj.save()
|
|
# On s'en va en mettant à jour dans la continuation la valeur de obj
|
|
raise Continue(cont(**{update_obj:obj}))
|
|
|
|
(code, output) = self.handle_dialog(cont, box)
|
|
|
|
# On transforme la liste des cases dialog cochée en dictionnnaire
|
|
values = dict((a.ldap_name, a.ldap_name in output) for a in attribs)
|
|
|
|
# Une continuation que l'on suivra si quelque chose se passe mal
|
|
retry_cont = TailCall(self.edit_boolean_attributs, obj=obj, update_obj=update_obj, attribs=attribs, title=title, cont=cont, values=values)
|
|
|
|
return self.handle_dialog_result(
|
|
code=code,
|
|
output=output,
|
|
cancel_cont=cont,
|
|
error_cont=retry_cont,
|
|
codes_todo=[([self.dialog.DIALOG_OK], todo, [values, obj, attribs, cont])]
|
|
)
|
|
|
|
|
|
def edit_attributs(self, obj, attr, title, update_obj, cont, tag=None, values=None):
|
|
"""
|
|
Permet d'éditer la liste d'attribut attr de l'objet obj.
|
|
update_obj est le nom du paramètre à mettre à jour dans cont pour passer l'objet modifié
|
|
"""
|
|
|
|
# Il n'y a pas inputmenu dans la lib dialog, du coup, on traite les arguments à la main.
|
|
# Ça reste relativement acceptable comme on utilise la fonction de la lib pour appeler dialog
|
|
def box(values, default_tag):
|
|
cmd = ['--inputmenu', "Édition de l'attribut %s :" % attr, "0", "0", "20"]
|
|
index=0
|
|
for value in values:
|
|
cmd.extend([str(index), str(value)])
|
|
index+=1
|
|
cmd.extend(['new', ''])
|
|
(code, output) = self.dialog._perform(*(cmd,), timeout=self.timeout, title=title, default_item=str(default_tag))
|
|
if code == self.dialog.DIALOG_EXTRA:
|
|
output = output.split(' ', 2)[1:]
|
|
else:
|
|
output = ''
|
|
return (code, output)
|
|
|
|
def todo_extra(output, values, retry_cont):
|
|
tag, value = output
|
|
if tag == 'new':
|
|
if value:
|
|
values.append(value)
|
|
elif value == '':
|
|
values.pop(int(tag))
|
|
else:
|
|
values[int(tag)] = value
|
|
raise Continue(retry_cont(values=values, tag=tag))
|
|
|
|
def todo(obj, values, cont):
|
|
with self.conn.search(dn=obj.dn, scope=0, mode='rw')[0] as obj:
|
|
obj[attr] = [unicode(value, 'utf-8') for value in values]
|
|
obj.validate_changes()
|
|
obj.history_gen()
|
|
obj.save()
|
|
raise Continue(cont(**{update_obj:obj}))
|
|
|
|
if values is None:
|
|
values = [str(a) for a in obj[attr]]
|
|
retry_cont = TailCall(self.edit_attributs, obj=obj, attr=attr, title=title, update_obj=update_obj, cont=cont, values=values)
|
|
(code, output) = self.handle_dialog(cont, box, values, tag)
|
|
|
|
return self.handle_dialog_result(
|
|
code=code,
|
|
output=output,
|
|
cancel_cont=cont,
|
|
error_cont=retry_cont,
|
|
codes_todo=[
|
|
([self.dialog.DIALOG_OK], todo, [obj, values, cont]),
|
|
([self.dialog.DIALOG_EXTRA], todo_extra, [output, values, retry_cont]),
|
|
]
|
|
)
|
|
|
|
|
|
def search(self, objectClassS, title, values={}, cont=None, disable_field=[]):
|
|
"""
|
|
Rechercher des adhérents ou des machines dans la base ldap
|
|
retourne le tuple (code de retour dialog, valeurs entrée par l'utilisateur, liste d'objets trouvés)
|
|
La fonction est découpé en trois partie :
|
|
* affichage dialog et récupération du résultat
|
|
* construction de filtres de recherche ldap et recherches ldap
|
|
* filtre sur les résultats des recherches ldap
|
|
"""
|
|
select_dict = {
|
|
#label attribut ldap search for substring param dialog: line col valeur input-line icol len max-chars
|
|
'Nom' : {'ldap':'nom', 'sub':True, 'params' : [ 1, 1, values.get('nom', ""), 1, 13, 20, 20]},
|
|
'Prénom' : {'ldap':'prenom', 'sub':True, 'params' : [ 2, 1, values.get('prenom', ""), 2, 13, 20, 20]},
|
|
'Téléphone' : {'ldap':'tel', 'sub':True, 'params' : [ 3, 1, values.get('tel', ""), 3, 13, 10, 00]},
|
|
'Chambre' : {'ldap':'chbre','sub':True, 'params' : [ 4, 1, values.get('chbre',""), 4, 13, 05, 00]},
|
|
'aid' : {'ldap' : 'aid', 'sub':False, 'params' : [ 5, 1, values.get('aid',""), 5, 13, 05, 05]},
|
|
'mail' : {'ldap' : 'mail', 'sub':True, 'params' : [ 6, 1, values.get('mail',""), 6, 13, 40, 00]},
|
|
# seconde colone
|
|
'Machine' : {'ldap' : '*', 'sub':True, 'params' : [1, 35, "", 1, 43, 0, 0]},
|
|
'Host' : {'ldap' : 'host', 'sub':True, 'params' : [2, 37, values.get('host',""), 2, 43, 17, 17]},
|
|
'Mac' : {'ldap' : 'macAddress', 'sub':False, 'params' : [3, 37, values.get('macAddress',""), 3, 43, 17, 17]},
|
|
'IP' : {'ldap' : 'ipHostNumber', 'sub':False,'params' : [4, 37, values.get('ipHostNumber',""), 4, 43, 15, 15]},
|
|
'mid' : {'ldap' : 'mid', 'sub':False, 'params' : [5, 37, values.get('mid',""), 5, 43, 5, 5]},
|
|
}
|
|
# On a besoin de l'ordre pour récupérer les valeurs ensuite
|
|
select_adherent = ['Nom', 'Prénom', 'Téléphone', 'Chambre', 'aid', 'mail']
|
|
select_machine = ['Host', 'Mac', 'IP', 'mid']
|
|
if 'club' in objectClassS and not 'adherent' in objectClassS:
|
|
select_dict['cid']=select_dict['aid']
|
|
select_dict['cid']['ldap']='cid'
|
|
select_dict['cid']['params'][2]=values.get('cid', "")
|
|
select_adherent[select_adherent.index('aid')]='cid'
|
|
def box():
|
|
# On met les argument à dialog à la main ici, sinon, c'est difficile de choisir comment mettre une seconde colone
|
|
cmd = ["--mixedform", "Entrez vos paramètres de recherche", '0', '0', '0']
|
|
for key in select_adherent:
|
|
cmd.extend(['%s :' % key] + [str(e) for e in select_dict[key]['params']] + ['2' if key in disable_field else '0'])
|
|
cmd.extend(['Machine :'] + [str(e) for e in select_dict['Machine']['params']] + ['2'])
|
|
for key in select_machine:
|
|
cmd.extend(['%s :' % key] + [str(e) for e in select_dict[key]['params']] + ['2' if key in disable_field else '0'])
|
|
cmd.extend(["Les champs vides sont ignorés.", '7', '1', "", '0', '0', '0', '0', '2' ])
|
|
# On utilise quand même la fonction de la bibliothèques pour passer les arguments
|
|
(code, output) = self.dialog._perform(*(cmd,), timeout=self.timeout, title=title, backtitle="Entrez vos paramètres de recherche")
|
|
if output:
|
|
return (code, output.split('\n')[:-1])
|
|
else: # empty selection
|
|
return (code, [])
|
|
|
|
(code, dialog_values) = self.handle_dialog(cont, box)
|
|
# Si il a appuyé sur annuler ou sur escape, on saute sur la continuation
|
|
if code in (self.dialog.DIALOG_CANCEL, self.dialog.DIALOG_ESC):
|
|
raise Continue(cont)
|
|
else:
|
|
# Transformation de la liste des valeures entrée en dictionnnaire
|
|
dialog_values = dict(zip(select_adherent + select_machine, dialog_values))
|
|
ldap_values = dict([(select_dict[key]['ldap'], value) for key, value in dialog_values.items()])
|
|
|
|
# Construction des filtres ldap pour les adhérents et les machines
|
|
filter_adherent = []
|
|
filter_machine = []
|
|
for (key, value) in dialog_values.items():
|
|
if value:
|
|
if key in select_adherent:
|
|
filter_adherent.append((u"(%s=*%s*)" if select_dict[key]['sub'] and not '*' in value else u"(%s=%s)") % (select_dict[key]['ldap'], unicode(value, 'utf-8')))
|
|
elif key in select_machine:
|
|
filter_machine.append((u"(%s=*%s*)" if select_dict[key]['sub'] and not '*' in value else u"(%s=%s)") % (select_dict[key]['ldap'], unicode(value, 'utf-8')))
|
|
if filter_adherent:
|
|
filter_adherent=u"(&%s)" % "".join(filter_adherent)
|
|
if filter_machine:
|
|
filter_machine=u"(&%s)" % "".join(filter_machine)
|
|
|
|
# Récupération des adhérents et des machines
|
|
adherents=self.conn.search(filter_adherent) if filter_adherent else []
|
|
machines=self.conn.search(filter_machine) if filter_machine else []
|
|
|
|
# Filtrage des machines en fonction des adhérents
|
|
if filter_adherent:
|
|
if filter_machine:
|
|
# Si on filtre sur des adhérent et des machines, on calcule l'intersection
|
|
adherents_dn = set([a.dn for a in adherents])
|
|
machines_f = [m for m in machines if m.parent_dn in adherents_dn]
|
|
else:
|
|
# Sinon on filtre seulement sur les adhérents, récupère les machines des adhérents trouvés
|
|
machines_f = [m for a in adherents for m in a.machines()]
|
|
else:
|
|
# Sinon si on filtre seulement sur des machines
|
|
machines_f = machines
|
|
|
|
# Filtrage des adhérents en fonction des machines
|
|
if filter_machine:
|
|
if filter_adherent:
|
|
# Si on filtre sur des adhérents et des machines, on calcule l'intersection
|
|
machines_dn = set([m.parent_dn for m in machines])
|
|
adherents_f = [a for a in adherents if a.dn in machines_dn]
|
|
else:
|
|
# Sinon on récupères les proprios des machines trouvées
|
|
adherents_f = [m.proprio() for m in machines]
|
|
else:
|
|
# Sinon si on filtre seulement sur des adhérents
|
|
adherents_f = adherents
|
|
|
|
# On filtre sur les objectClassS
|
|
return ldap_values, [ o for objectClass in objectClassS for o in machines_f+adherents_f if objectClass in o['objectClass'] ]
|
|
|
|
@tailcaller
|
|
def select_one(self, items, title, text="Que souhaitez vous faire ?", default_item=None, cont=None):
|
|
"""Fait selectionner un item parmis une liste d'items à l'utisateur"""
|
|
|
|
def box(items, default_item):
|
|
choices=[]
|
|
olist={}
|
|
count = 0
|
|
|
|
# On sépare les item d'items en fonction de leur type
|
|
for o in items:
|
|
olist[o.__class__] = olist.get(o.__class__, []) + [o]
|
|
classes = olist.keys()
|
|
classes.sort()
|
|
default_tag = items.index(default_item) if default_item in items else default_item
|
|
|
|
# On se débrouille pour faire corresponde l'ordre d'affichache des objets
|
|
# et leur ordre dans la liste items. On donne la largeur de l'affichage à la main
|
|
# pour prendre en compte la largeur du widget dialog
|
|
del items[:] # On vide la liste pour la modifier en place
|
|
items_id = {}
|
|
(line, col) = get_screen_size()
|
|
for c in classes:
|
|
items.extend(olist[c])
|
|
items_s = printing.sprint_list(olist[c], col-20).encode('utf-8').split('\n')
|
|
choices.append(("", str(items_s[0])))
|
|
next=1
|
|
if items_s[next:]:
|
|
choices.append(("", str(items_s[next])))
|
|
next+=1
|
|
for i in items_s[next:]:
|
|
if i: # on zap les lignes vides
|
|
choices.append((str(count), str(i)))
|
|
count+=1
|
|
# On laisse une ligne vide pour séparer les listes d'objets de type différent
|
|
choices.append(("", ""))
|
|
|
|
return self.dialog.menu(
|
|
text,
|
|
width=0,
|
|
height=0,
|
|
menu_height=0,
|
|
timeout=self.timeout,
|
|
item_help=0,
|
|
default_item=str(default_tag),
|
|
title=title,
|
|
scrollbar=True,
|
|
choices=choices,
|
|
colors=True)
|
|
|
|
|
|
def todo(tag, items, title, cont, retry_cont):
|
|
# Si l'utilisateur n'a pas choisis une ligne correspondant à quelque chose
|
|
if not tag:
|
|
self.dialog.msgbox("Merci de choisir l'un des item de la liste ou d'annuler", timeout=self.timeout, title="Sélection", width=0, height=0)
|
|
raise Continue(retry_cont)
|
|
# Sinon on retourne l'item choisis
|
|
elif self.confirm_item(items[int(tag)], title):
|
|
return items[int(tag)]
|
|
else:
|
|
raise Continue(cont)
|
|
|
|
(code, tag) = self.handle_dialog(cont, box, items, default_item)
|
|
retry_cont = TailCall(self.select_one, items=items, title=title, default_item=tag, cont=cont)
|
|
|
|
return self.handle_dialog_result(
|
|
code=code,
|
|
output=tag,
|
|
cancel_cont=cont,
|
|
error_cont=retry_cont,
|
|
codes_todo=[([self.dialog.DIALOG_OK], todo, [tag, items, title, cont, retry_cont])]
|
|
)
|
|
|
|
|
|
def confirm_item(self, item, title, defaultno=False, text='', text_bottom="", **params):
|
|
"""Affiche un item et demande si c'est bien celui là que l'on veux (supprimer, éditer, créer,...)"""
|
|
return self.dialog.yesno(
|
|
text + printing.sprint(item, **params) + text_bottom,
|
|
no_collapse=True,
|
|
colors=True,
|
|
no_mouse=True,
|
|
timeout=self.timeout,
|
|
title=title,
|
|
defaultno=defaultno,
|
|
width=0, height=0,
|
|
backtitle="Appuyez sur MAJ pour selectionner du texte"
|
|
) == self.dialog.DIALOG_OK
|
|
|
|
def display_item(self, item, title, **params):
|
|
"""Affiche un item"""
|
|
return self.dialog.msgbox(
|
|
printing.sprint(item, **params),
|
|
no_collapse=True,
|
|
colors=True,
|
|
timeout=self.timeout,
|
|
title=title,
|
|
width=0, height=0,
|
|
backtitle="Appuyez sur MAJ pour selectionner du texte"
|
|
)
|
|
|
|
# On a besoin du décorateur ici car select va retourner un item après avoir
|
|
# possblement traiter plusieurs tailcall
|
|
@tailcaller
|
|
def select(self, objectClassS, title, values={}, cont=None, disable_field=[]):
|
|
"""Permet de choisir un objet adhérent ou machine dans la base ldap"""
|
|
try:
|
|
# On fait effectuer une recherche à l'utilisateur
|
|
values, items = self.search(objectClassS, title, values, cont=cont, disable_field=disable_field)
|
|
# S'il n'y a pas de résultas, on recommence
|
|
if not items:
|
|
self.dialog.msgbox("Aucun Résultat", timeout=self.timeout, title="Recherche", width=0, height=0)
|
|
raise Continue(TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont))
|
|
# S'il y a plusieurs résultats
|
|
elif len(items)>1:
|
|
# On en fait choisir un, si c'est une continuation qui est renvoyé, elle est gérée par select
|
|
return self.select_one(items, title, cont=TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont))
|
|
# S'il y a exactement 1 résultat à la recherche, on fait confirmer son choix à l'utilisateur
|
|
elif len(items) == 1:
|
|
item=items[0]
|
|
# On fait confirmer son choix à l'utilisateur
|
|
if self.confirm_item(item, title):
|
|
return item
|
|
else:
|
|
raise Continue(TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont))
|
|
except self.error_to_raise:
|
|
raise
|
|
except Exception as e:
|
|
self.dialog.msgbox(traceback.format_exc() if self.debug_enable else "%r" % e, timeout=self.timeout, title="Erreur rencontrée", width=0, height=0)
|
|
raise Continue(TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont))
|
|
|
|
|
|
@TailCaller
|
|
def main(gc, cont=None):
|
|
"""
|
|
Fonction principale gérant l'appel au menu principal,
|
|
le timeout des écrans dialog et les reconnexion a ldap en cas de perte de la connexion
|
|
"""
|
|
while True:
|
|
try:
|
|
# tant que le timeout est atteint on revient au menu principal
|
|
gc.menu_principal()
|
|
except DialogError as e:
|
|
# Si l'erreur n'est pas due à un timeout, on la propage
|
|
if time.time() - gc.dialog_last_access < gc.timeout:
|
|
raise
|
|
# Si on perd la connexion à ldap, on en ouvre une nouvelle
|
|
except ldap.SERVER_DOWN:
|
|
if gc.dialog.pause(title="Erreur", duration=10, height=9, width=50, text="La connection au serveur ldap à été perdue.\nTentative de reconnexion en cours…") == gc.dialog.DIALOG_OK:
|
|
gc.check_ldap()
|
|
else:
|
|
return
|