[global] Passage à tout unicode et quelques corrections mineures.

* Typos
 * Docstrings unicode
 * Erreurs unicode
 * Ajout de _post_delete et _post_create
 * ?
This commit is contained in:
Pierre-Elliott Bécue 2013-05-27 23:18:24 +02:00
parent b2abbef3b4
commit 60ded9f830
7 changed files with 244 additions and 237 deletions

View file

@ -46,6 +46,8 @@ import string
from unicodedata import normalize
from crans_utils import format_tel, format_mac, mailexist, validate_name, ip4_of_rid, ip6_of_mac
sys.path.append("/usr/scripts/")
import cranslib.deprecated
sys.path.append("/usr/scripts/gestion")
import config
import config.impression
@ -122,13 +124,14 @@ def attrify(val, attr, conn, Parent=None):
return val
else:
if not isinstance(val, unicode):
cranslib.deprecated.usage("attrify ne devrait être appelé qu'avec des unicode (%r)" % val, level=3)
val = val.decode('utf-8')
return AttributeFactory.get(attr, fallback=Attr)(val, conn, Parent)
class AttrsDict(dict):
def __init__(self, conn, ldif={}, Parent=None):
super(AttrsDict, self).__init__(ldif)
def __init__(self, conn, uldif={}, Parent=None):
super(AttrsDict, self).__init__(uldif)
self._conn = conn
self._parent = Parent
self._iterator = None
@ -211,7 +214,7 @@ class Attr(object):
def parse_value(self, val):
"""Transforme l'attribut pour travailler avec notre validateur
Le ldif est en dépendance car à certains endroits, il peut servir
(par exemple, pour l'ipv6, ou l'ipv4..."""
(par exemple, pour l'ipv6, ou l'ipv4"""
self.value = val
def __str__(self):
@ -221,15 +224,16 @@ class Attr(object):
return str(self.__class__) + " : " + repr(self.value)
def __unicode__(self):
# XXX - Vérifier que cette méthode produit un objet parsable
assert isinstance(self.value, unicode)
if isinstance(self.value, unicode):
return self.value
else:
return unicode(self.value)
def check_uniqueness(self, liste_exclue):
"""Vérifie l'unicité dans la base de la valeur (``mailAlias``, ``chbre``,
etc...)"""
attr = self.__class__.__name__
if str(self) in liste_exclue:
if unicode(self) in liste_exclue:
return
if self.unique:
res = self.conn.search('%s=%s' % (attr, str(self)))
@ -438,7 +442,7 @@ class mail(Attr):
singlevalue = False
optional = False
unique = True
legend = "Le mail de l'adhérent"
legend = "Adresse mail de l'adhérent"
can_modify = [soi, nounou, cableur]
category = 'mail'
ldap_name = "mail"
@ -456,7 +460,7 @@ class mail(Attr):
res = smtp.getreply()[0] in [250, 252]
smtp.close()
except:
raise ValueError(u'Serveur de mail injoignable')
raise ValueError('Serveur de mail injoignable')
if res:
raise ValueError("Le mail %s est déjà pris." % (str(self)))
@ -466,8 +470,8 @@ class mail(Attr):
raise ValueError("Le mail %s est déjà pris." % (str(self)))
def parse_value(self, mail):
if not re.match('^[-_.0-9A-Za-z]+@([A-Za-z0-9]{1}[A-Za-z0-9-_]+[.])+[a-z]{2,6}$', mail):
raise ValueError("Adresse mail invalide (%s)" % mail)
if not re.match(u'^[-_.0-9A-Za-z]+@([A-Za-z0-9]{1}[A-Za-z0-9-_]+[.])+[a-z]{2,6}$', mail):
raise ValueError("%s invalide %r" % (self.legend, mail))
self.value = mail
@ -482,9 +486,7 @@ class canonicalAlias(mail):
def parse_value(self, mail):
mail = u".".join([ a.capitalize() for a in mail.split(u'.', 1) ])
if not re.match('^[-_.0-9A-Za-z]+@([A-Za-z0-9]{1}[A-Za-z0-9-_]+[.])+[a-z]{2,6}$', mail):
raise ValueError("Alias mail invalide (%s)" % mail)
self.value = mail
super(canonicalAlias, self).parse_value(mail)
@crans_attribute
class mailAlias(mail):
@ -498,9 +500,7 @@ class mailAlias(mail):
def parse_value(self, mail):
mail = mail.lower()
if not re.match('^[-_.0-9A-Za-z]+@([A-Za-z0-9]{2}[A-Za-z0-9-_]+[.])+[a-z]{2,6}$', mail):
raise ValueError("Alias mail invalide (%r)" % mail)
self.value = mail
super(mailAlias, self).parse_value(mail)
@crans_attribute
class mailExt(mail):
@ -514,9 +514,7 @@ class mailExt(mail):
def parse_value(self, mail):
mail = mail.lower()
if not re.match('^[-_.0-9A-Za-z]+@([A-Za-z0-9]{2}[A-Za-z0-9-_]+[.])+[a-z]{2,6}$', mail):
raise ValueError("Mail externe invalide (%r)" % mail)
self.value = mail
super(mailExt, self).parse_value(mail)
@crans_attribute
class mailInvalide(boolAttr):
@ -545,10 +543,6 @@ class etudes(Attr):
category = 'perso'
ldap_name = "etudes"
def parse_value(self, etudes):
# who cares
self.value = etudes
@crans_attribute
class chbre(Attr):
singlevalue = True
@ -564,20 +558,16 @@ class chbre(Attr):
if u'club' in [str(o) for o in self.parent['objectClass']]:
if chambre in annuaires_pg.locaux_clubs():
self.value = chambre
return
else:
raise ValueError("Club devrait etre en XclN, pas en %r" % chambre)
if chambre in (u"EXT", u"????"):
elif chambre in (u"EXT", u"????"):
self.value = chambre
return
else:
try:
annuaires_pg.chbre_prises(chambre[0], chambre[1:])
except NameError:
import annuaires_pg_test
annuaires_pg_test.chbre_prises(chambre[0], chambre[1:])
self.value = chambre
@crans_attribute
@ -617,11 +607,15 @@ class solde(Attr):
# on évite les dépassements, sauf si on nous dit de ne pas vérifier
#if not (float(solde) >= config.impression.decouvert and float(solde) <= 1024.):
# raise ValueError("Solde invalide: %r" % solde)
self.value = solde
self.value = float(solde)
def __unicode__(self):
return u"%.2f" % self.value
class dnsAttr(Attr):
category = 'dns'
ldap_name = "dnsAttr"
def parse_value(self, val):
val = val.lower()
names = val.split('.')
@ -810,9 +804,6 @@ class positionBorne(Attr):
optional = True
ldap_name = "positionBorne"
def parse_value(self, pos):
self.value = unicode(pos)
@crans_attribute
class nvram(Attr):
legend = u"Configuration speciale"
@ -820,10 +811,6 @@ class nvram(Attr):
can_modify = [nounou]
ldap_name = "nvram"
def parse_value(self, nvr):
# XXX - on fait quoi ici ?
self.value = nvr
class portAttr(Attr):
singlevalue = False
optional = True
@ -861,7 +848,7 @@ class portAttr(Attr):
@crans_attribute
class portTCPout(portAttr):
legend = u'Port TCP ouvert vers l\'extérieur'
legend = u"Port TCP ouvert vers l'extérieur"
ldap_name = "portTCPout"
@crans_attribute
@ -902,10 +889,6 @@ class prise(Attr):
can_modify = [nounou]
ldap_name = "prise"
def parse_value(self, prise):
### Tu es Beau, je te fais confiance
self.value = prise
@crans_attribute
class cid(intAttr):
singlevalue = True
@ -1058,7 +1041,7 @@ class charteMA(Attr):
ldap_name = "charteMA"
def parse_value(self, charteSignee):
if charteSignee.upper() not in ["TRUE", "FALSE"]:
if charteSignee.upper() not in [u"TRUE", u"FALSE"]:
raise ValueError("La charte MA est soit TRUE ou FALSE, pas %r" % charteSignee)
self.value = charteSignee.upper()
@ -1071,8 +1054,8 @@ class homeDirectory(Attr):
ldap_name = "homeDirectory"
def parse_value(self, home):
uid = str(self.parent['uid'][0])
if uid.startswith('club-'):
uid = unicode(self.parent['uid'][0])
if uid.startswith(u'club-'):
uid = uid.split('-', 1)[1]
if home != u'/home/%s' % uid and home != u'/home/club/%s' % uid:
raise ValueError("Le répertoire personnel n'est pas bon: %r (devrait être %r ou %r)" % (home, '/home/%s' % self.parent['uid'][0], '/home/club/%s' % self.parent['uid'][0]))
@ -1089,30 +1072,29 @@ class loginShell(Attr):
def parse_value(self, shell):
#with open('/etc/shells') as f:
# shells = [ l.strip() for l in f.readlines() if not l.startswith('#') ]
shells = ['/bin/csh',
'/bin/sh',
'/usr/bin/es',
'/usr/bin/ksh',
'/bin/ksh',
'/usr/bin/rc',
'/usr/bin/tcsh',
'/bin/tcsh',
'/usr/bin/esh',
'/bin/bash',
'/bin/rbash',
'/bin/zsh',
'/usr/bin/zsh',
'/usr/bin/screen',
'/bin/dash',
'/usr/bin/rssh',
'/usr/local/bin/disconnect_shell',
'/usr/scripts/surveillance/disconnect_shell',
'/usr/local/bin/badPassSh',
'/usr/bin/passwd',
'/bin/false',
'/bin//zsh',
'/usr/sbin/nologin'
'']
shells = [u'/bin/csh',
u'/bin/sh',
u'/usr/bin/es',
u'/usr/bin/ksh',
u'/bin/ksh',
u'/usr/bin/rc',
u'/usr/bin/tcsh',
u'/bin/tcsh',
u'/usr/bin/esh',
u'/bin/bash',
u'/bin/rbash',
u'/bin/zsh',
u'/usr/bin/zsh',
u'/usr/bin/screen',
u'/bin/dash',
u'/usr/bin/rssh',
u'/usr/local/bin/disconnect_shell',
u'/usr/scripts/surveillance/disconnect_shell',
u'/usr/local/bin/badPassSh',
u'/usr/bin/passwd',
u'/bin/false',
u'/usr/sbin/nologin'
u'']
if shell not in shells:
raise ValueError("Shell %r invalide" % shell)
self.value = shell
@ -1142,9 +1124,6 @@ class gecos(Attr):
category = 'id'
ldap_name = "gecos"
def parse_value(self, gecos):
self.value = gecos
@crans_attribute
class sshFingerprint(Attr):
singlevalue = False
@ -1197,7 +1176,7 @@ class controle(Attr):
def parse_value(self, ctrl):
if ctrl not in [u"", u"c", u"p", u"cp", u"pc"]:
raise ValueError("control peut prendre les valeurs [c][p]")
raise ValueError("Contrôle peut prendre les valeurs [c][p]")
self.value = ctrl
@crans_attribute

View file

@ -59,7 +59,7 @@ def ip4_of_rid(rid):
try:
return netaddr.IPAddress(config.rid_machines_speciales[rid])
except KeyError:
return ValueError(u"Machine speciale inconnue: %d" % rid)
return ValueError("Machine speciale inconnue: %d" % rid)
return netaddr.IPAddress(netaddr.IPNetwork(config.NETs[net][0]).first + rid - plage[0])
@ -97,10 +97,10 @@ def ip6_of_mac(mac, rid):
# En théorie, format_mac est inutile, car on ne devrait avoir
# que des mac formatées.
mac = format_mac(mac).replace(':', '')
mac = format_mac(mac).replace(u':', u'')
# hex retourne un str, donc on concatène, suivant la RFC
euid64v6 = hex(int(mac[:2], 16)^0b00000010) + mac[2:6] + 'fffe' + mac[6:12]
euid64v6 = hex(int(mac[:2], 16)^0b00000010) + mac[2:6] + u'fffe' + mac[6:12]
# fil-v6 ou wifi-v6, we don't care
if net != "special":
@ -133,7 +133,7 @@ def mailexist(mail):
return r
def format_ldap_time(tm):
u"""Formatage des dates provenant de la base LDAP
"""Formatage des dates provenant de la base LDAP
Transforme la date YYYYMMDDHHMMSS.XXXXXXZ (UTC)
en date DD/MM/YY HH:MM (local)"""
tm_st = time.strptime(tm.split('.')[0], "%Y%m%d%H%M%S") # struct_time UTC
@ -142,17 +142,17 @@ def format_ldap_time(tm):
return time.strftime("%d/%m/%Y %H:%M", tm_st)
def format_mac(mac):
u""" Formatage des adresses mac
""" Formatage des adresses mac
Transforme une adresse pour obtenir la forme xx:xx:xx:xx:xx:xx
Retourne la mac formatée.
"""
mac = netaddr.EUI(mac)
if not mac:
raise ValueError(u"MAC nulle interdite\nIl doit être possible de modifier l'adresse de la carte.")
return str(mac).replace('-', ':')
return unicode(str(mac).replace('-', ':'))
def format_tel(tel):
u"""Formatage des numéros de téléphone
"""Formatage des numéros de téléphone
Transforme un numéro de téléphone pour ne contenir que des chiffres
(00ii... pour les numéros internationaux)
Retourne le numéro formaté.
@ -162,8 +162,9 @@ def format_tel(tel):
tel_f = u"00" + tel_f[1:]
if u"(0)" in tel_f:
tel_f = tel_f.replace(u"(0)", u"")
# \D = non-digit
tel_f = re.sub(r'\D', '', tel_f)
return tel_f
return unicode(tel_f)
def validate_name(value, more_chars=""):
"""Valide un nom: ie un unicode qui contient lettres, espaces et
@ -172,7 +173,7 @@ def validate_name(value, more_chars=""):
normalize('NFKD', value).encode('ASCII', 'ignore')):
return unicode(value)
else:
raise ValueError("Nom invalide ('%s')" % value)
raise ValueError("Nom invalide (%r)" % value)
def process_status(pid):
"""

View file

@ -41,12 +41,6 @@ import re
import ldap
## import de /usr/scripts/
if not "/usr/scripts/" in sys.path:
sys.path.append('/usr/scripts/')
import gestion.config as config
## import locaux
import crans_utils
import attributs
@ -54,6 +48,12 @@ import objets
import ldap_locks
import variables
## import de /usr/scripts/
if not "/usr/scripts/" in sys.path:
sys.path.append('/usr/scripts/')
import gestion.config as config
# A priori, ldif_to_uldif et ldif_to_cldif sont obsolètes,
# du fait de l'apparition de AttrsDict dans attributs.py
def ldif_to_uldif(ldif):
@ -169,7 +169,8 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object):
ldap_res = self.search_ext_s(dn, scope, filterstr, sizelimit=sizelimit)
ret = []
for dn, ldif in ldap_res:
ret.append(objets.new_cransldapobject(self, dn, mode, ldif))
uldif = ldif_to_uldif(ldif)
ret.append(objets.new_cransldapobject(self, dn, mode, uldif))
return ret
def allMachinesAdherents(self, mode='ro'):
@ -183,13 +184,13 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object):
for dn, attrs in self.search_s(variables.base_dn, scope=2):
# On crée les listes des machines et propriétaires
if dn.startswith('mid='): # les machines
m = objets.new_cransldapobject(self, dn, mode, ldif = attrs)
m = objets.new_cransldapobject(self, dn, mode, uldif=ldif_to_uldif(attrs))
parent_dn = dn.split(',', 1)[1]
if not machines.has_key(parent_dn):
machines[parent_dn] = []
machines[parent_dn].append(m)
elif (dn.startswith('aid=') or dn.startswith('cid=') or dn == variables.base_dn) and not parent.has_key(dn):
parent[dn] = objets.new_cransldapobject(self, dn, mode, ldif = attrs)
parent[dn] = objets.new_cransldapobject(self, dn, mode, uldif=ldif_to_uldif(attrs))
allmachines = []
for dn, mlist in machines.iteritems(): # on associe propriétaires et machines
parent[dn]._machines = mlist
@ -214,7 +215,7 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object):
_,adherents = self.allMachinesAdherents(mode)
return adherents
def newMachine(self, parent, realm, ldif, login=None):
def newMachine(self, parent, realm, uldif, login=None):
"""Crée une nouvelle machine: ``realm`` peut être:
fil, fil-v6, wifi, wifi-v6, adm, gratuit, personnel-ens, special
--Partiellement implémenté"""
@ -224,75 +225,76 @@ class lc_ldap(ldap.ldapobject.LDAPObject, object):
owner = self.search('objectClass=*', dn=parent, scope=0)[0]
if realm in ["adm", "serveurs", "serveurs-v6", "adm-v6"]:
ldif['objectClass'] = ['machineCrans']
uldif['objectClass'] = [u'machineCrans']
assert isinstance(owner, objets.AssociationCrans)
elif realm == "bornes":
ldif['objectClass'] = ['borneWifi']
uldif['objectClass'] = [u'borneWifi']
assert isinstance(owner, objets.AssociationCrans)
elif realm in ["wifi", "wifi-v6"]:
ldif['objectClass'] = ['machineWifi']
uldif['objectClass'] = [u'machineWifi']
assert isinstance(owner, objets.adherent) or isinstance(owner, objets.club)
elif realm in ["adherents", "fil-v6", "personnel-ens"]:
ldif['objectClass'] = ['machineFixe']
elif realm in ["adherents", "adherents-v6", "personnel-ens"]:
uldif['objectClass'] = [u'machineFixe']
assert isinstance(owner, objets.adherent) or isinstance(owner, objets.club)
else: raise ValueError("Realm inconnu: %r" % realm)
else:
raise ValueError("Realm inconnu: %r" % realm)
# On récupère la plage des mids
plage = xrange( *(config.rid[realm]))
# On récupère le premier id libre dans la plages s'il n'est pas
# déjà précisé dans le ldiff
rid = ldif.setdefault('rid', [ str(self._find_id('rid', plage)) ])
rid = uldif.setdefault('rid', [unicode(self._find_id('rid', plage)) ])
# La machine peut-elle avoir une ipv4 ?
if 'v6' not in realm:
ldif['ipHostNumber'] = [ str(crans_utils.ip4_of_rid(int(rid[0]))) ]
ldif['ip6HostNumber'] = [ str(crans_utils.ip6_of_mac(ldif['macAddress'][0], int(rid[0]))) ]
uldif['ipHostNumber'] = [ unicode(crans_utils.ip4_of_rid(int(rid[0]))) ]
uldif['ip6HostNumber'] = [ unicode(crans_utils.ip6_of_mac(uldif['macAddress'][0], int(rid[0]))) ]
# Mid
ldif['mid'] = [ str(self._find_id('mid')) ]
uldif['mid'] = [ unicode(self._find_id('mid')) ]
# Tout doit disparaître !!
machine = self._create_entity('mid=%s,%s' % (ldif['mid'][0], parent), ldif)
machine.history_add(login, "inscription")
machine = self._create_entity('mid=%s,%s' % (uldif['mid'][0], parent), uldif)
machine.history_add(login, u"inscription")
if machine.may_be(variables.created, self.droits + self._check_parent(machine.dn)):
return machine
else:
raise EnvironmentError("Vous n'avez pas le droit de créer cette machine.")
def newAdherent(self, ldif):
def newAdherent(self, uldif):
"""Crée un nouvel adhérent"""
aid = ldif.setdefault('aid', [ str(self._find_id('aid')) ])
ldif['objectClass'] = ['adherent']
adherent = self._create_entity('aid=%s,%s' % (aid[0], variables.base_dn), ldif)
aid = uldif.setdefault('aid', [ unicode(self._find_id('aid')) ])
uldif['objectClass'] = [u'adherent']
adherent = self._create_entity('aid=%s,%s' % (aid[0], variables.base_dn), uldif)
if adherent.may_be(variables.created, self.droits):
return adherent
else:
raise EnvironmentError("Vous n'avez pas le droit de créer cet adhérent.")
def newClub(self, ldif):
def newClub(self, uldif):
"""Crée un nouveau club"""
cid = ldif.setdefault('cid', [ str(self._find_id('cid')) ])
ldif['objectClass'] = ['club']
club = self._create_entity('cid=%s,%s' % (cid[0], variables.base_dn), ldif)
cid = uldif.setdefault('cid', [ unicode(self._find_id('cid')) ])
uldif['objectClass'] = [u'club']
club = self._create_entity('cid=%s,%s' % (cid[0], variables.base_dn), uldif)
if club.may_be(variables.created, self.droits):
return club
else:
raise EnvironmentError("Vous n'avez pas le droit de créer cet adhérent.")
def newFacture(self, ldif):
def newFacture(self, uldif):
"""Crée une nouvelle facture
--Non implémenté !"""
raise NotImplementedError()
def _create_entity(self, dn, ldif):
def _create_entity(self, dn, uldif):
'''Crée une nouvelle entité ldap avec le dn ``dn`` et les
attributs de ``ldif``. Attention, ldif doit contenir des
données encodées.'''
return objets.new_cransldapobject(self, dn, 'rw', ldif)
return objets.new_cransldapobject(self, dn, 'rw', uldif)
def _find_id(self, attr, plage=None):
'''Trouve un id libre. Si une plage est fournie, cherche

153
objets.py
View file

@ -43,16 +43,9 @@ import sys
import re
import datetime
import time
import ldap
from ldap.modlist import addModlist, modifyModlist
## import de /usr/scripts/
if not "/usr/scripts/" in sys.path:
sys.path.append('/usr/scripts/')
import gestion.config as config
## import locaux
import lc_ldap
import crans_utils
@ -61,10 +54,17 @@ import ldap_locks
import services
import variables
## import de /usr/scripts/
if not "/usr/scripts/" in sys.path:
sys.path.append('/usr/scripts/')
import gestion.config as config
from gestion.gen_confs.dhcpd_new import dydhcp
#: Champs à ignorer dans l'historique
HIST_IGNORE_FIELDS = ["modifiersName", "entryCSN", "modifyTimestamp", "historique"]
def new_cransldapobject(conn, dn, mode='ro', ldif = None):
def new_cransldapobject(conn, dn, mode='ro', uldif=None):
"""Crée un objet :py:class:`CransLdapObject` en utilisant la classe correspondant à
l'``objectClass`` du ``ldif``
--pour usage interne à la librairie uniquement !"""
@ -75,8 +75,8 @@ def new_cransldapobject(conn, dn, mode='ro', ldif = None):
classe = AssociationCrans
elif dn == variables.invite_dn:
classe = BaseInvites
elif ldif:
classe = ObjectFactory.get(ldif['objectClass'][0])
elif uldif:
classe = ObjectFactory.get(uldif['objectClass'][0])
else:
res = conn.search_s(dn, 0)
if not res:
@ -84,7 +84,7 @@ def new_cransldapobject(conn, dn, mode='ro', ldif = None):
_, attrs = res[0]
classe = ObjectFactory.get(attrs['objectClass'][0])
return classe(conn, dn, mode, ldif)
return classe(conn, dn, mode, uldif)
class CransLdapObject(object):
"""Classe de base des objets :py:class:`CransLdap`.
@ -98,10 +98,10 @@ class CransLdapObject(object):
attribs = []
def __init__(self, conn, dn, mode='ro', ldif = None):
def __init__(self, conn, dn, mode='ro', uldif=None):
'''
Créée une instance d'un objet Crans (machine, adhérent,
etc...) à ce ``dn``, si ``ldif`` est précisé, n'effectue pas de
etc...) à ce ``dn``, si ``uldif`` est précisé, n'effectue pas de
recherche dans la base ldap.
'''
@ -116,25 +116,21 @@ class CransLdapObject(object):
self.dn = dn
orig = {}
if ldif:
self.attrs = attributs.AttrsDict(self.conn, ldif, Parent=self)
self._modifs = attributs.AttrsDict(self.conn, ldif, Parent=self)
orig = ldif
if uldif:
self.attrs = attributs.AttrsDict(self.conn, uldif, Parent=self)
self._modifs = attributs.AttrsDict(self.conn, uldif, Parent=self)
elif dn != variables.base_dn:
res = self.conn.search_s(dn, 0)
if not res:
raise ValueError ('objet inexistant: %s' % dn)
self.dn, res_attrs = res[0]
self.dn, ldif = res[0]
# L'objet sortant de la base ldap, on ne fait pas de vérifications sur
# l'état des données.
self.attrs = attributs.AttrsDict(self.conn, res_attrs, Parent=self)
# Pour test en cas de mode w ou rw
orig = res[0][1]
self._modifs = attributs.AttrsDict(self.conn, res[0][1], Parent=self)
uldif = lc_ldap.ldif_to_ldif(ldif)
self.attrs = attributs.AttrsDict(self.conn, uldif, Parent=self)
self._modifs = attributs.AttrsDict(self.conn, uldif, Parent=self)
if mode in ['w', 'rw']:
if not self.may_be(variables.modified, self.conn.droits + self.conn._check_parent(dn) + self.conn._check_self(dn)):
@ -142,55 +138,63 @@ class CransLdapObject(object):
self.mode = mode
# Je m'interroge sur la pertinence de cette partie, je pense qu'elle n'est
# pas utile. -- PEB 27/01/2013
if mode in ['w', 'rw']:
### Vérification que `λv. str(Attr(v))` est bien une projection
### C'est-à-dire que si on str(Attr(str(Attr(v)))) on retombe sur str(Attr(v))
oldif = orig
nldif = self.attrs.to_ldif()
# Vérification que `λv. str(Attr(v))` est bien une projection
# C'est-à-dire que si on str(Attr(str(Attr(v)))) on retombe sur str(Attr(v))
oldif = lc_ldap.ldif_to_uldif(self.attrs.to_ldif())
nldif = lc_ldap.ldif_to_uldif(attributs.AttrsDict(self.conn, lc_ldap.ldif_to_uldif(self.attrs.to_ldif()), Parent=self).to_ldif())
for attr, vals in oldif.items():
if nldif[attr] != vals:
for v in nldif[attr]:
if v in vals:
vals.remove(v)
nvals = [nldif[attr][v.index(v)] for v in vals ]
nvals = [nldif[attr][vals.index(v)] for v in vals ]
raise EnvironmentError("λv. str(Attr(v)) n'est peut-être pas une projection (ie non idempotente):", attr, nvals, vals)
# def _get_fields(self):
# """Renvoie la liste des champs LDAP de l'objet"""
# return self.attribs
# attribs = property(_get_fields)
def history_add(self, login, chain):
"""Ajoute une ligne à l'historique de l'objet.
###ATTENTION : C'est un kludge pour pouvoir continuer à faire "comme avant",
### mais on devrait tout recoder pour utiliser l'historique LDAP"""
assert isinstance(login, str) or isinstance(login, unicode)
assert isinstance(chain, str) or isinstance(chain, unicode)
assert isinstance(login, unicode)
assert isinstance(chain, unicode)
new_line = "%s, %s : %s" % (time.strftime("%d/%m/%Y %H:%M"), login, chain)
new_line = u"%s, %s : %s" % (time.strftime("%d/%m/%Y %H:%M"), login, chain)
# Attention, le __setitem__ est surchargé, mais pas .append sur l'historique
self["historique"] = self.get("historique", []) + [new_line]
def _check_optionnal(self, comment):
"""Vérifie que les attributs qui ne sont pas optionnels sont effectivement peuplés."""
objet = self.ldap_name
for attribut in self.attribs:
if not attribut.optional:
nom_attr = attribut.ldap_name
if len(self._modifs.get(nom_attr, [])) <= 0:
raise attributs.OptionalError("L'objet %s que vous %s doit posséder au moins un attribut %s" % (objet, comment, nom_attr))
def _post_creation(self):
"""Fonction qui effectue quelques tâches lorsque la création est
faite"""
pass
def _post_deletion(self):
"""Fonction qui effectue quelques tâches lorsque la création est
faite"""
pass
def create(self):
"""Crée l'objet dans la base ldap, cette méthode vise à faire en sorte que
l'objet se crée lui-même, si celui qui essaye de le modifier a les droits
de le faire."""
objet = self.__class__.__name__
for attribut in self.attribs:
if not attribut.optional:
nom_attr = attribut.__name__
if len(self._modifs.get(nom_attr, [])) <= 0:
raise attributs.OptionalError("L'objet %s que vous créez doit posséder au moins un attribut %s" % (objet, nom_attr))
self._check_optionnal(comment="créez")
# Création de la requête LDAP
modlist = addModlist(self._modifs.to_ldif())
# Requête LDAP de création de l'objet
self.conn.add_s(self.dn, modlist)
services.services_to_restart(self.conn, {}, self._modifs)
self._post_creation()
def bury(self, comm, login):
"""Sauvegarde l'objet dans un fichier dans le cimetière."""
@ -202,7 +206,6 @@ class CransLdapObject(object):
for value in self.attrs[key]:
ldif += u"%s: %s\n" % (key, value)
import datetime
file = "%s %s" % (datetime.datetime.now(), self.dn)
f = open('/home/cimetiere_lc/%s/%s' % (self['objectClass'][0], file.replace(' ', '_')), 'w')
f.write(ldif.encode("UTF-8"))
@ -218,6 +221,7 @@ class CransLdapObject(object):
raise EnvironmentError("Vous n'avez pas le droit de supprimer %s." % self.dn)
self.bury(comm, login)
self.conn.delete_s(self.dn)
self._post_deletion()
services.services_to_restart(self.conn, self.attrs, {})
def save(self):
@ -227,13 +231,7 @@ class CransLdapObject(object):
if self.mode not in ['w', 'rw']:
raise EnvironmentError("Objet en lecture seule, réessayer en lecture/écriture")
objet = self.__class__.__name__
for attribut in self.attribs:
if not attribut.optional:
nom_attr = attribut.__name__
if len(self._modifs.get(nom_attr, [])) <= 0:
raise attributs.OptionalError("L'objet %s que vous créez doit posséder au moins un attribut %s" % (objet, nom_attr))
self._check_optionnal(comment="modifiez")
# On récupère la liste des modifications
modlist = self.get_modlist()
@ -246,7 +244,9 @@ class CransLdapObject(object):
services.services_to_restart(self.conn, self.attrs, self._modifs)
# Vérification des modifications
self.attrs = attributs.AttrsDict(self.conn, self.conn.search_s(self.dn, 0)[0][1], Parent=self)
old_ldif = self.conn.search_s(self.dn, ldap.SCOPE_BASE)[0][1]
old_uldif = lc_ldap.ldif_to_uldif(old_ldif)
self.attrs = attributs.AttrsDict(self.conn, old_uldif, Parent=self)
differences = []
# On fait les différences entre les deux dicos
for attr in set(self.attrs.keys()).union(set(self._modifs.keys())):
@ -317,7 +317,7 @@ class CransLdapObject(object):
attrs_before_verif = [ attributs.attrify(val, attr, self.conn, Parent=self) for val in values ]
if attr in self.attrs.keys():
for attribut in attrs_before_verif:
attribut.check_uniqueness([str(content) for content in self.attrs[attr]])
attribut.check_uniqueness([content.value for content in self.attrs[attr]])
# On groupe les attributs précédents, et les nouveaux
mixed_attrs = attrs_before_verif + self.attrs[attr]
@ -390,7 +390,7 @@ class CransLdapObject(object):
return blacklist_liste
def blacklist(self, sanction, commentaire, debut="now", fin = '-'):
u"""
"""
Blacklistage de la ou de toutes la machines du propriétaire
* debut et fin sont le nombre de secondes depuis epoch
* pour un début ou fin immédiate mettre now
@ -502,6 +502,7 @@ class proprio(CransLdapObject):
return bool_carte
return True
# XXX - To Delete
def update_solde(self, diff, comment=u"", login=None):
"""Modifie le solde du proprio. diff peut être négatif ou positif."""
if login is None:
@ -578,16 +579,28 @@ class machine(CransLdapObject):
black.extend(filter((lambda bl: bl.is_actif()), attrs.get("blacklist",[])))
return black
def _post_creation(self):
"""Fonction qui effectue quelques tâches lorsque la création est
faite"""
if self._proprio:
if self._proprio._machines:
self._proprio._machines.append(self)
def _post_deletion(self):
"""Fonction qui effectue quelques tâches lorsque l'on veut effacer"""
if self._proprio:
if self._proprio._machines:
self._proprio._machines.remove(self)
class AssociationCrans(proprio):
u""" Association crans (propriétaire particulier)."""
def save(self):
raise EnvironmentError("AssociationCrans.save(): done.")
pass
def ressuscite(self, comm, login):
raise EnvironmentError("Ressusciter le Crans ? Hum…")
pass
def delete(self, comm, login):
raise EnvironmentError("Casser le Crans ? Hum…")
pass
class BaseInvites(proprio):
@ -609,13 +622,11 @@ class adherent(proprio):
def __init__(self, conn, dn, mode='ro', ldif = None):
super(adherent, self).__init__(conn, dn, mode, ldif)
if u'cransAccount' in [ str(o) for o in self['objectClass']]:
if u'cransAccount' in [ unicode(o) for o in self['objectClass']]:
self.attribs = self.attribs + [attributs.uid, attributs.canonicalAlias, attributs.solde,
attributs.contourneGreylist, attributs.derniereConnexion,
attributs.homepageAlias, attributs.mailAlias, attributs.loginShell ]
def compte(self, login = None, uidNumber=0, hash_pass = '', shell=config.login_shell):
u"""Renvoie le nom du compte crans. S'il n'existe pas, et que uid
est précisé, le crée."""
@ -707,8 +718,9 @@ class machineWifi(machine):
self.history_add(login, u"rid")
self.history_add(login, u"ipHostNumber (N/A -> %s)" % ip[0])
self.save()
from gen_confs.dhcpd_new import dydhcp
dhcp=dydhcp()
for server in config.dhcp_servers:
dhcp=dydhcp(server)
dhcp.add_host(str(self['ipHostNumber'][0]), str(self['macAddress'][0]), str(self['host'][0]))
@crans_object
@ -730,6 +742,16 @@ class borneWifi(machine):
attributs.prise, attributs.positionBorne, attributs.nvram]
ldap_name = "borneWifi"
@crans_object
class switchCrans(machine):
can_be_by = { variables.created: [attributs.nounou],
variables.modified: [attributs.nounou],
variables.deleted: [attributs.nounou],
}
attribs = machine.attribs + [attributs.nombrePrises]
ldap_name = "switchCrans"
@crans_object
class facture(CransLdapObject):
can_be_by = { variables.created: [attributs.nounou, attributs.bureau, attributs.cableur],
@ -742,3 +764,4 @@ class facture(CransLdapObject):
@crans_object
class service(CransLdapObject):
ldap_name = "service"

View file

@ -8,6 +8,10 @@ import objets
import variables
import config
from gen_confs.dhcpd_new import dydhcp
import sys
if not '/usr/scripts' in sys.path:
sys.path.append('/usr/scripts')
import gestion.config as config
# liste des attributs dont dépend un service
services_to_attrs = {}
@ -211,20 +215,16 @@ def services_to_restart(conn, old_attrs={}, new_attrs={}):
# Cas du dhcp
if attr.__class__ in services_to_attrs['dhcp']:
for server in config.dhcp_servers:
dhcp=dydhcp(server)
if old_attrs.get('ipHostNumber', []) and old_attrs.get('macAddress', []):
if new_attrs.get('ipHostNumber', []) and new_attrs.get('macAddress', []):
if str(old_attrs['ipHostNumber'][0]) != str(new_attrs['ipHostNumber'][0]) or str(old_attrs['macAddress'][0]) != str(new_attrs['macAddress'][0]):
for server in config.dhcp_servers:
dhcp=dydhcp(server)
dhcp.del_host(str(old_attrs['ipHostNumber'][0]), str(old_attrs['macAddress'][0]))
dhcp.add_host(str(new_attrs['ipHostNumber'][0]), str(new_attrs['macAddress'][0]), str(new_attrs['host'][0]))
else:
for server in config.dhcp_servers:
dhcp=dydhcp(server)
dhcp.del_host(str(old_attrs['ipHostNumber'][0]), str(old_attrs['macAddress'][0]))
elif new_attrs.get('ipHostNumber', []) and new_attrs.get('macAddress', []):
for server in config.dhcp_servers:
dhcp=dydhcp(server)
dhcp.add_host(str(new_attrs['ipHostNumber'][0]), str(new_attrs['macAddress'][0]), str(new_attrs['host'][0]))
if service in services_to_time.keys():

36
test.py
View file

@ -9,15 +9,15 @@ import string
import os
import sys
## import dans /usr/scripts/
sys.path.append("/usr/scripts/")
from gestion.affich_tools import anim, OK, cprint, ERREUR
## import locaux
import lc_ldap
import shortcuts
import variables
## import dans /usr/scripts/
sys.path.append("/usr/scripts/")
from gestion.affich_tools import anim, OK, cprint, ERREUR
show_traceback = False
if "--traceback" in sys.argv:
show_traceback = True
@ -30,28 +30,28 @@ def randomMAC():
random.randint(0x00, 0x7f),
random.randint(0x00, 0xff),
random.randint(0x00, 0xff) ]
return ':'.join(map(lambda x: "%02x" % x, mac))
return u':'.join(map(lambda x: u"%02x" % x, mac))
def randomStr(n=10):
return ''.join( random.choice(string.lowercase + string.digits) for i in range(n))
adherent_ldif = {
'prenom' : ['toto'],
'nom' : ['passoir'],
'chbre' : ['EXT'],
'tel' : ['0000000000'],
'mail' : ['nobody@test.org'],
'etudes' : ['ENS', '1', 'A0'],
'prenom' : [u'Totoé'],
'nom' : [u'passoir'],
'chbre' : [u'EXT'],
'tel' : [u'0000000000'],
'mail' : [u'nobody@test.org'],
'etudes' : [u'ÉNS', u'1', u'A0'],
}
machine_ldif = {
'macAddress' : [randomMAC()],
'host' : ["autotest-%s.crans.org" % randomStr() ]
'host' : [u"autotest-%s.crans.org" % randomStr() ]
}
club_ldif = {
'nom' : [ 'autotest-club' ],
'chbre' : [ 'EXT' ],
'nom' : [ u'autotest-club' ],
'chbre' : [ u'EXT' ],
}
facture_ldif = {}
@ -109,7 +109,7 @@ def tests_machines(parent_dn, realm_list, ipsec=False):
anim("Creation de machines %s" % realm)
try:
machine = conn.newMachine(parent_dn, realm, machine_ldif)
if ipsec: machine['ipsec'] = 'auto'
if ipsec: machine['ipsec'] = u'auto'
machine.create()
except Exception as error:
print ERREUR
@ -127,11 +127,9 @@ def tests_machines(parent_dn, realm_list, ipsec=False):
else: print "\t%r" % error
else: print OK
print "Test de la librairie lc_ldap"
print "Connection"
print "Connexion"
conn = shortcuts.lc_ldap_test()
print u"Tests effectués avec les droits %s " % ', '.join(conn.droits)
@ -211,7 +209,7 @@ tests_machines(variables.base_dn, ["bornes"])
anim("Creation d'un club")
try:
club = conn.newClub(club_ldif)
club['responsable'] = str(adherent['aid'][0])
club['responsable'] = unicode(adherent['aid'][0])
club.create()
except Exception:
print ERREUR

View file

@ -3,20 +3,24 @@
""" Définitions de variables utiles pour lc_ldap. """
##: Encodage qu'on utilise pour parler à l'utilisateur si on n'a pas réussi à le détecter
#fallback_encoding = 'utf-8'
#: Encodage de la base LDAP
ldap_encoding = "utf-8"
#: uri par défaut de la base LDAP
uri = 'ldap://ldap.adm.crans.org/'
uri = "ldap://ldap.adm.crans.org/"
#: dn racine de l'endroit où sont stockées les données
base_dn = 'ou=data,dc=crans,dc=org'
base_dn = "ou=data,dc=crans,dc=org"
#: dn racine de l'endroit où sont stockés les logs
log_dn = "cn=log"
#: dn racine de l'endroit où sont stockés les services à redémarrer
services_dn = 'ou=services,dc=crans,dc=org'
services_dn = "ou=services,dc=crans,dc=org"
#: dn pour se binder en root
admin_dn = "cn=admin,dc=crans,dc=org"
#: dn pour se binder en readonly
readonly_dn = "cn=readonly,dc=crans,dc=org"
#: dn racine de l'endroit où sont stockés les invités (artefact garbage ?)
invite_dn = 'ou=invites,ou=data,dc=crans,dc=org'
invite_dn = "ou=invites,ou=data,dc=crans,dc=org"
# Protection contre les typos