diff --git a/attributs.py b/attributs.py index a98d7fa..963e1e7 100644 --- a/attributs.py +++ b/attributs.py @@ -1444,6 +1444,7 @@ class issuerCN(Attr): @crans_attribute class serialNumber(Attr): ldap_name = "serialNumber" + python_type = int @crans_attribute class start(intAttr): @@ -1469,8 +1470,78 @@ class certificat(Attr): ldap_name = "certificat" binary = True python_type = str + + def __init__(self, *args, **kwargs): + super(certificat, self).__init__(*args, **kwargs) + self.data = None + + def _decode_subjectAltName(self, data): + from pyasn1.codec.der import decoder + from pyasn1_modules.rfc2459 import SubjectAltName + altName = [] + sa_names = decoder.decode(data, asn1Spec=SubjectAltName())[0] + for name in sa_names: + name_type = name.getName() + if name_type == 'dNSName': + altName.append(unicode(name.getComponent())) + else: + raise ValueError("Seulement les dNSName sont supporté pour l'extension de certificat SubjectAltName") + return altName + + def _format_cert(self, certificat): + import OpenSSL + if certificat.startswith('-----BEGIN CERTIFICATE-----'): + certificat = ssl.PEM_cert_to_DER_cert(certificat) + try: + x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certificat) + except: + certificat = base64.b64decode(certificat) + x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certificat) + data = {} + data['subject'] = dict(x509.get_subject().get_components()) + data['issuer'] = dict(x509.get_issuer().get_components()) + data['start'] = int(time.mktime(time.strptime(x509.get_notBefore(), '%Y%m%d%H%M%SZ'))) + data['end'] = int(time.mktime(time.strptime(x509.get_notAfter(), '%Y%m%d%H%M%SZ'))) + data['serialNumber'] = unicode(int(x509.get_serial_number())) + data['extensions'] = {} + for i in range(0, x509.get_extension_count()): + ext = x509.get_extension(i) + ext_name = ext.get_short_name() + if ext_name == 'subjectAltName': + data['extensions'][ext_name] = self._decode_subjectAltName(ext.get_data()) + else: + data['extensions'][ext_name] = str(ext) + return (certificat, data) + + def parse_value(self, certificat): + if self.parent.mode in ['w', 'rw']: + (certificat, data) = self._format_cert(certificat) + self.data = data + + if not 'x509Cert' in self.parent['objectClass']: + self.parent.x509(unicode(data['issuer']['CN']), data['start'], data['end'], data['serialNumber']) + else: + self.parent['issuerCN'] = unicode(data['issuer']['CN']) + self.parent['start'] = data['start'] + self.parent['end'] = data['end'] + self.parent['serialNumber'] = data['serialNumber'] + + for hostname in [data['subject']['CN']] + data['extensions'].get('subjectAltName',[]): + if hostname not in self.parent['hostCert']: + self.parent['hostCert'].append(hostname) + + self.value = certificat + + def __getitem__(self, key): + if not self.data: + self.data=self._format_cert(self.value)[1] + return self.data[key] + + def pem(self): + return ssl.DER_cert_to_PEM_cert(self.value) + def __unicode__(self): - return unicode(ssl.DER_cert_to_PEM_cert(self.value)) + return unicode(base64.b64encode(self.value)) def __str__(self): return self.value diff --git a/objets.py b/objets.py index 11b82d4..444c1e7 100644 --- a/objets.py +++ b/objets.py @@ -447,6 +447,13 @@ class CransLdapObject(object): """Est-ce que notre objet a l'attribut en question ?""" return attr in [attrib.__name__ for attrib in self.attribs] + def _check_setitem(self, attr, values): + """ + Vérifie des contraintes non liées à LDAP lors d'un __setitem__, + lève une exception si elles ne sont pas vérifiées + """ + pass + def __setitem__(self, attr, values): """Permet d'affecter des valeurs à l'objet comme s'il était un dictionnaire.""" @@ -464,6 +471,7 @@ class CransLdapObject(object): # (on peut pas utiliser self._modifs, car il ne faut # faire le changement que si on peut) attrs_before_verif = [ attributs.attrify(val, attr, self.conn, Parent=self) for val in values ] + self._check_setitem(attr, attrs_before_verif) if attr in self.attrs.keys(): for attribut in attrs_before_verif: attribut.check_uniqueness([unicode(content) for content in self.attrs[attr]]) @@ -701,9 +709,7 @@ class proprio(CransLdapObject): raise EnvironmentError("Vous n'avez pas le droit de supprimer %s." % self.dn) for machine in self.machines(): machine.delete(comm, login) - self.bury(comm, login) - self.conn.delete_s(self.dn) - services.services_to_restart(self.conn, self.attrs, {}, deleted_object=[self]) + super(proprio, self).delete(comm, login) class machine(CransLdapObject): u""" Une machine """ @@ -727,6 +733,20 @@ class machine(CransLdapObject): self._proprio = None self._certificats = None + def _check_setitem(self, attr, values): + """ + Vérifie des contraintes non liées à LDAP lors d'un __setitem__, + lève une exception si elles ne sont pas vérifiées. + Ici on vérifie qu'on ne retire pas un host à une machine tant + qu'un de ses certificats l'utilise. + """ + if attr in ['host', 'hostAlias']: + deleted = [ value for value in self[attr] if value not in values ] + for domain in deleted: + for certificat in self.certificats(): + if domain in certificat['hostCert']: + raise EnvironmentError("Vous devez d'abord supprimer ou éditer les certificats utilisant le nom de domaine %s avant de le retirer de la machine" % domain) + def proprio(self, mode=None): u"""Renvoie le propriétaire de la machine""" parent_dn = self.dn.split(',', 1)[1] @@ -854,6 +874,18 @@ class machine(CransLdapObject): ip6 = sbm['ip6HostNumber'][1] self['ip6HostNumber'] = ip6 + def delete(self, comm="", login=None): + """Supprimme l'objet de la base LDAP. En supprimant ses enfants d'abord.""" + if login is None: + login = self.conn.current_login + if self.mode not in ['w', 'rw']: + raise EnvironmentError("Objet en lecture seule, réessayer en lecture/écriture") + if not self.may_be(variables.deleted, self.conn.droits): + raise EnvironmentError("Vous n'avez pas le droit de supprimer %s." % self.dn) + for certificat in self.certificats(): + certificat.delete(comm, login) + super(machine, self).delete(comm, login) + class AssociationCrans(proprio): """ Association crans (propriétaire particulier).""" def save(self): @@ -1105,17 +1137,33 @@ class baseCert(CransLdapObject): ldap_name = "baseCert" + protected_issuer = [u'CAcert Class 3 Root', u'CA Cert Signing Authority'] + _machine = None def __repr__(self): return str(self.__class__) + " : xid=" + str(self['xid'][0]) def __init__(self, conn, dn, mode='ro', ldif=None): - super(baseCert, self).__init__(conn, dn, mode, ldif) + super(baseCert, self).__init__(conn, dn, 'ro', ldif) if "TLSACert" in self['objectClass']: self.attribs.extend(self.tlsa_attribs) if 'x509Cert' in self['objectClass']: self.attribs.extend(self.x509_attribs) + super(baseCert, self).__init__(conn, dn, mode, ldif) + + def _check_setitem(self, attr, values): + """ + Vérifie des contraintes non liées à LDAP lors d'un __setitem__, + lève une exception si elles ne sont pas vérifiées. + Ici on vérifie d'on ne retire pas un host du certificat + s'il est réèlement présent dans les données du certificat. + """ + if attr in ['hostCert']: + deleted = [ value for value in self[attr] if value not in values ] + for domain in deleted: + if domain in [self['certificat'][0]['subject']['CN']] + self['certificat'][0]['extensions'].get('subjectAltName',[]): + raise EnvironmentError("Vous ne pouvez pas retirer le domaine %s alors qu'il est déclaré dans le certificat" % domain) def tlsa(self, certificatUsage, matchingType): if not self.mode in ['w', 'rw']: @@ -1128,8 +1176,20 @@ class baseCert(CransLdapObject): self['matchingType']=matchingType self['selector']=0 - def x509(issuerCN, start, end, serialNumber, crlUrl=None): - pass + def x509(self, issuerCN, start, end, serialNumber, crlUrl=None): + if not self.mode in ['w', 'rw']: + return + if u"x509Cert" in self['objectClass']: + return + self._modifs['objectClass'].append(u"x509Cert") + self.attribs.extend(self.x509_attribs) + self['issuerCN'] = issuerCN + self['start'] = start + self['end'] = end + self['serialNumber'] = serialNumber + if crlUrl: + self['crlUrl'] = crlUrl + def machine(self): u"""Renvoie la machine du certificat""" @@ -1139,6 +1199,16 @@ class baseCert(CransLdapObject): return self._machine + def delete(self, comm="", login=None): + """Supprimme l'objet de la base LDAP.""" + if u"x509Cert" in self['objectClass']: + if not self['revocked'] or not self['revocked'][0]: + for issuer in self['issuerCN']: + if issuer in self.protected_issuer: + raise EnvironmentError("Vous n'avez pas le droit de supprimer %s tant que le certificat n'aura pas été marqué comme révoqué" % self.dn) + super(baseCert, self).delete(comm, login) + + @crans_object class service(CransLdapObject): ldap_name = "service"