On vire le kludge qui récupère le login dans la variable d'environnement.

C'est plus le rôle d'un wrapper (à venir) que du binding itself.
This commit is contained in:
Vincent Le Gallic 2013-05-16 03:53:00 +02:00
parent 3663d877cc
commit a620f5b5e9
2 changed files with 63 additions and 37 deletions

View file

@ -72,11 +72,6 @@ created = 'created'
modified = 'modified'
deleted = 'deleted'
#: Pour enregistrer dans l'historique, on a besoin de savoir qui exécute le script
#: Si le script a été exécuté via un sudo, la variable SUDO_USER (l'utilisateur qui a effectué le sudo)
#: est plus pertinente que USER (qui sera root)
current_user = os.getenv("SUDO_USER") or os.getenv("USER")
# Quand on a besoin du fichier de secrets
def import_secrets():
"""Importe le fichier de secrets"""
@ -131,6 +126,10 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object):
le ``dn`` associé à l'uid ``user``, et effectue l'authentification
avec ce ``dn`` et ``cred``
- Sinon effectue une authentification anonyme
Si on ne se binde pas en anonyme, il faut de toutes façons fournir un ``user``.
Il sert à savoir qui fait les modifications (utilisé dans les champs historique).
Si ``test`` est à ``True``, on se connecte à la base de test sur ``vo``.
"""
if test:
@ -153,12 +152,19 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object):
self.droits = self.search_s(dn, ldap.SCOPE_BASE, attrlist=['droits'])[0][1].get('droits', [])
if dn == admin_dn:
self.droits += [attributs.nounou]
current_user = self.search("uid=%s" % user)
if len(current_user) != 1:
raise ValueError("L'utilisateur %s n'est pas présent dans la base en *1* exemplaire." % user)
else:
self.current_user = current_user[0]
else:
self.conn = self.simple_bind_s()
self.dn = None
self.droits = []
def ressuscite(self, ldif_file, login=current_user):
def ressuscite(self, ldif_file, login=None):
if login is None:
login = self.conn.current_user["uid"][0]
ldif={}
for line in open(ldif_file).readlines():
line = line.split(':',1)
@ -258,6 +264,8 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object):
"""Crée une nouvelle machine: ``realm`` peut être:
fil, fil-v6, wifi, wifi-v6, adm, gratuit, personnel-ens, special
--Partiellement implémenté"""
if login is None:
login = self.current_user["uid"][0].value
#adm, serveurs, bornes, wifi, adherents, gratuit ou personnel-ens"""
owner = self.search('objectClass=*', dn=parent, scope=0)[0]
@ -295,7 +303,6 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object):
# Tout doit disparaître !!
machine = self._create_entity('mid=%s,%s' % (ldif['mid'][0], parent), ldif)
login = login or current_user
machine.history_add(login, "inscription")
if machine.may_be(created, self.droits + self._check_parent(machine.dn)):
return machine
@ -580,6 +587,7 @@ class CransLdapObject(object):
services.services_to_restart(self.conn, {}, self._modifs)
def bury(self, comm, login):
"""Sauvegarde l'objet dans un fichier dans le cimetière."""
self.history_add(login, u"destruction (%s)" % comm)
self.save()
#On produit un ldif
@ -594,7 +602,10 @@ class CransLdapObject(object):
f.write(ldif.encode("UTF-8"))
f.close()
def delete(self, comm="", login=current_user):
def delete(self, comm="", login=None):
"""Supprime l'objet de la base LDAP. Appelle :py:meth:`CransLdapObject.bury`."""
if login is None:
login = self.conn.current_user["uid"][0].value
if self.mode not in ['w', 'rw']:
raise EnvironmentError("Objet en lecture seule, réessayer en lecture/écriture")
if not self.may_be(deleted, self.conn.droits):
@ -887,7 +898,8 @@ class proprio(CransLdapObject):
def update_solde(self, diff, comment=u"", login=None):
"""Modifie le solde du proprio. diff peut être négatif ou positif."""
login = login or current_user
if login is None:
login = self.conn.current_user["uid"][0].value
assert isinstance(diff, int) or isinstance(diff, float)
assert isinstance(comment, unicode)
@ -911,7 +923,10 @@ class proprio(CransLdapObject):
m._proprio = self
return self._machines
def delete(self, comm="", login=current_user):
def delete(self, comm="", login=None):
"""Supprimme l'objet de la base LDAP. En supprimant ses enfants d'abord."""
if login is None:
login = self.conn.current_user["uid"][0].value
if self.mode not in ['w', 'rw']:
raise EnvironmentError("Objet en lecture seule, réessayer en lecture/écriture")
if not self.may_be(deleted, self.conn.droits):
@ -1081,10 +1096,11 @@ class machineWifi(machine):
def set_ipv4(self, login=None):
u"""Définie une ipv4 à la machine si elle n'est possède pas déjà une."""
if login is None:
login = self.conn.current_user["uid"][0].value
if not 'ipHostNumber' in self.attrs.keys() or not self['ipHostNumber']:
rid = self['rid']=[ unicode(self.conn._find_id('rid', range(config.rid['wifi'][0], config.rid['wifi'][1]))) ]
ip = self['ipHostNumber'] = [ unicode(crans_utils.ip4_of_rid(int(rid[0]))) ]
login = login or current_user
self.history_add(login, u"rid")
self.history_add(login, u"ipHostNumber (N/A -> %s)" % ip[0])
self.save()

62
test.py
View file

@ -4,11 +4,18 @@ import psycopg2
import traceback
import random
import string
import os
import sys
sys.path.append("/usr/scripts/")
import lc_ldap
import config
from affich_tools import anim, OK, cprint, ERREUR
from gestion.affich_tools import anim, OK, cprint, ERREUR
show_traceback = False
if "--traceback" in sys.argv:
show_traceback = True
fast_test = False
if "--fast" in sys.argv:
fast_test = True
def randomMAC():
mac = [ 0x00, 0x16, 0x3e,
@ -117,7 +124,8 @@ def tests_machines(parent_dn, realm_list, ipsec=False):
print "Test de la librairie lc_ldap"
print "Connection"
conn= lc_ldap.lc_ldap_test()
current_user = os.getenv("SUDO_USER") or os.getenv("USER")
conn= lc_ldap.lc_ldap_test(user=current_user)
print u"Tests effectués avec les droits %s " % ', '.join(conn.droits)
@ -126,30 +134,32 @@ print u"Tests effectués avec les droits %s " % ', '.join(conn.droits)
# les adhérents et de toutes les machines #
###############################################
anim("Appel de allMachinesAdherents en rw")
try:
machines, adherents = conn.allMachinesAdherents(mode='rw')
except EnvironmentError as error:
print ERREUR
if show_traceback: print traceback.format_exc()
else: print "\t%s" % error
exit(1)
except Exception as error:
print ERREUR
if show_traceback: print traceback.format_exc()
else: print "\t%r" % error
anim("Fallback en ro")
machines, adherents = conn.allMachinesAdherents()
print OK
else:
print OK
machines_attrs_keys = keys_of_list_of_dict(machines, 'machines')
print "Test des attributs des machines"
test_list_of_dict(machines_attrs_keys, machines)
if not fast_test:
anim("Appel de allMachinesAdherents en rw")
try:
machines, adherents = conn.allMachinesAdherents(mode='rw')
except EnvironmentError as error:
print ERREUR
if show_traceback: print traceback.format_exc()
else: print "\t%s" % error
exit(1)
except Exception as error:
print ERREUR
if show_traceback: print traceback.format_exc()
else: print "\t%r" % error
anim("Fallback en ro")
machines, adherents = conn.allMachinesAdherents()
print OK
else:
print OK
adherents_attrs_keys = keys_of_list_of_dict(adherents, 'adherents')
print "Test des attributs des adhérents"
test_list_of_dict(adherents_attrs_keys, adherents)
machines_attrs_keys = keys_of_list_of_dict(machines, 'machines')
print "Test des attributs des machines"
test_list_of_dict(machines_attrs_keys, machines)
adherents_attrs_keys = keys_of_list_of_dict(adherents, 'adherents')
print "Test des attributs des adhérents"
test_list_of_dict(adherents_attrs_keys, adherents)
print "Test de création d'objets"