diff --git a/attributs.py b/attributs.py index de90aa7..4ffaa8b 100644 --- a/attributs.py +++ b/attributs.py @@ -47,7 +47,7 @@ import smtplib import random import string from unicodedata import normalize -from crans_utils import format_tel, format_mac, mailexist, validate_name, ip4_of_rid, ip6_of_mac +from crans_utils import format_tel, format_mac, mailexist, validate_name, ip4_of_rid, ip6_of_mac, fetch_cert_info import itertools sys.path.append("/usr/scripts") @@ -397,7 +397,7 @@ class objectClass(Attr): if val not in [ 'top', 'organizationalUnit', 'posixAccount', 'shadowAccount', 'proprio', 'adherent', 'club', 'machine', 'machineCrans', 'borneWifi', 'machineWifi', 'machineFixe', 'x509Cert', 'TLSACert', - 'baseCert', 'cransAccount', 'service', 'facture', 'freeMid' ]: + 'baseCert', 'cransAccount', 'service', 'facture', 'freeMid', 'privateKey' ]: raise ValueError("Pourquoi insérer un objectClass=%r ?" % val) else: self.value = unicode(val) @@ -1555,6 +1555,49 @@ class revocked(boolAttr): can_modify = [nounou] legend = "Détermine si le certificat est révoqué" +@crans_attribute +class encrypted(boolAttr): + ldap_name = "encrypted" + singlevalue = True + optional = True + can_modify = [nounou, parent] + legend = "Détermine si une clef privée est chiffrée" + +@crans_attribute +class privatekey(Attr): + ldap_name = "privatekey" + python_type = str + can_modify = [parent, nounou] + legend = "Clef privée" + +@crans_attribute +class csr(Attr): + ldap_name = "csr" + python_type = str + can_modify = [parent, nounou] + legend = "requête de signature de certificat" + + def _format_cert(self, csr): + import OpenSSL + try: + x509 = OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr) + except: + print csr + raise + data = fetch_cert_info(x509) + return data + + def parse_value(self, csr): + if self.parent.mode in ['w', 'rw']: + data = self._format_cert(csr) + self.data = data + + for hostname in [data['subject']['CN']] + data['extensions'].get('subjectAltName',[]): + if hostname not in self.parent['hostCert']: + self.parent['hostCert'].append(unicode(hostname)) + + self.value = csr + @crans_attribute class certificat(Attr): ldap_name = "certificat" @@ -1567,20 +1610,6 @@ class certificat(Attr): super(certificat, self).__init__(*args, **kwargs) self.data = None - def _decode_subjectAltName(self, data): - from pyasn1.codec.der import decoder - from pyasn1_modules.rfc2459 import SubjectAltName - altName = [] - sa_names = decoder.decode(data, asn1Spec=SubjectAltName())[0] - for name in sa_names: - name_type = name.getName() - if name_type == 'dNSName': - altName.append(unicode(name.getComponent())) -# Cacert met des othername, du coup, on ignore juste -# else: -# raise ValueError("Seulement les dNSName sont supporté pour l'extension de certificat SubjectAltName (et pas %s)" % name_type) - return altName - def _format_cert(self, certificat): import OpenSSL try: @@ -1593,20 +1622,7 @@ class certificat(Attr): x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certificat) except Exception: raise ValueError("Format du certificat invalide, est-il bien au format DER ou PEM ?") - data = {} - data['subject'] = dict(x509.get_subject().get_components()) - data['issuer'] = dict(x509.get_issuer().get_components()) - data['start'] = int(time.mktime(time.strptime(x509.get_notBefore(), '%Y%m%d%H%M%SZ'))) - data['end'] = int(time.mktime(time.strptime(x509.get_notAfter(), '%Y%m%d%H%M%SZ'))) - data['serialNumber'] = unicode(int(x509.get_serial_number())) - data['extensions'] = {} - for i in range(0, x509.get_extension_count()): - ext = x509.get_extension(i) - ext_name = ext.get_short_name() - if ext_name == 'subjectAltName': - data['extensions'][ext_name] = self._decode_subjectAltName(ext.get_data()) - else: - data['extensions'][ext_name] = str(ext) + data = fetch_cert_info(x509) return (certificat, data) def parse_value(self, certificat): diff --git a/crans_utils.py b/crans_utils.py index d86bf07..7f1629c 100644 --- a/crans_utils.py +++ b/crans_utils.py @@ -38,6 +38,7 @@ import smtplib import sys import os import base64 +import collections import hashlib import ldap.filter sys.path.append('/usr/scripts/gestion') @@ -243,3 +244,51 @@ def hash_password(password, salt=None): raise ValueError("salt devrait faire au moins 8 octets") return '{SSHA}' + base64.b64encode(hashlib.sha1(password + salt).digest() + salt) + +def decode_subjectAltName(data): + from pyasn1.codec.der import decoder + from pyasn1_modules.rfc2459 import SubjectAltName + altName = [] + sa_names = decoder.decode(data, asn1Spec=SubjectAltName())[0] + for name in sa_names: + name_type = name.getName() + if name_type == 'dNSName': + altName.append(unicode(name.getComponent())) +# Cacert met des othername, du coup, on ignore juste +# else: +# raise ValueError("Seulement les dNSName sont supporté pour l'extension de certificat SubjectAltName (et pas %s)" % name_type) + return altName + + +def fetch_cert_info(x509): + # Attention, pour les X509req, la vertion de openssl utilisé et celle du fichier /usr/scripts/lib/python2.7/site-packages/pyOpenSSL-0.14-py2.7.egg + # /usr/scripts/python.sh met le path comme il faut, sinon, il faudrait faire quelque chose comme : + # for file in os.listdir('/usr/scripts/lib/python2.7/site-packages/'): + # if file.endswith(".egg"): + # sys.path.insert(2, '/usr/scripts/lib/python2.7/site-packages/%s' % file) + # sys.path.insert(2, '/usr/scripts/lib/python2.7/site-packages/') + import OpenSSL + data = {} + data['subject'] = dict(x509.get_subject().get_components()) + if isinstance(x509, OpenSSL.crypto.X509): + data['issuer'] = dict(x509.get_issuer().get_components()) + data['start'] = int(time.mktime(time.strptime(x509.get_notBefore(), '%Y%m%d%H%M%SZ'))) + data['end'] = int(time.mktime(time.strptime(x509.get_notAfter(), '%Y%m%d%H%M%SZ'))) + data['serialNumber'] = unicode(int(x509.get_serial_number())) + data['extensions'] = collections.defaultdict(list) + def do_ext(data, ext): + ext_name = ext.get_short_name() + if ext_name == 'subjectAltName': + data['extensions'][ext_name] = decode_subjectAltName(ext.get_data()) + elif ext_name == 'extendedKeyUsage': + data['extensions'][ext_name].append(str(ext)) + else: + data['extensions'][ext_name] = str(ext) + if isinstance(x509, OpenSSL.crypto.X509): + for i in range(0, x509.get_extension_count()): + ext = x509.get_extension(i) + do_ext(data, ext) + else: + for ext in x509.get_extensions(): + do_ext(data, ext) + return data diff --git a/objets.py b/objets.py index d14a922..daf9e36 100644 --- a/objets.py +++ b/objets.py @@ -453,9 +453,12 @@ class CransLdapObject(object): orig_ldif = self.attrs.to_ldif() for attr in binary: ldif['%s;binary' % attr]=ldif[attr] - orig_ldif['%s;binary' % attr]=orig_ldif[attr] + orig_ldif['%s;binary' % attr]=orig_ldif.get(attr, []) del(ldif[attr]) - del(orig_ldif[attr]) + try: + del(orig_ldif[attr]) + except KeyError: + pass return modifyModlist(orig_ldif, ldif) @@ -919,9 +922,9 @@ class machine(CransLdapObject): self._proprio = new_cransldapobject(self.conn, self.parent_dn, self.mode if mode is None else mode) return self._proprio - def certificats(self): + def certificats(self, refresh=False): """Renvoie la liste des certificats de la machine""" - if self._certificats is None: + if refresh or self._certificats is None: self._certificats = self.conn.search(u'xid=*', dn = self.dn, scope = 1, mode=self.mode) for m in self._certificats: m._machine = self @@ -1239,13 +1242,16 @@ class baseCert(CransLdapObject): variables.modified: [attributs.nounou, attributs.bureau, attributs.parent], variables.deleted: [attributs.nounou, attributs.bureau, attributs.parent], } - attribs = [ attributs.xid, attributs.certificat, attributs.hostCert, attributs.historique] + attribs = [ attributs.xid, attributs.certificat, attributs.hostCert, attributs.historique, + attributs.info, attributs.csr ] tlsa_attribs = [ attributs.certificatUsage, attributs.selector, attributs.matchingType, attributs.portTCPin, attributs.portUDPin] x509_attribs = [ attributs.issuerCN, attributs.start, attributs.end, attributs.crlUrl, attributs.revocked, attributs.serialNumber ] + private_attribs = [ attributs.privatekey, attributs.encrypted ] + ldap_name = "baseCert" protected_issuer = [u'CAcert Class 3 Root', u'CA Cert Signing Authority'] @@ -1261,6 +1267,8 @@ class baseCert(CransLdapObject): self.attribs.extend(self.tlsa_attribs) if 'x509Cert' in self['objectClass']: self.attribs.extend(self.x509_attribs) + if "privateKey" in self['objectClass']: + self.attribs.extend(self.private_attribs) super(baseCert, self).__init__(*args, **kwargs) def _check_setitem(self, attr, values): @@ -1276,6 +1284,16 @@ class baseCert(CransLdapObject): if domain in [self['certificat'][0]['subject']['CN']] + self['certificat'][0]['extensions'].get('subjectAltName',[]): raise EnvironmentError("Vous ne pouvez pas retirer le domaine %s alors qu'il est déclaré dans le certificat" % domain) + def private(self, privatekey, encrypted): + if not self.mode in ['w', 'rw']: + return + if u"privateKey" in self['objectClass']: + return + self._modifs['objectClass'].append(u"privateKey") + self.attribs.extend(self.private_attribs) + self['encrypted']=encrypted + self['privatekey']=privatekey + def tlsa(self, certificatUsage, matchingType): if not self.mode in ['w', 'rw']: return