[attributs, crans_utils, objets] Ajout d'attributs encrypted, privatekey, csr et objectClass privateKey
On peut générer/ajouter des clefs privées via gest_crans_lc. Elles sont forcément chiffré avec une passphrase. Pour les machines crans, elle est dans /etc/crans/secrets. Pour les autres machines elle n'est pas connue du crans et demandée à l'utilisateur lorsqu'il y a besoin de la clef privée pour des opérations. Du coup, on peut générer des csr automatiquement dans gest_crans_lc, ça m'a l'air bien pratique, ça évite d'oublier des subjectAltName par exemple quand on renouvel le certificat comme les nom du certificat sont également dans ldap. On peut aussi, juste avec une requête ldap voir quels sont les certificats qui vont bientôt expirer (&(objectClass=x509Cert)(end<int(time.time())+delai)).
This commit is contained in:
parent
480497ce4e
commit
72e35bcb32
3 changed files with 118 additions and 35 deletions
76
attributs.py
76
attributs.py
|
@ -47,7 +47,7 @@ import smtplib
|
||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
from unicodedata import normalize
|
from unicodedata import normalize
|
||||||
from crans_utils import format_tel, format_mac, mailexist, validate_name, ip4_of_rid, ip6_of_mac
|
from crans_utils import format_tel, format_mac, mailexist, validate_name, ip4_of_rid, ip6_of_mac, fetch_cert_info
|
||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
sys.path.append("/usr/scripts")
|
sys.path.append("/usr/scripts")
|
||||||
|
@ -397,7 +397,7 @@ class objectClass(Attr):
|
||||||
if val not in [ 'top', 'organizationalUnit', 'posixAccount', 'shadowAccount',
|
if val not in [ 'top', 'organizationalUnit', 'posixAccount', 'shadowAccount',
|
||||||
'proprio', 'adherent', 'club', 'machine', 'machineCrans',
|
'proprio', 'adherent', 'club', 'machine', 'machineCrans',
|
||||||
'borneWifi', 'machineWifi', 'machineFixe', 'x509Cert', 'TLSACert',
|
'borneWifi', 'machineWifi', 'machineFixe', 'x509Cert', 'TLSACert',
|
||||||
'baseCert', 'cransAccount', 'service', 'facture', 'freeMid' ]:
|
'baseCert', 'cransAccount', 'service', 'facture', 'freeMid', 'privateKey' ]:
|
||||||
raise ValueError("Pourquoi insérer un objectClass=%r ?" % val)
|
raise ValueError("Pourquoi insérer un objectClass=%r ?" % val)
|
||||||
else:
|
else:
|
||||||
self.value = unicode(val)
|
self.value = unicode(val)
|
||||||
|
@ -1555,6 +1555,49 @@ class revocked(boolAttr):
|
||||||
can_modify = [nounou]
|
can_modify = [nounou]
|
||||||
legend = "Détermine si le certificat est révoqué"
|
legend = "Détermine si le certificat est révoqué"
|
||||||
|
|
||||||
|
@crans_attribute
|
||||||
|
class encrypted(boolAttr):
|
||||||
|
ldap_name = "encrypted"
|
||||||
|
singlevalue = True
|
||||||
|
optional = True
|
||||||
|
can_modify = [nounou, parent]
|
||||||
|
legend = "Détermine si une clef privée est chiffrée"
|
||||||
|
|
||||||
|
@crans_attribute
|
||||||
|
class privatekey(Attr):
|
||||||
|
ldap_name = "privatekey"
|
||||||
|
python_type = str
|
||||||
|
can_modify = [parent, nounou]
|
||||||
|
legend = "Clef privée"
|
||||||
|
|
||||||
|
@crans_attribute
|
||||||
|
class csr(Attr):
|
||||||
|
ldap_name = "csr"
|
||||||
|
python_type = str
|
||||||
|
can_modify = [parent, nounou]
|
||||||
|
legend = "requête de signature de certificat"
|
||||||
|
|
||||||
|
def _format_cert(self, csr):
|
||||||
|
import OpenSSL
|
||||||
|
try:
|
||||||
|
x509 = OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr)
|
||||||
|
except:
|
||||||
|
print csr
|
||||||
|
raise
|
||||||
|
data = fetch_cert_info(x509)
|
||||||
|
return data
|
||||||
|
|
||||||
|
def parse_value(self, csr):
|
||||||
|
if self.parent.mode in ['w', 'rw']:
|
||||||
|
data = self._format_cert(csr)
|
||||||
|
self.data = data
|
||||||
|
|
||||||
|
for hostname in [data['subject']['CN']] + data['extensions'].get('subjectAltName',[]):
|
||||||
|
if hostname not in self.parent['hostCert']:
|
||||||
|
self.parent['hostCert'].append(unicode(hostname))
|
||||||
|
|
||||||
|
self.value = csr
|
||||||
|
|
||||||
@crans_attribute
|
@crans_attribute
|
||||||
class certificat(Attr):
|
class certificat(Attr):
|
||||||
ldap_name = "certificat"
|
ldap_name = "certificat"
|
||||||
|
@ -1567,20 +1610,6 @@ class certificat(Attr):
|
||||||
super(certificat, self).__init__(*args, **kwargs)
|
super(certificat, self).__init__(*args, **kwargs)
|
||||||
self.data = None
|
self.data = None
|
||||||
|
|
||||||
def _decode_subjectAltName(self, data):
|
|
||||||
from pyasn1.codec.der import decoder
|
|
||||||
from pyasn1_modules.rfc2459 import SubjectAltName
|
|
||||||
altName = []
|
|
||||||
sa_names = decoder.decode(data, asn1Spec=SubjectAltName())[0]
|
|
||||||
for name in sa_names:
|
|
||||||
name_type = name.getName()
|
|
||||||
if name_type == 'dNSName':
|
|
||||||
altName.append(unicode(name.getComponent()))
|
|
||||||
# Cacert met des othername, du coup, on ignore juste
|
|
||||||
# else:
|
|
||||||
# raise ValueError("Seulement les dNSName sont supporté pour l'extension de certificat SubjectAltName (et pas %s)" % name_type)
|
|
||||||
return altName
|
|
||||||
|
|
||||||
def _format_cert(self, certificat):
|
def _format_cert(self, certificat):
|
||||||
import OpenSSL
|
import OpenSSL
|
||||||
try:
|
try:
|
||||||
|
@ -1593,20 +1622,7 @@ class certificat(Attr):
|
||||||
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certificat)
|
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, certificat)
|
||||||
except Exception:
|
except Exception:
|
||||||
raise ValueError("Format du certificat invalide, est-il bien au format DER ou PEM ?")
|
raise ValueError("Format du certificat invalide, est-il bien au format DER ou PEM ?")
|
||||||
data = {}
|
data = fetch_cert_info(x509)
|
||||||
data['subject'] = dict(x509.get_subject().get_components())
|
|
||||||
data['issuer'] = dict(x509.get_issuer().get_components())
|
|
||||||
data['start'] = int(time.mktime(time.strptime(x509.get_notBefore(), '%Y%m%d%H%M%SZ')))
|
|
||||||
data['end'] = int(time.mktime(time.strptime(x509.get_notAfter(), '%Y%m%d%H%M%SZ')))
|
|
||||||
data['serialNumber'] = unicode(int(x509.get_serial_number()))
|
|
||||||
data['extensions'] = {}
|
|
||||||
for i in range(0, x509.get_extension_count()):
|
|
||||||
ext = x509.get_extension(i)
|
|
||||||
ext_name = ext.get_short_name()
|
|
||||||
if ext_name == 'subjectAltName':
|
|
||||||
data['extensions'][ext_name] = self._decode_subjectAltName(ext.get_data())
|
|
||||||
else:
|
|
||||||
data['extensions'][ext_name] = str(ext)
|
|
||||||
return (certificat, data)
|
return (certificat, data)
|
||||||
|
|
||||||
def parse_value(self, certificat):
|
def parse_value(self, certificat):
|
||||||
|
|
|
@ -38,6 +38,7 @@ import smtplib
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
import base64
|
import base64
|
||||||
|
import collections
|
||||||
import hashlib
|
import hashlib
|
||||||
import ldap.filter
|
import ldap.filter
|
||||||
sys.path.append('/usr/scripts/gestion')
|
sys.path.append('/usr/scripts/gestion')
|
||||||
|
@ -243,3 +244,51 @@ def hash_password(password, salt=None):
|
||||||
raise ValueError("salt devrait faire au moins 8 octets")
|
raise ValueError("salt devrait faire au moins 8 octets")
|
||||||
|
|
||||||
return '{SSHA}' + base64.b64encode(hashlib.sha1(password + salt).digest() + salt)
|
return '{SSHA}' + base64.b64encode(hashlib.sha1(password + salt).digest() + salt)
|
||||||
|
|
||||||
|
def decode_subjectAltName(data):
|
||||||
|
from pyasn1.codec.der import decoder
|
||||||
|
from pyasn1_modules.rfc2459 import SubjectAltName
|
||||||
|
altName = []
|
||||||
|
sa_names = decoder.decode(data, asn1Spec=SubjectAltName())[0]
|
||||||
|
for name in sa_names:
|
||||||
|
name_type = name.getName()
|
||||||
|
if name_type == 'dNSName':
|
||||||
|
altName.append(unicode(name.getComponent()))
|
||||||
|
# Cacert met des othername, du coup, on ignore juste
|
||||||
|
# else:
|
||||||
|
# raise ValueError("Seulement les dNSName sont supporté pour l'extension de certificat SubjectAltName (et pas %s)" % name_type)
|
||||||
|
return altName
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_cert_info(x509):
|
||||||
|
# Attention, pour les X509req, la vertion de openssl utilisé et celle du fichier /usr/scripts/lib/python2.7/site-packages/pyOpenSSL-0.14-py2.7.egg
|
||||||
|
# /usr/scripts/python.sh met le path comme il faut, sinon, il faudrait faire quelque chose comme :
|
||||||
|
# for file in os.listdir('/usr/scripts/lib/python2.7/site-packages/'):
|
||||||
|
# if file.endswith(".egg"):
|
||||||
|
# sys.path.insert(2, '/usr/scripts/lib/python2.7/site-packages/%s' % file)
|
||||||
|
# sys.path.insert(2, '/usr/scripts/lib/python2.7/site-packages/')
|
||||||
|
import OpenSSL
|
||||||
|
data = {}
|
||||||
|
data['subject'] = dict(x509.get_subject().get_components())
|
||||||
|
if isinstance(x509, OpenSSL.crypto.X509):
|
||||||
|
data['issuer'] = dict(x509.get_issuer().get_components())
|
||||||
|
data['start'] = int(time.mktime(time.strptime(x509.get_notBefore(), '%Y%m%d%H%M%SZ')))
|
||||||
|
data['end'] = int(time.mktime(time.strptime(x509.get_notAfter(), '%Y%m%d%H%M%SZ')))
|
||||||
|
data['serialNumber'] = unicode(int(x509.get_serial_number()))
|
||||||
|
data['extensions'] = collections.defaultdict(list)
|
||||||
|
def do_ext(data, ext):
|
||||||
|
ext_name = ext.get_short_name()
|
||||||
|
if ext_name == 'subjectAltName':
|
||||||
|
data['extensions'][ext_name] = decode_subjectAltName(ext.get_data())
|
||||||
|
elif ext_name == 'extendedKeyUsage':
|
||||||
|
data['extensions'][ext_name].append(str(ext))
|
||||||
|
else:
|
||||||
|
data['extensions'][ext_name] = str(ext)
|
||||||
|
if isinstance(x509, OpenSSL.crypto.X509):
|
||||||
|
for i in range(0, x509.get_extension_count()):
|
||||||
|
ext = x509.get_extension(i)
|
||||||
|
do_ext(data, ext)
|
||||||
|
else:
|
||||||
|
for ext in x509.get_extensions():
|
||||||
|
do_ext(data, ext)
|
||||||
|
return data
|
||||||
|
|
28
objets.py
28
objets.py
|
@ -453,9 +453,12 @@ class CransLdapObject(object):
|
||||||
orig_ldif = self.attrs.to_ldif()
|
orig_ldif = self.attrs.to_ldif()
|
||||||
for attr in binary:
|
for attr in binary:
|
||||||
ldif['%s;binary' % attr]=ldif[attr]
|
ldif['%s;binary' % attr]=ldif[attr]
|
||||||
orig_ldif['%s;binary' % attr]=orig_ldif[attr]
|
orig_ldif['%s;binary' % attr]=orig_ldif.get(attr, [])
|
||||||
del(ldif[attr])
|
del(ldif[attr])
|
||||||
del(orig_ldif[attr])
|
try:
|
||||||
|
del(orig_ldif[attr])
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
return modifyModlist(orig_ldif, ldif)
|
return modifyModlist(orig_ldif, ldif)
|
||||||
|
|
||||||
|
@ -919,9 +922,9 @@ class machine(CransLdapObject):
|
||||||
self._proprio = new_cransldapobject(self.conn, self.parent_dn, self.mode if mode is None else mode)
|
self._proprio = new_cransldapobject(self.conn, self.parent_dn, self.mode if mode is None else mode)
|
||||||
return self._proprio
|
return self._proprio
|
||||||
|
|
||||||
def certificats(self):
|
def certificats(self, refresh=False):
|
||||||
"""Renvoie la liste des certificats de la machine"""
|
"""Renvoie la liste des certificats de la machine"""
|
||||||
if self._certificats is None:
|
if refresh or self._certificats is None:
|
||||||
self._certificats = self.conn.search(u'xid=*', dn = self.dn, scope = 1, mode=self.mode)
|
self._certificats = self.conn.search(u'xid=*', dn = self.dn, scope = 1, mode=self.mode)
|
||||||
for m in self._certificats:
|
for m in self._certificats:
|
||||||
m._machine = self
|
m._machine = self
|
||||||
|
@ -1239,13 +1242,16 @@ class baseCert(CransLdapObject):
|
||||||
variables.modified: [attributs.nounou, attributs.bureau, attributs.parent],
|
variables.modified: [attributs.nounou, attributs.bureau, attributs.parent],
|
||||||
variables.deleted: [attributs.nounou, attributs.bureau, attributs.parent],
|
variables.deleted: [attributs.nounou, attributs.bureau, attributs.parent],
|
||||||
}
|
}
|
||||||
attribs = [ attributs.xid, attributs.certificat, attributs.hostCert, attributs.historique]
|
attribs = [ attributs.xid, attributs.certificat, attributs.hostCert, attributs.historique,
|
||||||
|
attributs.info, attributs.csr ]
|
||||||
|
|
||||||
tlsa_attribs = [ attributs.certificatUsage, attributs.selector, attributs.matchingType,
|
tlsa_attribs = [ attributs.certificatUsage, attributs.selector, attributs.matchingType,
|
||||||
attributs.portTCPin, attributs.portUDPin]
|
attributs.portTCPin, attributs.portUDPin]
|
||||||
x509_attribs = [ attributs.issuerCN, attributs.start, attributs.end,
|
x509_attribs = [ attributs.issuerCN, attributs.start, attributs.end,
|
||||||
attributs.crlUrl, attributs.revocked, attributs.serialNumber ]
|
attributs.crlUrl, attributs.revocked, attributs.serialNumber ]
|
||||||
|
|
||||||
|
private_attribs = [ attributs.privatekey, attributs.encrypted ]
|
||||||
|
|
||||||
ldap_name = "baseCert"
|
ldap_name = "baseCert"
|
||||||
|
|
||||||
protected_issuer = [u'CAcert Class 3 Root', u'CA Cert Signing Authority']
|
protected_issuer = [u'CAcert Class 3 Root', u'CA Cert Signing Authority']
|
||||||
|
@ -1261,6 +1267,8 @@ class baseCert(CransLdapObject):
|
||||||
self.attribs.extend(self.tlsa_attribs)
|
self.attribs.extend(self.tlsa_attribs)
|
||||||
if 'x509Cert' in self['objectClass']:
|
if 'x509Cert' in self['objectClass']:
|
||||||
self.attribs.extend(self.x509_attribs)
|
self.attribs.extend(self.x509_attribs)
|
||||||
|
if "privateKey" in self['objectClass']:
|
||||||
|
self.attribs.extend(self.private_attribs)
|
||||||
super(baseCert, self).__init__(*args, **kwargs)
|
super(baseCert, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
def _check_setitem(self, attr, values):
|
def _check_setitem(self, attr, values):
|
||||||
|
@ -1276,6 +1284,16 @@ class baseCert(CransLdapObject):
|
||||||
if domain in [self['certificat'][0]['subject']['CN']] + self['certificat'][0]['extensions'].get('subjectAltName',[]):
|
if domain in [self['certificat'][0]['subject']['CN']] + self['certificat'][0]['extensions'].get('subjectAltName',[]):
|
||||||
raise EnvironmentError("Vous ne pouvez pas retirer le domaine %s alors qu'il est déclaré dans le certificat" % domain)
|
raise EnvironmentError("Vous ne pouvez pas retirer le domaine %s alors qu'il est déclaré dans le certificat" % domain)
|
||||||
|
|
||||||
|
def private(self, privatekey, encrypted):
|
||||||
|
if not self.mode in ['w', 'rw']:
|
||||||
|
return
|
||||||
|
if u"privateKey" in self['objectClass']:
|
||||||
|
return
|
||||||
|
self._modifs['objectClass'].append(u"privateKey")
|
||||||
|
self.attribs.extend(self.private_attribs)
|
||||||
|
self['encrypted']=encrypted
|
||||||
|
self['privatekey']=privatekey
|
||||||
|
|
||||||
def tlsa(self, certificatUsage, matchingType):
|
def tlsa(self, certificatUsage, matchingType):
|
||||||
if not self.mode in ['w', 'rw']:
|
if not self.mode in ['w', 'rw']:
|
||||||
return
|
return
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue