From a620f5b5e98c0425e70e58688f10464a1147238f Mon Sep 17 00:00:00 2001 From: Vincent Le Gallic Date: Thu, 16 May 2013 03:53:00 +0200 Subject: [PATCH] =?UTF-8?q?On=20vire=20le=20kludge=20qui=20r=C3=A9cup?= =?UTF-8?q?=C3=A8re=20le=20login=20dans=20la=20variable=20d'environnement.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit C'est plus le rôle d'un wrapper (à venir) que du binding itself. --- lc_ldap.py | 38 +++++++++++++++++++++++---------- test.py | 62 +++++++++++++++++++++++++++++++----------------------- 2 files changed, 63 insertions(+), 37 deletions(-) diff --git a/lc_ldap.py b/lc_ldap.py index 1923ad8..11762d8 100644 --- a/lc_ldap.py +++ b/lc_ldap.py @@ -72,11 +72,6 @@ created = 'created' modified = 'modified' deleted = 'deleted' -#: Pour enregistrer dans l'historique, on a besoin de savoir qui exécute le script -#: Si le script a été exécuté via un sudo, la variable SUDO_USER (l'utilisateur qui a effectué le sudo) -#: est plus pertinente que USER (qui sera root) -current_user = os.getenv("SUDO_USER") or os.getenv("USER") - # Quand on a besoin du fichier de secrets def import_secrets(): """Importe le fichier de secrets""" @@ -131,6 +126,10 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object): le ``dn`` associé à l'uid ``user``, et effectue l'authentification avec ce ``dn`` et ``cred`` - Sinon effectue une authentification anonyme + + Si on ne se binde pas en anonyme, il faut de toutes façons fournir un ``user``. + Il sert à savoir qui fait les modifications (utilisé dans les champs historique). + Si ``test`` est à ``True``, on se connecte à la base de test sur ``vo``. """ if test: @@ -153,12 +152,19 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object): self.droits = self.search_s(dn, ldap.SCOPE_BASE, attrlist=['droits'])[0][1].get('droits', []) if dn == admin_dn: self.droits += [attributs.nounou] + current_user = self.search("uid=%s" % user) + if len(current_user) != 1: + raise ValueError("L'utilisateur %s n'est pas présent dans la base en *1* exemplaire." % user) + else: + self.current_user = current_user[0] else: self.conn = self.simple_bind_s() self.dn = None self.droits = [] - def ressuscite(self, ldif_file, login=current_user): + def ressuscite(self, ldif_file, login=None): + if login is None: + login = self.conn.current_user["uid"][0] ldif={} for line in open(ldif_file).readlines(): line = line.split(':',1) @@ -258,6 +264,8 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object): """Crée une nouvelle machine: ``realm`` peut être: fil, fil-v6, wifi, wifi-v6, adm, gratuit, personnel-ens, special --Partiellement implémenté""" + if login is None: + login = self.current_user["uid"][0].value #adm, serveurs, bornes, wifi, adherents, gratuit ou personnel-ens""" owner = self.search('objectClass=*', dn=parent, scope=0)[0] @@ -295,7 +303,6 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object): # Tout doit disparaître !! machine = self._create_entity('mid=%s,%s' % (ldif['mid'][0], parent), ldif) - login = login or current_user machine.history_add(login, "inscription") if machine.may_be(created, self.droits + self._check_parent(machine.dn)): return machine @@ -580,6 +587,7 @@ class CransLdapObject(object): services.services_to_restart(self.conn, {}, self._modifs) def bury(self, comm, login): + """Sauvegarde l'objet dans un fichier dans le cimetière.""" self.history_add(login, u"destruction (%s)" % comm) self.save() #On produit un ldif @@ -594,7 +602,10 @@ class CransLdapObject(object): f.write(ldif.encode("UTF-8")) f.close() - def delete(self, comm="", login=current_user): + def delete(self, comm="", login=None): + """Supprime l'objet de la base LDAP. Appelle :py:meth:`CransLdapObject.bury`.""" + if login is None: + login = self.conn.current_user["uid"][0].value if self.mode not in ['w', 'rw']: raise EnvironmentError("Objet en lecture seule, réessayer en lecture/écriture") if not self.may_be(deleted, self.conn.droits): @@ -887,7 +898,8 @@ class proprio(CransLdapObject): def update_solde(self, diff, comment=u"", login=None): """Modifie le solde du proprio. diff peut être négatif ou positif.""" - login = login or current_user + if login is None: + login = self.conn.current_user["uid"][0].value assert isinstance(diff, int) or isinstance(diff, float) assert isinstance(comment, unicode) @@ -911,7 +923,10 @@ class proprio(CransLdapObject): m._proprio = self return self._machines - def delete(self, comm="", login=current_user): + def delete(self, comm="", login=None): + """Supprimme l'objet de la base LDAP. En supprimant ses enfants d'abord.""" + if login is None: + login = self.conn.current_user["uid"][0].value if self.mode not in ['w', 'rw']: raise EnvironmentError("Objet en lecture seule, réessayer en lecture/écriture") if not self.may_be(deleted, self.conn.droits): @@ -1081,10 +1096,11 @@ class machineWifi(machine): def set_ipv4(self, login=None): u"""Définie une ipv4 à la machine si elle n'est possède pas déjà une.""" + if login is None: + login = self.conn.current_user["uid"][0].value if not 'ipHostNumber' in self.attrs.keys() or not self['ipHostNumber']: rid = self['rid']=[ unicode(self.conn._find_id('rid', range(config.rid['wifi'][0], config.rid['wifi'][1]))) ] ip = self['ipHostNumber'] = [ unicode(crans_utils.ip4_of_rid(int(rid[0]))) ] - login = login or current_user self.history_add(login, u"rid") self.history_add(login, u"ipHostNumber (N/A -> %s)" % ip[0]) self.save() diff --git a/test.py b/test.py index bd78c90..dc29057 100755 --- a/test.py +++ b/test.py @@ -4,11 +4,18 @@ import psycopg2 import traceback import random import string +import os +import sys +sys.path.append("/usr/scripts/") import lc_ldap -import config -from affich_tools import anim, OK, cprint, ERREUR +from gestion.affich_tools import anim, OK, cprint, ERREUR show_traceback = False +if "--traceback" in sys.argv: + show_traceback = True +fast_test = False +if "--fast" in sys.argv: + fast_test = True def randomMAC(): mac = [ 0x00, 0x16, 0x3e, @@ -117,7 +124,8 @@ def tests_machines(parent_dn, realm_list, ipsec=False): print "Test de la librairie lc_ldap" print "Connection" -conn= lc_ldap.lc_ldap_test() +current_user = os.getenv("SUDO_USER") or os.getenv("USER") +conn= lc_ldap.lc_ldap_test(user=current_user) print u"Tests effectués avec les droits %s " % ', '.join(conn.droits) @@ -126,30 +134,32 @@ print u"Tests effectués avec les droits %s " % ', '.join(conn.droits) # les adhérents et de toutes les machines # ############################################### -anim("Appel de allMachinesAdherents en rw") -try: - machines, adherents = conn.allMachinesAdherents(mode='rw') -except EnvironmentError as error: - print ERREUR - if show_traceback: print traceback.format_exc() - else: print "\t%s" % error - exit(1) -except Exception as error: - print ERREUR - if show_traceback: print traceback.format_exc() - else: print "\t%r" % error - anim("Fallback en ro") - machines, adherents = conn.allMachinesAdherents() - print OK -else: - print OK -machines_attrs_keys = keys_of_list_of_dict(machines, 'machines') -print "Test des attributs des machines" -test_list_of_dict(machines_attrs_keys, machines) +if not fast_test: + anim("Appel de allMachinesAdherents en rw") + try: + machines, adherents = conn.allMachinesAdherents(mode='rw') + except EnvironmentError as error: + print ERREUR + if show_traceback: print traceback.format_exc() + else: print "\t%s" % error + exit(1) + except Exception as error: + print ERREUR + if show_traceback: print traceback.format_exc() + else: print "\t%r" % error + anim("Fallback en ro") + machines, adherents = conn.allMachinesAdherents() + print OK + else: + print OK -adherents_attrs_keys = keys_of_list_of_dict(adherents, 'adherents') -print "Test des attributs des adhérents" -test_list_of_dict(adherents_attrs_keys, adherents) + machines_attrs_keys = keys_of_list_of_dict(machines, 'machines') + print "Test des attributs des machines" + test_list_of_dict(machines_attrs_keys, machines) + + adherents_attrs_keys = keys_of_list_of_dict(adherents, 'adherents') + print "Test des attributs des adhérents" + test_list_of_dict(adherents_attrs_keys, adherents) print "Test de création d'objets"