[objets, attributs] Vérifications de contraintes liées au certificats d'un machine

This commit is contained in:
Valentin Samir 2014-02-23 15:29:26 +01:00
parent afb436f706
commit 626851baca
2 changed files with 148 additions and 7 deletions

View file

@ -1444,6 +1444,7 @@ class issuerCN(Attr):
@crans_attribute
class serialNumber(Attr):
ldap_name = "serialNumber"
python_type = int
@crans_attribute
class start(intAttr):
@ -1469,8 +1470,78 @@ class certificat(Attr):
ldap_name = "certificat"
binary = True
python_type = str
def __init__(self, *args, **kwargs):
super(certificat, self).__init__(*args, **kwargs)
self.data = None
def _decode_subjectAltName(self, data):
from pyasn1.codec.der import decoder
from pyasn1_modules.rfc2459 import SubjectAltName
altName = []
sa_names = decoder.decode(data, asn1Spec=SubjectAltName())[0]
for name in sa_names:
name_type = name.getName()
if name_type == 'dNSName':
altName.append(unicode(name.getComponent()))
else:
raise ValueError("Seulement les dNSName sont supporté pour l'extension de certificat SubjectAltName")
return altName
def _format_cert(self, certificat):
import OpenSSL
if certificat.startswith('-----BEGIN CERTIFICATE-----'):
certificat = ssl.PEM_cert_to_DER_cert(certificat)
try:
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certificat)
except:
certificat = base64.b64decode(certificat)
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certificat)
data = {}
data['subject'] = dict(x509.get_subject().get_components())
data['issuer'] = dict(x509.get_issuer().get_components())
data['start'] = int(time.mktime(time.strptime(x509.get_notBefore(), '%Y%m%d%H%M%SZ')))
data['end'] = int(time.mktime(time.strptime(x509.get_notAfter(), '%Y%m%d%H%M%SZ')))
data['serialNumber'] = unicode(int(x509.get_serial_number()))
data['extensions'] = {}
for i in range(0, x509.get_extension_count()):
ext = x509.get_extension(i)
ext_name = ext.get_short_name()
if ext_name == 'subjectAltName':
data['extensions'][ext_name] = self._decode_subjectAltName(ext.get_data())
else:
data['extensions'][ext_name] = str(ext)
return (certificat, data)
def parse_value(self, certificat):
if self.parent.mode in ['w', 'rw']:
(certificat, data) = self._format_cert(certificat)
self.data = data
if not 'x509Cert' in self.parent['objectClass']:
self.parent.x509(unicode(data['issuer']['CN']), data['start'], data['end'], data['serialNumber'])
else:
self.parent['issuerCN'] = unicode(data['issuer']['CN'])
self.parent['start'] = data['start']
self.parent['end'] = data['end']
self.parent['serialNumber'] = data['serialNumber']
for hostname in [data['subject']['CN']] + data['extensions'].get('subjectAltName',[]):
if hostname not in self.parent['hostCert']:
self.parent['hostCert'].append(hostname)
self.value = certificat
def __getitem__(self, key):
if not self.data:
self.data=self._format_cert(self.value)[1]
return self.data[key]
def pem(self):
return ssl.DER_cert_to_PEM_cert(self.value)
def __unicode__(self):
return unicode(ssl.DER_cert_to_PEM_cert(self.value))
return unicode(base64.b64encode(self.value))
def __str__(self):
return self.value

View file

@ -447,6 +447,13 @@ class CransLdapObject(object):
"""Est-ce que notre objet a l'attribut en question ?"""
return attr in [attrib.__name__ for attrib in self.attribs]
def _check_setitem(self, attr, values):
"""
Vérifie des contraintes non liées à LDAP lors d'un __setitem__,
lève une exception si elles ne sont pas vérifiées
"""
pass
def __setitem__(self, attr, values):
"""Permet d'affecter des valeurs à l'objet comme
s'il était un dictionnaire."""
@ -464,6 +471,7 @@ class CransLdapObject(object):
# (on peut pas utiliser self._modifs, car il ne faut
# faire le changement que si on peut)
attrs_before_verif = [ attributs.attrify(val, attr, self.conn, Parent=self) for val in values ]
self._check_setitem(attr, attrs_before_verif)
if attr in self.attrs.keys():
for attribut in attrs_before_verif:
attribut.check_uniqueness([unicode(content) for content in self.attrs[attr]])
@ -701,9 +709,7 @@ class proprio(CransLdapObject):
raise EnvironmentError("Vous n'avez pas le droit de supprimer %s." % self.dn)
for machine in self.machines():
machine.delete(comm, login)
self.bury(comm, login)
self.conn.delete_s(self.dn)
services.services_to_restart(self.conn, self.attrs, {}, deleted_object=[self])
super(proprio, self).delete(comm, login)
class machine(CransLdapObject):
u""" Une machine """
@ -727,6 +733,20 @@ class machine(CransLdapObject):
self._proprio = None
self._certificats = None
def _check_setitem(self, attr, values):
"""
Vérifie des contraintes non liées à LDAP lors d'un __setitem__,
lève une exception si elles ne sont pas vérifiées.
Ici on vérifie qu'on ne retire pas un host à une machine tant
qu'un de ses certificats l'utilise.
"""
if attr in ['host', 'hostAlias']:
deleted = [ value for value in self[attr] if value not in values ]
for domain in deleted:
for certificat in self.certificats():
if domain in certificat['hostCert']:
raise EnvironmentError("Vous devez d'abord supprimer ou éditer les certificats utilisant le nom de domaine %s avant de le retirer de la machine" % domain)
def proprio(self, mode=None):
u"""Renvoie le propriétaire de la machine"""
parent_dn = self.dn.split(',', 1)[1]
@ -854,6 +874,18 @@ class machine(CransLdapObject):
ip6 = sbm['ip6HostNumber'][1]
self['ip6HostNumber'] = ip6
def delete(self, comm="", login=None):
"""Supprimme l'objet de la base LDAP. En supprimant ses enfants d'abord."""
if login is None:
login = self.conn.current_login
if self.mode not in ['w', 'rw']:
raise EnvironmentError("Objet en lecture seule, réessayer en lecture/écriture")
if not self.may_be(variables.deleted, self.conn.droits):
raise EnvironmentError("Vous n'avez pas le droit de supprimer %s." % self.dn)
for certificat in self.certificats():
certificat.delete(comm, login)
super(machine, self).delete(comm, login)
class AssociationCrans(proprio):
""" Association crans (propriétaire particulier)."""
def save(self):
@ -1105,17 +1137,33 @@ class baseCert(CransLdapObject):
ldap_name = "baseCert"
protected_issuer = [u'CAcert Class 3 Root', u'CA Cert Signing Authority']
_machine = None
def __repr__(self):
return str(self.__class__) + " : xid=" + str(self['xid'][0])
def __init__(self, conn, dn, mode='ro', ldif=None):
super(baseCert, self).__init__(conn, dn, mode, ldif)
super(baseCert, self).__init__(conn, dn, 'ro', ldif)
if "TLSACert" in self['objectClass']:
self.attribs.extend(self.tlsa_attribs)
if 'x509Cert' in self['objectClass']:
self.attribs.extend(self.x509_attribs)
super(baseCert, self).__init__(conn, dn, mode, ldif)
def _check_setitem(self, attr, values):
"""
Vérifie des contraintes non liées à LDAP lors d'un __setitem__,
lève une exception si elles ne sont pas vérifiées.
Ici on vérifie d'on ne retire pas un host du certificat
s'il est réèlement présent dans les données du certificat.
"""
if attr in ['hostCert']:
deleted = [ value for value in self[attr] if value not in values ]
for domain in deleted:
if domain in [self['certificat'][0]['subject']['CN']] + self['certificat'][0]['extensions'].get('subjectAltName',[]):
raise EnvironmentError("Vous ne pouvez pas retirer le domaine %s alors qu'il est déclaré dans le certificat" % domain)
def tlsa(self, certificatUsage, matchingType):
if not self.mode in ['w', 'rw']:
@ -1128,8 +1176,20 @@ class baseCert(CransLdapObject):
self['matchingType']=matchingType
self['selector']=0
def x509(issuerCN, start, end, serialNumber, crlUrl=None):
pass
def x509(self, issuerCN, start, end, serialNumber, crlUrl=None):
if not self.mode in ['w', 'rw']:
return
if u"x509Cert" in self['objectClass']:
return
self._modifs['objectClass'].append(u"x509Cert")
self.attribs.extend(self.x509_attribs)
self['issuerCN'] = issuerCN
self['start'] = start
self['end'] = end
self['serialNumber'] = serialNumber
if crlUrl:
self['crlUrl'] = crlUrl
def machine(self):
u"""Renvoie la machine du certificat"""
@ -1139,6 +1199,16 @@ class baseCert(CransLdapObject):
return self._machine
def delete(self, comm="", login=None):
"""Supprimme l'objet de la base LDAP."""
if u"x509Cert" in self['objectClass']:
if not self['revocked'] or not self['revocked'][0]:
for issuer in self['issuerCN']:
if issuer in self.protected_issuer:
raise EnvironmentError("Vous n'avez pas le droit de supprimer %s tant que le certificat n'aura pas été marqué comme révoqué" % self.dn)
super(baseCert, self).delete(comm, login)
@crans_object
class service(CransLdapObject):
ldap_name = "service"