#!/bin/bash /usr/scripts/python.sh # -*- coding: utf-8 -*- u""" Copyright (C) Valentin Samir Licence : GPLv3 """ import sys import time import ldap import traceback if '/usr/scripts' not in sys.path: sys.path.append('/usr/scripts') from pythondialog import Dialog from pythondialog import error as DialogError from gestion.affich_tools import get_screen_size, coul import lc_ldap.shortcuts import lc_ldap.objets as objets import lc_ldap.attributs as attributs import lc_ldap.printing as printing import CPS from CPS import TailCall, tailcaller, Continue, TailCaller class Dialog(CPS.Dialog): def __init__(self, debug_enable=False, ldap_test=False, custom_user=None): super(Dialog, self).__init__(debug_enable=debug_enable) # On initialise le moteur de rendu en spécifiant qu'on va faire du dialog printing.template(dialog=True) self.ldap_test = ldap_test self.custom_user = custom_user self.check_ldap() def has_right(self, liste, obj=None): """Vérifie que l'un des droits de l'utilisateur courant est inclus dans list""" if not isinstance(liste, list): liste = [liste] if obj: droits = obj.rights() else: droits = self.conn.droits for d in liste: if d in droits: return True return False def _connected_as(self): ret = u"Vous êtes connecté en tant que %s" % self.conn.current_login if self.ldap_test: ret += u" sur la base de test" return ret def check_ldap(self): """Se connecte à la base ldap et vérifie les droits de l'utilisateur courant""" self.check_ldap_last = time.time() # S'il y a --test dans les argument, on utilise la base de test if self.ldap_test: self.conn = lc_ldap.shortcuts.lc_ldap_test() else: # On ouvre une connexion lc_ldap self.conn = lc_ldap.shortcuts.lc_ldap_admin() # On vérifie que l'utilisateur système existe dans ldap (pour la gestion des droits) luser=self.conn.search(u'(&(uid=%s)(objectClass=cransAccount))' % self.conn.current_login) if not luser: sys.stderr.write("L'utilisateur %s n'existe pas dans la base de donnée" % self.conn.current_login) sys.exit(1) self.conn.droits = [str(d) for d in luser[0]['droits']] self.conn.dn = luser[0].dn # Si un nom d'utilisateur est donné sur la ligne de commande # et qu'on a les droits nounou, on l'utilise if self.custom_user and attributs.nounou in self.conn.droits: luser=self.conn.search(u'(&(uid=%s)(objectClass=cransAccount))' % self.custom_user) if luser: self.conn.current_login = self.custom_user self.conn.droits = [str(d) for d in luser[0]['droits']] self.conn.dn = luser[0].dn a = attributs allowed_right = [a.cableur, a.tresorier, a.bureau, a.nounou, a.imprimeur] for droit in allowed_right: if droit in self.conn.droits: break else: sys.stderr.write( u"%s ne possède aucun des droits :\n * %s\nnécessaire à utiliser ce programme\n" % ( self.conn.current_login, '\n * '.join(allowed_right) ) ) sys.exit(1) @property def dialog(self): """ Renvois l'objet dialog. De plus renouvelle régulièrement la connexion à la base ldap """ # Tous les self.timeout, on refraichie la connexion ldap if time.time() - self.check_ldap_last > self.timeout: self.check_ldap() return super(Dialog, self).dialog def edit_boolean_attributs(self, obj, attribs, title, update_obj, cont, values={}): """ Permet d'éditer des attributs booléen de l'objet obj listé dans attribs. update_obj est le nom du paramètre à mettre à jour dans cont pour passer l'objet modifié """ # Dictionnaire décrivant quelle est la valeur booléenne à donner à l'absence de l'attribut missing = { 'default' : False, # par défaut, on dit que c'est False attributs.dnsIpv6 : True # pour dnsIpv6, c'est True } choices = [(a.ldap_name, a.legend, 1 if values.get(a.ldap_name, obj[a.ldap_name][0] if obj[a.ldap_name] else missing.get(a, missing['default'])) else 0) for a in attribs] def box(): return self.dialog.checklist("Activier ou désactiver les propriétés suivantes", height=0, width=0, timeout=self.timeout, list_height=7, choices=choices, title=title) def todo(values, obj, attribs, cont): # On met à jour chaque attribut si sa valeur à changé with self.conn.search(dn=obj.dn, scope=0, mode='rw')[0] as obj: for a in attribs: if obj[a.ldap_name] and obj[a.ldap_name][0] != values[a.ldap_name]: obj[a.ldap_name]=values[a.ldap_name] elif not obj[a.ldap_name] and missing.get(a, missing['default']) != values[a.ldap_name]: obj[a.ldap_name]=values[a.ldap_name] obj.validate_changes() obj.history_gen() obj.save() # On s'en va en mettant à jour dans la continuation la valeur de obj raise Continue(cont(**{update_obj:obj})) (code, output) = self.handle_dialog(cont, box) # On transforme la liste des cases dialog cochée en dictionnnaire values = dict((a.ldap_name, a.ldap_name in output) for a in attribs) # Une continuation que l'on suivra si quelque chose se passe mal retry_cont = TailCall(self.edit_boolean_attributs, obj=obj, update_obj=update_obj, attribs=attribs, title=title, cont=cont, values=values) return self.handle_dialog_result( code=code, output=output, cancel_cont=cont, error_cont=retry_cont, codes_todo=[([self.dialog.DIALOG_OK], todo, [values, obj, attribs, cont])] ) def edit_attributs(self, obj, attr, title, update_obj, cont, tag=None, values=None): """ Permet d'éditer la liste d'attribut attr de l'objet obj. update_obj est le nom du paramètre à mettre à jour dans cont pour passer l'objet modifié """ # Il n'y a pas inputmenu dans la lib dialog, du coup, on traite les arguments à la main. # Ça reste relativement acceptable comme on utilise la fonction de la lib pour appeler dialog def box(values, default_tag): cmd = ['--inputmenu', "Édition de l'attribut %s :" % attr, "0", "0", "20"] index=0 for value in values: cmd.extend([str(index), str(value)]) index+=1 cmd.extend(['new', '']) (code, output) = self.dialog._perform(*(cmd,), timeout=self.timeout, title=title, default_item=str(default_tag)) if code == self.dialog.DIALOG_EXTRA: output = output.split(' ', 2)[1:] else: output = '' return (code, output) def todo_extra(output, values, retry_cont): tag, value = output if tag == 'new': if value: values.append(value) elif value == '': values.pop(int(tag)) else: values[int(tag)] = value raise Continue(retry_cont(values=values, tag=tag)) def todo(obj, values, cont): with self.conn.search(dn=obj.dn, scope=0, mode='rw')[0] as obj: obj[attr] = [unicode(value, 'utf-8') for value in values] obj.validate_changes() obj.history_gen() obj.save() raise Continue(cont(**{update_obj:obj})) if values is None: values = [str(a) for a in obj[attr]] retry_cont = TailCall(self.edit_attributs, obj=obj, attr=attr, title=title, update_obj=update_obj, cont=cont, values=values) (code, output) = self.handle_dialog(cont, box, values, tag) return self.handle_dialog_result( code=code, output=output, cancel_cont=cont, error_cont=retry_cont, codes_todo=[ ([self.dialog.DIALOG_OK], todo, [obj, values, cont]), ([self.dialog.DIALOG_EXTRA], todo_extra, [output, values, retry_cont]), ] ) def search(self, objectClassS, title, values={}, cont=None, disable_field=[]): """ Rechercher des adhérents ou des machines dans la base ldap retourne le tuple (code de retour dialog, valeurs entrée par l'utilisateur, liste d'objets trouvés) La fonction est découpé en trois partie : * affichage dialog et récupération du résultat * construction de filtres de recherche ldap et recherches ldap * filtre sur les résultats des recherches ldap """ select_dict = { #label attribut ldap search for substring param dialog: line col valeur input-line icol len max-chars 'Nom' : {'ldap':'nom', 'sub':True, 'params' : [ 1, 1, values.get('nom', ""), 1, 13, 20, 20]}, 'Prénom' : {'ldap':'prenom', 'sub':True, 'params' : [ 2, 1, values.get('prenom', ""), 2, 13, 20, 20]}, 'Téléphone' : {'ldap':'tel', 'sub':True, 'params' : [ 3, 1, values.get('tel', ""), 3, 13, 10, 00]}, 'Chambre' : {'ldap':'chbre','sub':True, 'params' : [ 4, 1, values.get('chbre',""), 4, 13, 05, 00]}, 'aid' : {'ldap' : 'aid', 'sub':False, 'params' : [ 5, 1, values.get('aid',""), 5, 13, 05, 05]}, 'mail' : {'ldap' : 'mail', 'sub':True, 'params' : [ 6, 1, values.get('mail',""), 6, 13, 40, 00]}, # seconde colone 'Machine' : {'ldap' : '*', 'sub':True, 'params' : [1, 35, "", 1, 43, 0, 0]}, 'Host' : {'ldap' : 'host', 'sub':True, 'params' : [2, 37, values.get('host',""), 2, 43, 17, 17]}, 'Mac' : {'ldap' : 'macAddress', 'sub':False, 'params' : [3, 37, values.get('macAddress',""), 3, 43, 17, 17]}, 'IP' : {'ldap' : 'ipHostNumber', 'sub':False,'params' : [4, 37, values.get('ipHostNumber',""), 4, 43, 15, 15]}, 'mid' : {'ldap' : 'mid', 'sub':False, 'params' : [5, 37, values.get('mid',""), 5, 43, 5, 5]}, } # On a besoin de l'ordre pour récupérer les valeurs ensuite select_adherent = ['Nom', 'Prénom', 'Téléphone', 'Chambre', 'aid', 'mail'] select_machine = ['Host', 'Mac', 'IP', 'mid'] if 'club' in objectClassS and not 'adherent' in objectClassS: select_dict['cid']=select_dict['aid'] select_dict['cid']['ldap']='cid' select_dict['cid']['params'][2]=values.get('cid', "") select_adherent[select_adherent.index('aid')]='cid' def box(): # On met les argument à dialog à la main ici, sinon, c'est difficile de choisir comment mettre une seconde colone cmd = ["--mixedform", "Entrez vos paramètres de recherche", '0', '0', '0'] for key in select_adherent: cmd.extend(['%s :' % key] + [str(e) for e in select_dict[key]['params']] + ['2' if key in disable_field else '0']) cmd.extend(['Machine :'] + [str(e) for e in select_dict['Machine']['params']] + ['2']) for key in select_machine: cmd.extend(['%s :' % key] + [str(e) for e in select_dict[key]['params']] + ['2' if key in disable_field else '0']) cmd.extend(["Les champs vides sont ignorés.", '7', '1', "", '0', '0', '0', '0', '2' ]) # On utilise quand même la fonction de la bibliothèques pour passer les arguments (code, output) = self.dialog._perform(*(cmd,), timeout=self.timeout, title=title, backtitle="Entrez vos paramètres de recherche") if output: return (code, output.split('\n')[:-1]) else: # empty selection return (code, []) (code, dialog_values) = self.handle_dialog(cont, box) # Si il a appuyé sur annuler ou sur escape, on saute sur la continuation if code in (self.dialog.DIALOG_CANCEL, self.dialog.DIALOG_ESC): raise Continue(cont) else: # Transformation de la liste des valeures entrée en dictionnnaire dialog_values = dict(zip(select_adherent + select_machine, dialog_values)) ldap_values = dict([(select_dict[key]['ldap'], value) for key, value in dialog_values.items()]) # Construction des filtres ldap pour les adhérents et les machines filter_adherent = [] filter_machine = [] for (key, value) in dialog_values.items(): if value: if key in select_adherent: filter_adherent.append((u"(%s=*%s*)" if select_dict[key]['sub'] and not '*' in value else u"(%s=%s)") % (select_dict[key]['ldap'], unicode(value, 'utf-8'))) elif key in select_machine: filter_machine.append((u"(%s=*%s*)" if select_dict[key]['sub'] and not '*' in value else u"(%s=%s)") % (select_dict[key]['ldap'], unicode(value, 'utf-8'))) if filter_adherent: filter_adherent=u"(&%s)" % "".join(filter_adherent) if filter_machine: filter_machine=u"(&%s)" % "".join(filter_machine) # Récupération des adhérents et des machines adherents=self.conn.search(filter_adherent) if filter_adherent else [] machines=self.conn.search(filter_machine) if filter_machine else [] # Filtrage des machines en fonction des adhérents if filter_adherent: if filter_machine: # Si on filtre sur des adhérent et des machines, on calcule l'intersection adherents_dn = set([a.dn for a in adherents]) machines_f = [m for m in machines if m.parent_dn in adherents_dn] else: # Sinon on filtre seulement sur les adhérents, récupère les machines des adhérents trouvés machines_f = [m for a in adherents for m in a.machines()] else: # Sinon si on filtre seulement sur des machines machines_f = machines # Filtrage des adhérents en fonction des machines if filter_machine: if filter_adherent: # Si on filtre sur des adhérents et des machines, on calcule l'intersection machines_dn = set([m.parent_dn for m in machines]) adherents_f = [a for a in adherents if a.dn in machines_dn] else: # Sinon on récupères les proprios des machines trouvées adherents_f = [m.proprio() for m in machines] else: # Sinon si on filtre seulement sur des adhérents adherents_f = adherents # On filtre sur les objectClassS return ldap_values, [ o for objectClass in objectClassS for o in machines_f+adherents_f if objectClass in o['objectClass'] ] @tailcaller def select_one(self, items, title, text="Que souhaitez vous faire ?", default_item=None, cont=None): """Fait selectionner un item parmis une liste d'items à l'utisateur""" def box(items, default_item): choices=[] olist={} count = 0 # On sépare les item d'items en fonction de leur type for o in items: olist[o.__class__] = olist.get(o.__class__, []) + [o] classes = olist.keys() classes.sort() default_tag = items.index(default_item) if default_item in items else default_item # On se débrouille pour faire corresponde l'ordre d'affichache des objets # et leur ordre dans la liste items. On donne la largeur de l'affichage à la main # pour prendre en compte la largeur du widget dialog del items[:] # On vide la liste pour la modifier en place items_id = {} (line, col) = get_screen_size() for c in classes: items.extend(olist[c]) items_s = printing.sprint_list(olist[c], col-20).encode('utf-8').split('\n') choices.append(("", str(items_s[0]))) next=1 if items_s[next:]: choices.append(("", str(items_s[next]))) next+=1 for i in items_s[next:]: if i: # on zap les lignes vides choices.append((str(count), str(i))) count+=1 # On laisse une ligne vide pour séparer les listes d'objets de type différent choices.append(("", "")) return self.dialog.menu( text, width=0, height=0, menu_height=0, timeout=self.timeout, item_help=0, default_item=str(default_tag), title=title, scrollbar=True, choices=choices, colors=True) def todo(tag, items, title, cont, retry_cont): # Si l'utilisateur n'a pas choisis une ligne correspondant à quelque chose if not tag: self.dialog.msgbox("Merci de choisir l'un des item de la liste ou d'annuler", timeout=self.timeout, title="Sélection", width=0, height=0) raise Continue(retry_cont) # Sinon on retourne l'item choisis elif self.confirm_item(items[int(tag)], title): return items[int(tag)] else: raise Continue(cont) (code, tag) = self.handle_dialog(cont, box, items, default_item) retry_cont = TailCall(self.select_one, items=items, title=title, default_item=tag, cont=cont) return self.handle_dialog_result( code=code, output=tag, cancel_cont=cont, error_cont=retry_cont, codes_todo=[([self.dialog.DIALOG_OK], todo, [tag, items, title, cont, retry_cont])] ) def confirm_item(self, item, title, defaultno=False, text='', text_bottom="", **params): """Affiche un item et demande si c'est bien celui là que l'on veux (supprimer, éditer, créer,...)""" return self.dialog.yesno( text + printing.sprint(item, **params) + text_bottom, no_collapse=True, colors=True, no_mouse=True, timeout=self.timeout, title=title, defaultno=defaultno, width=0, height=0, backtitle="Appuyez sur MAJ pour selectionner du texte" ) == self.dialog.DIALOG_OK def display_item(self, item, title, **params): """Affiche un item""" return self.dialog.msgbox( printing.sprint(item, **params), no_collapse=True, colors=True, timeout=self.timeout, title=title, width=0, height=0, backtitle="Appuyez sur MAJ pour selectionner du texte" ) # On a besoin du décorateur ici car select va retourner un item après avoir # possblement traiter plusieurs tailcall @tailcaller def select(self, objectClassS, title, values={}, cont=None, disable_field=[]): """Permet de choisir un objet adhérent ou machine dans la base ldap""" try: # On fait effectuer une recherche à l'utilisateur values, items = self.search(objectClassS, title, values, cont=cont, disable_field=disable_field) # S'il n'y a pas de résultas, on recommence if not items: self.dialog.msgbox("Aucun Résultat", timeout=self.timeout, title="Recherche", width=0, height=0) raise Continue(TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont)) # S'il y a plusieurs résultats elif len(items)>1: # On en fait choisir un, si c'est une continuation qui est renvoyé, elle est gérée par select return self.select_one(items, title, cont=TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont)) # S'il y a exactement 1 résultat à la recherche, on fait confirmer son choix à l'utilisateur elif len(items) == 1: item=items[0] # On fait confirmer son choix à l'utilisateur if self.confirm_item(item, title): return item else: raise Continue(TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont)) except self.error_to_raise: raise except Exception as e: self.dialog.msgbox(traceback.format_exc() if self.debug_enable else "%r" % e, timeout=self.timeout, title="Erreur rencontrée", width=0, height=0) raise Continue(TailCall(self.select, objectClassS=objectClassS, title=title, values=values, disable_field=disable_field, cont=cont)) @TailCaller def main(gc, cont=None): """ Fonction principale gérant l'appel au menu principal, le timeout des écrans dialog et les reconnexion a ldap en cas de perte de la connexion """ while True: try: # tant que le timeout est atteint on revient au menu principal gc.menu_principal() except DialogError as e: # Si l'erreur n'est pas due à un timeout, on la propage if time.time() - gc.dialog_last_access < gc.timeout: raise # Si on perd la connexion à ldap, on en ouvre une nouvelle except ldap.SERVER_DOWN: if gc.dialog.pause(title="Erreur", duration=10, height=9, width=50, text="La connection au serveur ldap à été perdue.\nTentative de reconnexion en cours…") == gc.dialog.DIALOG_OK: gc.check_ldap() else: return