lc_ldap/attributs.py

678 lines
20 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# ATTRIBUTS.PY-- Description des attributs ldap
#
# Copyright (C) 2010 Cr@ns <roots@crans.org>
# Author: Antoine Durand-Gasselin <adg@crans.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Cr@ns nor the names of its contributors may
# be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT
# HOLDER> BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import re, sys, netaddr, time
from unicodedata import normalize
from crans_utils import format_tel, format_mac, mailexist, validate_name
sys.path.append("/usr/scripts/gestion")
import config, annuaires_pg
from midtools import Mid
def attrify(val, attr, ldif, conn, ctxt_check=True):
    """Turn an arbitrary value into an Attr instance.

    A value that already is an Attr is handed back untouched.  Anything
    else is given to the class registered under *attr* in
    CRANS_ATTRIBUTES (plain Attr when the name is not registered), which
    is called with (val, ldif, conn, ctxt_check).  Normalisation and
    sanity checks are done by that class; the context checks against
    *ldif* only run when *ctxt_check* is true.
    """
    if not isinstance(val, Attr):
        attr_class = CRANS_ATTRIBUTES.get(attr, Attr)
        return attr_class(val, ldif, conn, ctxt_check)
    return val
class Attr(object):
    """Base class wrapping one value of one LDAP attribute.

    The class name is used as the attribute name, both to look the values
    up in the ldif and to build search filters.  Subclasses usually set
    `singlevalue`, `optional`, `legend` and override `parse_value`.
    """
    legend = "Human-readable description of attribute"
    singlevalue = None
    optional = None
    conn = None
    # Rights that are sufficient to be allowed to modify the value.
    can_modify = ['Nounou']

    def __init__(self, val, ldif, conn, ctxt_check):
        """Create a new object representing one attribute value.

        val: raw value of the attribute (must be a unicode object)
        ldif: object holding the attribute, used for context validation
        conn: connection object, used by the uniqueness searches
        ctxt_check: when true, `validate` is run after parsing
        """
        self.ctxt_check = ctxt_check
        self.value = None
        self.conn = conn
        assert isinstance(val, unicode)
        self.parse_value(val, ldif)
        if ctxt_check:
            self.validate(ldif)

    def parse_value(self, val, ldif):
        """Convert the raw value into the form stored in `self.value`."""
        self.value = val

    def __str__(self):
        return self.__unicode__().encode('utf-8')

    def __unicode__(self):
        # XXX - check that this method produces something parsable
        assert isinstance(self.value, unicode)
        return self.value

    def validate(self, ldif):
        """Run the context checks against the values found in *ldif*.

        The values are looked up under the name of the class.
        """
        own_values = ldif[self.__class__.__name__]
        self._check_cardinality(own_values)
        self._check_uniqueness()
        self._check_users_restrictions(own_values)

    def _check_cardinality(self, values):
        """Check that the number of values is allowed.

        Raises ValueError when a single-valued attribute has more than one
        value, or when a non-optional attribute has none.
        """
        if self.singlevalue and len(values) > 1:
            raise ValueError('%s doit avoir au maximum une valeur (affecte %s)' % (self.__class__, values))
        if not self.optional and len(values) == 0:
            raise ValueError('%s doit avoir au moins une valeur' % self.__class__)

    @staticmethod
    def _escape_filter_value(value):
        """Escape the characters that are special in an LDAP search filter."""
        # The backslash has to be handled first, otherwise the escapes
        # produced for the other characters would be escaped again.
        for char, escaped in (('\\', '\\5c'), ('*', '\\2a'), ('(', '\\28'),
                              (')', '\\29'), ('\x00', '\\00')):
            value = value.replace(char, escaped)
        return value

    def _check_uniqueness(self):
        """Check that the value is not already used in the base.

        Raises ValueError, carrying the dn of the conflicting entries,
        when a search for the value returns something.
        """
        attr = self.__class__.__name__
        if attr in ["mid", "uid", "cid", "fid", "aid", "ipHostNumber", "uidNumber"]:
            value = self._escape_filter_value(str(self))
            res = self.conn.search('%s=%s' % (attr, value))
            if res:
                raise ValueError("%s déjà existant" % attr, [r.dn for r in res])
        if attr in ["mailAlias", "canonicalAlias", 'mail', 'uid']:
            value = self._escape_filter_value(str(self))
            res = self.conn.search('(|(mail=%s)(mailAlias=%s)(canonicalAlias=%s))' % ((value,)*3))
            if res:
                raise ValueError("Mail déjà existant", [r.dn for r in res])
        if attr in ["host", "hostAlias"]:
            value = self._escape_filter_value(str(self))
            res = self.conn.search('(|(host=%s)(hostAlias=%s))' % ((value,)*2))
            if res:
                raise ValueError("Hôte déjà existant", [r.dn for r in res])

    def _check_users_restrictions(self, values):
        """Hook for extra restrictions depending on the rights level.

        Does nothing here; meant to be implemented in the subclasses.
        """
        pass
class objectClass(Attr):
    """LDAP objectClass value; only a fixed set of class names is accepted."""
    singlevalue = False
    optional = False
    legend = "entité"
    # Every objectClass value this attribute accepts.
    _known_classes = ('top', 'posixAccount', 'shadowAccount', 'proprio',
                      'adherent', 'club', 'machine', 'machineCrans',
                      'borneWifi', 'machineWifi', 'machineFixe',
                      'cransAccount', 'service', 'facture', 'freeMid')

    def parse_value(self, val, ldif):
        """Store *val* as unicode when it is a known objectClass.

        Raises ValueError for any other value.
        """
        if val not in self._known_classes:
            # The format string is unicode: in Python 2, formatting a
            # non-ASCII byte string with a unicode value raises
            # UnicodeDecodeError instead of building the message.
            raise ValueError(u"Pourquoi insérer un objectClass=%s ?" % val)
        self.value = unicode(val)
class intAttr(Attr):
    """Attribute whose value is a strictly positive integer."""

    def parse_value(self, val, ldif):
        """Store int(val); raises ValueError when it is not positive."""
        number = int(val)
        if number <= 0:
            # The format string is unicode: in Python 2, formatting a
            # non-ASCII byte string with a unicode value raises
            # UnicodeDecodeError instead of building the message.
            raise ValueError(u"Valeur entière invalide : %s" % val)
        self.value = number

    def __unicode__(self):
        return unicode(self.value)
class aid(intAttr):
    """Member identifier: optional, at most one value."""
    legend = u"Identifiant de l'adhérent"
    optional = True
    singlevalue = True
class uid(Attr):
    """Canonical identifier of the member: mandatory, exactly one value."""
    singlevalue = True
    # Was misspelled `option`, which left `optional` at its inherited value.
    optional = False
    legend = u"L'identifiant canonique de l'adhérent"
class nom(Attr):
    """Name attribute: mandatory, exactly one value."""
    legend = "Nom"
    optional = False
    singlevalue = True

    def parse_value(self, val, ldif):
        """Store the result of validate_name for *val*.

        For a club, validate_name receives an extra argument listing the
        digits and square brackets (presumably additional allowed
        characters -- confirm against validate_name).
        """
        is_club = u'club' in ldif['objectClass']
        if not is_club:
            self.value = validate_name(val)
            return
        self.value = validate_name(val, r"0123456789\[\]")
class prenom(Attr):
    """First name attribute: mandatory, exactly one value."""
    legend = u"Prénom"
    optional = False
    singlevalue = True

    def parse_value(self, val, ldif):
        """Store the result of validate_name for *val*."""
        checked = validate_name(val)
        self.value = checked
class tel(Attr):
    """Phone number attribute: mandatory, exactly one value."""
    singlevalue = True
    optional = False
    legend = u"Téléphone"
    can_modify = ["self", "Nounou", "Cableur"]

    def parse_value(self, val, ldif):
        """Store format_tel(val); raises ValueError when that is empty."""
        self.value = format_tel(val)
        if len(self.value) == 0:
            # The format string is unicode: in Python 2, formatting a
            # non-ASCII byte string with a unicode value raises
            # UnicodeDecodeError instead of building the message.
            raise ValueError(u"Numéro de téléphone invalide ('%s')" % val)
class yearAttr(intAttr):
    """Optional, multi-valued attribute holding a year."""
    optional = True
    singlevalue = False

    def parse_value(self, val, ldif):
        """Store int(val) when it lies between 1998 and config.ann_scol.

        Raises ValueError outside of that range (both bounds included).
        """
        year = int(val)
        if not (1998 <= year <= config.ann_scol):
            raise ValueError("Annee invalide (%s)" % val)
        self.value = year
class paiement(yearAttr):
    """Year attribute labelled "Paiement"."""
    can_modify = ["Cableur", "Nounou", "tresorier"]
    legend = u"Paiement"
class carteEtudiant(yearAttr):
    """Year attribute labelled "Carte d'etudiant" (student card)."""
    legend = u"Carte d'étudiant"
class mailAlias(Attr):
    """Mail alias: optional, any number of values."""
    legend = u"Alias mail"
    optional = True
    singlevalue = False
    can_modify = ["self", "Cableur", "Nounou"]

    def parse_value(self, val, ldif):
        """Store the lower-cased alias.

        Raises ValueError when the alias does not start with a letter
        followed by at least one letter, digit, '-', '_' or '.'.
        NOTE(review): the pattern is only anchored at the start, so
        anything may follow -- confirm this is intended.
        """
        alias = val.lower()
        if re.match('[a-z][-_.0-9a-z]+', alias) is None:
            raise ValueError("Alias mail invalide (%s)" % alias)
        self.value = alias
class canonicalAlias(Attr):
    """Canonical mail alias: mandatory, exactly one value."""
    legend = u"Alias mail canonique"
    optional = False
    singlevalue = True

    def parse_value(self, val, ldif):
        """Store the alias with both sides of its first dot capitalized.

        Raises ValueError when the result does not start with a letter
        followed by at least one letter, digit, '-', '_' or '.'.
        """
        pieces = val.split(u'.', 1)
        alias = u".".join(piece.capitalize() for piece in pieces)
        if re.match('[A-Za-z][-_.0-9A-Za-z]+', alias) is None:
            raise ValueError("Alias mail invalide (%s)" % alias)
        self.value = alias
class etudes(Attr):
    """Studies attribute: mandatory, any number of values."""
    legend = u"Études"
    optional = False
    singlevalue = False
    can_modify = ["self", "Cableur", "Nounou"]

    def parse_value(self, val, ldif):
        """Store *val* unchanged; no check is made on the content."""
        self.value = val
class chbre(Attr):
    """Campus room attribute: mandatory, exactly one value."""
    legend = u"Chambre sur le campus"
    optional = False
    singlevalue = True
    can_modify = ["self", "Cableur", "Nounou"]

    def parse_value(self, val, ldif):
        """Store *val*, checking it first when ctxt_check is set.

        With checks enabled, a club must use one of
        annuaires_pg.locaux_clubs() (ValueError otherwise), the special
        values EXT and ???? are taken as they are, and any other value is
        handed to chbre_prises (presumably raising for an unknown room --
        confirm against annuaires_pg).
        """
        # No check requested: keep the value as it is.
        if not self.ctxt_check:
            self.value = val
            return
        if u'club' in ldif['objectClass']:
            if val not in annuaires_pg.locaux_clubs():
                raise ValueError("Club devrait etre en XclN, pas en %s" % val)
            self.value = val
            return
        if val in (u"EXT", u"????"):
            self.value = val
            return
        building, room = val[0], val[1:]
        try:
            annuaires_pg.chbre_prises(building, room)
        except NameError:
            # Fall back on the test directory.
            import annuaires_pg_test
            annuaires_pg_test.chbre_prises(building, room)
        self.value = val
class droits(Attr):
    """Rights on the servers: optional, any number of values."""
    legend = u"Droits sur les serveurs"
    optional = True
    singlevalue = False
    can_modify = ["Nounou", "Bureau"]

    def parse_value(self, val, ldif):
        """Store the right with its canonical capitalization.

        The comparison ignores case; 'webmaster' is stored as
        u'WebMaster', every other right as val.capitalize().  Raises
        ValueError for an unknown right.
        """
        wanted = val.lower()
        known = ['apprenti', 'nounou', 'cableur', 'tresorier', 'bureau',
                 'webmaster', 'webradio', 'imprimeur', 'multimachines',
                 'victime', 'moderateur', 'nounours']
        if wanted not in known:
            raise ValueError("Ces droits n'existent pas ('%s')" % val)
        self.value = u'WebMaster' if wanted == 'webmaster' else val.capitalize()
class solde(Attr):
    """Printing balance: optional, at most one value."""
    legend = u"Solde d'impression"
    optional = True
    singlevalue = True
    can_modify = ["imprimeur", "Nounou", "Tresorier"]

    def parse_value(self, solde, ldif):
        """Store *solde* as received.

        Unless checks are disabled, its float value must lie between
        config.impression.decouvert and 1024 (both included), otherwise
        ValueError is raised.
        """
        if self.ctxt_check:
            amount = float(solde)
            if not (config.impression.decouvert <= amount <= 1024.):
                raise ValueError("Solde invalide: %s" % solde)
        self.value = solde
class dnsAttr(Attr):
    """Base class for attributes holding a DNS name."""

    def parse_value(self, dns, ldif):
        """Store the lower-cased name.

        The name is split at its first dot (a name without any dot makes
        the unpacking fail with ValueError).  With checks enabled, the
        part after the dot must be crans.org or wifi.crans.org and the
        part before it must start with a letter followed by at least one
        letter, digit, '-' or '_'; ValueError is raised otherwise.
        """
        hostname = dns.lower()
        name, net = hostname.split('.', 1)
        if self.ctxt_check:
            known_zone = net in ['crans.org', 'wifi.crans.org']
            if not known_zone or not re.match('[a-z][-_a-z0-9]+', name):
                raise ValueError("Nom d'hote invalide '%s'" % hostname)
        self.value = hostname
class host(dnsAttr):
    """Host name: mandatory, exactly one value."""
    legend = u"Nom d'hôte"
    hname = legend
    optional = False
    singlevalue = True
    can_modify = ["parent", "Nounou", "Cableur"]
class macAddress(Attr):
    """MAC address of the network card: mandatory, exactly one value."""
    legend = u"Adresse physique de la carte réseau"
    hname = "Adresse MAC"
    optional = False
    singlevalue = True
    can_modify = ["parent", "Nounou", "Cableur"]

    def parse_value(self, mac, ldif):
        """Store the result of format_mac for *mac*."""
        formatted = format_mac(mac)
        self.value = formatted

    def __unicode__(self):
        """Return the stored address as lower-case unicode."""
        text = unicode(self.value)
        return text.lower()
class ipHostNumber(Attr):
    """IPv4 address of the machine: mandatory, exactly one value."""
    legend = u"Adresse IPv4 de la machine"
    hname = "IPv4"
    optional = False
    singlevalue = True

    def parse_value(self, ip, ldif):
        """Store the address as a netaddr IPAddress.

        The placeholder '<automatique>' is replaced by the ipv4() of the
        Mid built from the first 'mid' value of *ldif*.
        """
        address = ip
        if address == '<automatique>':
            address = Mid(mid=ldif['mid'][0]).ipv4()
        self.value = netaddr.ip.IPAddress(address)

    def __unicode__(self):
        return unicode(self.value)
class mid(Attr):
    """Machine identifier: mandatory, exactly one value."""
    legend = "Identifiant de machine"
    optional = False
    singlevalue = True

    def parse_value(self, mid, ldif):
        """Store a Mid object built from int(mid)."""
        number = int(mid)
        self.value = Mid(mid=number)

    def __unicode__(self):
        """Return the integer value of the stored Mid as unicode."""
        number = int(self.value)
        return unicode(number)
class hostAlias(dnsAttr):
    """Alias of a machine name: optional, any number of values."""
    legend = u'Alias de nom de machine'
    optional = True
    singlevalue = False
class ipsec(Attr):
    """Wifi key: optional, any number of values."""
    legend = u'Clef wifi'
    optional = True
    singlevalue = False
class puissance(Attr):
    """Emission power of a wifi access point: optional, at most one value."""
    legend = u"puissance d'émission pour les bornes wifi"
    optional = True
    singlevalue = True
class canal(intAttr):
    """Emission channel of the access point: optional, at most one value."""
    legend = u'Canal d\'émission de la borne'
    optional = True
    singlevalue = True
class portAttr(Attr):
    """Port or port range: optional, any number of values.

    The value is stored as [port] for a single port and as [low, high]
    for a range written "low:high"; a missing bound defaults to 0
    (low) or 65535 (high).
    """
    singlevalue = False
    optional = True

    @staticmethod
    def _checked_port(text, original):
        """Return int(text), raising ValueError outside of 0..65535."""
        number = int(text)
        if number < 0 or number > 65535:
            raise ValueError("Port invalide: %s" % original)
        return number

    def parse_value(self, port, ldif):
        """Parse a "port" or "low:high" string into `self.value`.

        Raises ValueError when a bound is not an integer or lies outside
        of 0..65535.
        """
        if ":" in port:
            low, high = port.split(":", 1)
            low = self._checked_port(low, port) if low else 0
            # The upper bound is checked on its own value; the check used
            # to look at the lower bound a second time.
            high = self._checked_port(high, port) if high else 65535
            self.value = [low, high]
        else:
            self.value = [self._checked_port(port, port)]

    def __unicode__(self):
        """Return "low:high" (default bounds left empty) or the port."""
        if len(self.value) > 1:
            a, b = self.value
            a = '' if a == 0 else str(a)
            b = '' if b == 65535 else str(b)
            return unicode('%s:%s' % (a, b))
        else:
            return unicode(self.value[0])
class portTCPout(portAttr):
    """TCP port opened towards the outside."""
    legend = u'Port TCP ouvert vers l\'extérieur'
class portTCPin(portAttr):
    """TCP port opened from the outside."""
    legend = u"Port TCP ouvert depuis l'extérieur"
class portUDPout(portAttr):
    """UDP port opened towards the outside."""
    legend = u"Port UDP ouvert vers l'extérieur"
class portUDPin(portAttr):
    """UDP port opened from the outside."""
    legend = u"Port UDP ouvert depuis l'extérieur"
class prise(Attr):
    """Socket the machine is plugged into: optional, at most one value."""
    legend = u"Prise sur laquelle est branchée la machine"
    optional = True
    singlevalue = True

    def parse_value(self, prise, ldif):
        """Store *prise* unchanged; the value is trusted as it is."""
        self.value = prise
class cid(intAttr):
    """Club identifier: optional, at most one value."""
    legend = u"Identifiant du club"
    optional = True
    singlevalue = True
class responsable(Attr):
    """Person in charge of a club: optional, at most one value.

    The raw value is an aid.  The matching entry is only searched the
    first time `value` is read, then kept.
    """
    singlevalue = True
    optional = True
    legend = u"Responsable du club"
    # aid received by parse_value, and the entry found for it (None until
    # the first lookup).
    _resp_aid = None
    _respo = None

    def get_respo(self):
        """Return the entry found for the aid, searching it if needed."""
        if self._respo is None:
            self._respo = self.conn.search('aid=%s' % self._resp_aid)[0]
        return self._respo

    def _set_respo(self, respo):
        """Store an already resolved entry (or None to force a lookup)."""
        self._respo = respo

    # A property only works when it lives on the class: assigned to an
    # instance attribute it is a plain object that is never evaluated.
    value = property(get_respo, _set_respo)

    def parse_value(self, resp, ldif):
        """Remember the aid; the lookup is deferred to the first access."""
        self._resp_aid = resp
        self.value = None

    def __unicode__(self):
        return self.value.attrs['aid'][0].__unicode__()
class blacklist(Attr):
    """Blacklist entry: optional, any number of values.

    The raw value is "debut$fin$type$comm".  debut and fin are either
    '-' or an integer compared with time.time().
    """
    singlevalue = False
    optional = True
    legend = u"Blackliste"

    def parse_value(self, bl, ldif):
        """Split the raw value into a dict stored in `self.value`.

        The dict holds 'debut', 'fin', 'type', 'comm' and 'actif'; the
        entry is active when debut is '-' or in the past and fin is '-'
        or in the future.  Raises ValueError when the value does not have
        exactly four '$'-separated fields or a bound is not an integer.
        """
        bl_debut, bl_fin, bl_type, bl_comm = bl.split('$')
        now = time.time()
        debut = bl_debut if bl_debut == '-' else int(bl_debut)
        fin = bl_fin if bl_fin == '-' else int(bl_fin)
        started = debut == '-' or debut < now
        not_over = fin == '-' or fin > now
        self.value = {'debut': debut,
                      'fin': fin,
                      'type': bl_type,
                      'comm': bl_comm,
                      'actif': started and not_over}

    def is_actif(self):
        """Return the 'actif' flag of the entry."""
        return self.value['actif']

    def terminer(self):
        """End the entry: fin is set one minute ago, but not before debut."""
        fin = time.time() - 60
        debut = self.value['debut']
        # '-' means "no start date": there is nothing to compare with.
        if debut != '-':
            fin = max(debut, fin)
        self.value['fin'] = int(fin)
        self.value['actif'] = False

    def __unicode__(self):
        return u'%(debut)s$%(fin)s$%(type)s$%(comm)s' % self.value
class historique(Attr):
    """History of the object: optional, any number of values."""
    legend = u"Historique de l'objet"
    optional = True
    singlevalue = False
class info(Attr):
    """Free-form information: optional, any number of values."""
    legend = u"Quelques informations"
    optional = True
    singlevalue = False
    can_modify = ["Nounou", "Cableur", "Bureau"]
class homepageAlias(Attr):
    """Alias for the personal web page: optional, at most one value."""
    legend = u'Un alias pour la page personnelle'
    optional = True
    singlevalue = True
    can_modify = ["WebMaster", "Nounou", "Cableur"]
class charteMA(Attr):
    """Flag telling whether the active-members charter was signed."""
    legend= "Signale si l'adhérent a signé la charte de membres actifs"
    optional = True
    singlevalue = True
    can_modify = ["Nounou", "Bureau"]

    def parse_value(self, signed, ldif):
        """Store the upper-cased flag; only TRUE and FALSE are accepted.

        Raises ValueError for anything else (case is ignored).
        """
        flag = signed.upper()
        if flag not in ["TRUE", "FALSE"]:
            raise ValueError("La charte MA est soit TRUE ou FALSE, pas %s" % signed)
        self.value = flag
class homeDirectory(Attr):
    """Path of the home directory: optional, at most one value."""
    singlevalue = True
    optional = True
    legend = "Le chemin du home de l'adhérent"

    def parse_value(self, home, ldif):
        """Store *home* when it matches the first uid of *ldif*.

        A leading "club-" is dropped from the uid; the path must then be
        /home/<uid> or /home/club/<uid>.  Raises ValueError otherwise.
        """
        uid = ldif['uid'][0]
        if uid.startswith('club-'):
            uid = uid.split('-', 1)[1]
        allowed = (u'/home/%s' % uid, u'/home/club/%s' % uid)
        if home not in allowed:
            # The message lists the paths that were really compared (it
            # used to rebuild them from the unstripped uid).  The format
            # string is unicode: in Python 2, formatting a non-ASCII byte
            # string with a unicode value raises UnicodeDecodeError.
            raise ValueError(u"Le répertoire personnel n'est pas bon: %s (devrait être %s ou %s)" %
                             ((home,) + allowed))
        self.value = home
class loginShell(Attr):
    """Login shell: optional, at most one value."""
    singlevalue = True
    optional = True
    legend = "Le shell de l'adherent"
    can_modify = ["self", "Nounou", "Cableur"]

    def parse_value(self, shell, ldif):
        """Store *shell*.

        Unless checks are disabled, raises ValueError when the shell is
        not in the hard-coded list below.
        """
        #with open('/etc/shells') as f:
        #    shells = [ l.strip() for l in f.readlines() if not l.startswith('#') ]
        shells = ['/bin/csh',
                  '/bin/sh',
                  '/usr/bin/es',
                  '/usr/bin/ksh',
                  '/bin/ksh',
                  '/usr/bin/rc',
                  '/usr/bin/tcsh',
                  '/bin/tcsh',
                  '/usr/bin/esh',
                  '/bin/bash',
                  '/bin/rbash',
                  '/bin/zsh',
                  '/usr/bin/zsh',
                  '/usr/bin/screen',
                  '/bin/dash',
                  '/usr/bin/rssh',
                  '/usr/local/bin/disconnect_shell',
                  '/usr/scripts/surveillance/disconnect_shell',
                  '/usr/local/bin/badPassSh',
                  '/usr/bin/passwd',
                  '/bin/false',
                  # A comma was missing here, which merged the next two
                  # entries into the single string '/bin//zsh'.
                  '/bin//zsh',
                  '']
        if self.ctxt_check and (shell not in shells):
            raise ValueError("Shell %s invalide" % shell)
        self.value = shell
class uidNumber(intAttr):
    """Numeric uid of the account: optional, at most one value."""
    legend = "L'uid du compte de l'adherent"
    optional = True
    singlevalue = True
class gidNumber(intAttr):
    """Numeric gid of the account: optional, at most one value."""
    legend = "Le gid du compte de l'adhérent"
    optional = True
    singlevalue = True
class gecos(Attr):
    """Gecos field of the account: optional, at most one value."""
    singlevalue = True
    optional = True
    legend = "Le gecos"

    def parse_value(self, gecos, ldif):
        """Store *gecos* unchanged.

        Raises ValueError unless it is made of exactly four
        comma-separated fields.
        """
        # Explicit check instead of unpacking into four unused names: the
        # exception type is the same, the message says what is wrong.
        if len(gecos.split(',')) != 4:
            raise ValueError("Gecos invalide (%s)" % gecos)
        self.value = gecos
class mail(Attr):
    """Mail address of the member: mandatory, exactly one value."""
    legend = "Le mail de l'adhérent"
    optional = False
    singlevalue = True
    can_modify = ["self", "Nounou", "Cableur"]
    # XXX - parse_value (validation of the address) is still to be
    # implemented; the value is stored as received for now.
class cn(Attr):
    """cn attribute: mandatory, exactly one value."""
    optional = False
    singlevalue = True
class dn(Attr):
    """dn attribute: mandatory, exactly one value."""
    optional = False
    singlevalue = True
class postalAddress(Attr):
    """Postal address: optional, any number of values."""
    optional = True
    singlevalue = False
    can_modify = ["self", "Cableur", "Nounou", "Bureau"]
class controle(Attr):
    """Control flags: mandatory, exactly one value."""
    optional = False
    singlevalue = True
    can_modify = ["Tresorier", "Nounou"]

    def parse_value(self, ctrl, ldif):
        """Store the flags, with u'pc' normalised to u'cp'.

        Accepted values are the empty string, c, p, cp and pc; anything
        else raises ValueError.
        """
        allowed = [u"", u"c", u"p", u"cp", u"pc"]
        if ctrl not in allowed:
            raise ValueError("control peut prendre les valeurs [c][p]")
        self.value = u'cp' if ctrl == u'pc' else ctrl
# Maps an LDAP attribute name to the class handling it.  attrify falls
# back on plain Attr for a name that is not listed here.
CRANS_ATTRIBUTES = {
    'objectClass': objectClass,
    'cn': cn,
    'dn': dn,
    'aid': aid,
    'uid': uid,
    'nom': nom,
    'prenom': prenom,
    'tel': tel,
    'paiement': paiement,
    'carteEtudiant': carteEtudiant,
    'mailAlias': mailAlias,
    'canonicalAlias': canonicalAlias,
    'etudes': etudes,
    'chbre': chbre,
    'droits': droits,
    'solde': solde,
    'mid': mid,
    'host': host,
    'macAddress': macAddress,
    'ipHostNumber': ipHostNumber,
    'hostAlias': hostAlias,
    'ipsec': ipsec,
    'puissance': puissance,
    'canal': canal,
    'portTCPout': portTCPout,
    'portTCPin': portTCPin,
    'portUDPout': portUDPout,
    'portUDPin': portUDPin,
    'prise': prise,
    'cid': cid,
    'responsable': responsable,
    'blacklist': blacklist,
    'historique': historique,
    'info': info,
    'homepageAlias': homepageAlias,
    'charteMA': charteMA,
    'mail': mail,
    'postalAddress': postalAddress,
    # `controle` was the only attribute class left out of the table.
    'controle': controle,
    # {posix,shadow}Account
    'homeDirectory': homeDirectory,
    'loginShell': loginShell,
    'uidNumber': uidNumber,
    'gecos': gecos,
    'gidNumber': gidNumber,
}
# Attribute classes listed for a member entry.
ADHERENT_ATTRS = [
    nom,
    prenom,
    tel,
    chbre,
    postalAddress,
    mail,
    uid,
    canonicalAlias,
    mailAlias,
    etudes,
    paiement,
    solde,
    carteEtudiant,
    droits,
    loginShell,
    blacklist,
]
# Attribute classes listed for a machine entry.
MACHINE_ATTRS = [
    host,
    macAddress,
    hostAlias,
    ipHostNumber,
    portTCPout,
    portTCPin,
    portUDPout,
    portUDPin,
]