scripts/gestion/dialog/certificat.py
Valentin Samir acd7d8fd7f [gest_crans_lc] Ajout des validate_changes() même là où ça sert à rien
Pour ne pas avoir de surprise si quelqu'un modifie validate_changes() dans lc_ldap.
2014-12-01 13:32:55 +01:00

539 lines
28 KiB
Python

#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
u"""
Copyright (C) Valentin Samir
Licence : GPLv3
"""
import os
import sys
import ssl
import time
import inspect
import tempfile
import traceback
if '/usr/scripts' not in sys.path:
sys.path.append('/usr/scripts')
from OpenSSL import crypto, SSL
from gestion.cert_utils import createCertRequest
import lc_ldap.objets as objets
import lc_ldap.attributs as attributs
import lc
from CPS import TailCall, tailcaller, Continue
class Dialog(lc.Dialog):
def certificat_tlsa(self, certificat, cont, values=None):
"""Menu d'éditions des paramètres TLSA d'un certificat"""
separateur = ' '
form = {
'Type de certificat' : {'ldap_name' : 'certificatUsage', 'text':"".join(str(s) for s in certificat.get('certificatUsage', [])), 'len':1},
'Type de correspondance' : {'ldap_name' : 'matchingType', 'text':"".join(str(s) for s in certificat.get('matchingType', [])), 'len':1},
'Ports TCP' : {'ldap_name' : 'portTCPin', 'text':separateur.join(str(p) for p in certificat.get('portTCPin', [])), 'len':30},
'Ports UDP' : {'ldap_name' : 'portUDPin', 'text':separateur.join(str(p) for p in certificat.get('portUDPin', [])), 'len':30},
}
form_order = ['Type de certificat', 'Type de correspondance', 'Ports TCP', 'Ports UDP']
def box(fields_values=None):
fields = [("%s : " % k, form[k]['text'], form[k]['len'] + 1, form[k]['len']) for k in form_order]
return self.dialog.form(
text="""Type de certificat : Type de correspondance :
* 0 - CA pinning * 0 - certificat entier
* 1 - Cert pinning * 1 - sha256
* 2 - CA auto signé * 2 - sha512
* 3 - Cert autosigné""",
no_collapse=True,
height=0, width=0, form_height=0,
timeout=self.timeout,
fields=fields_values if fields_values else fields,
title="Paramètres TLS d'un certificat de la machine %s" % certificat.machine()['host'][0],
backtitle="Gestion des certificats des machines du Crans")
def todo(form, values, certificat, cont):
if not values['certificatUsage'] in ['0', '1', '2', '3']:
raise ValueError("""Type de certificat invalide :
les valeurs valident sont :
* 0 pour du CA pinning
(le certificat doit être une autorité de certification valide)
* 1 pour du certificat pinning
(le certificat doit déjà être validé par les navigateur)
* 2 pour ajouter un CA
(pour les autorité de certification autosigné)
* 3 pour les certificats autosigné"""
)
if not values['matchingType'] in ['0', '1', '2']:
raise ValueError("""Type de correspondance invalide :
les valeurs valident sont :
* 0 le certificat sera mis entièrement dans le dns
* 1 le sha256 du certificat sera mis dans le dns
* 2 le sha512 du certificat sera mis dans le dns"""
)
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
if "TLSACert" in certificat['objectClass']:
certificat['certificatUsage'] = unicode(values['certificatUsage'])
certificat['matchingType'] = unicode(values['matchingType'])
else:
certificat.tlsa(values['certificatUsage'], values['matchingType'])
certificat['portTCPin'] = [unicode(s, 'utf-8') for s in values['portTCPin'].split(separateur) if s]
certificat['portUDPin'] = [unicode(s, 'utf-8') for s in values['portUDPin'].split(separateur) if s]
certificat.validate_changes()
certificat.history_gen()
certificat.save()
raise Continue(cont(certificat=certificat))
(code, output) = self.handle_dialog(cont, box, values)
values = dict(zip([form[k]['ldap_name'] for k in form_order], output))
fields_values = [("%s : " % k, values.get(form[k]['ldap_name'], ""), form[k]['len'] + 1, form[k]['len']) for k in form_order]
self_cont=TailCall(self.certificat_tlsa, certificat=certificat, cont=cont, values=fields_values)
return self.handle_dialog_result(
code=code,
output=output,
cancel_cont=cont,
error_cont=self_cont,
codes_todo=[([self.dialog.DIALOG_OK], todo, [form, values, certificat, cont])]
)
def create_certificat(self, cont, machine=None, certificat=None):
"""Permet d'ajouter un certificat à une machine à partir du PEM du certificat"""
if machine is None and certificat is None:
raise EnvironmentError("Il faut fournir au moins une machine ou un certificat")
# input multiline en utilisant un editbox
def box():
fp, path = tempfile.mkstemp()
os.close(fp)
cmd = ['--editbox', path, "0", "0"]
(code, output) = self.dialog._perform(*(cmd,),
no_mouse=True, # On désactive la sourie sinon dialog segfault si on clic
backtitle="Appuyez sur CTRL+MAJ+V pour coller",
timeout=self.timeout,
title="Création d'un certificat, entrez le PEM du certificat")
os.remove(path)
if code == self.dialog.DIALOG_OK:
return code, output
else:
return code, None
def todo(machine, certificat, cont):
if certificat:
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
certificat['certificat'] = unicode(pem.strip(), 'utf-8')
certificat.validate_changes()
certificat.history_gen()
certificat.save()
else:
with self.conn.newCertificat(machine.dn, {}) as certificat:
certificat['certificat'] = unicode(pem.strip(), 'utf-8')
certificat.create()
raise Continue(cont(certificat=certificat, machine=certificat.machine()))
(code, pem) = self.handle_dialog(cont, box)
self_cont = TailCall(self.create_certificat, machine=machine, certificat=certificat, cont=cont)
return self.handle_dialog_result(
code=code,
output=pem,
cancel_cont=cont,
error_cont=self_cont,
codes_todo=[([self.dialog.DIALOG_OK], todo, [machine, certificat, cont])]
)
def create_privatekey(self, cont, machine=None, certificat=None, imp=False, size=4096):
"""Permet de générer ou importer une clef privée à une machine"""
if machine is None and certificat is None:
raise EnvironmentError("Il faut fournir au moins une machine ou un certificat")
# input multiline en utilisant un editbox
def box():
fp, path = tempfile.mkstemp()
os.close(fp)
cmd = ['--editbox', path, "0", "0"]
(code, output) = self.dialog._perform(*(cmd,),
no_mouse=True, # On désactive la sourie sinon dialog segfault si on clic
backtitle="Appuyez sur CTRL+MAJ+V pour coller",
timeout=self.timeout,
title="Création d'un certificat, entrez le PEM du certificat")
os.remove(path)
if code == self.dialog.DIALOG_OK:
return code, output
else:
return code, None
def todo(machine, certificat, pem, imp, size, cont):
if not imp:
if not machine:
machine=certificat.machine()
if "machineCrans" in machine['objectClass']:
passphrase = secrets.get('privatekey_passphrase')
else:
self.dialog.msgbox("Vous aller être inviter à entrez un mot de passe. Ce mot de passe est utilisé pour chiffrer la clef privée qui va être générée dans la base de donnée du crans.\n\nCe mot de passe n'est pas conservé, sous quelque forme que se soit par le crans.\nAussi, en cas de perte, la clef privée deviendrait inutilisable.\n Pensez à le sauvegarder quelque part",
title="Génération d'une clée privée",
width=70, timeout=self.timeout,
height=12)
passphrase = self.get_password(cont)
self.dialog.infobox("Génération d'une clef privée RSA de taille %s en cours.\nMerci de patienter" % size)
pem = crypto.PKey()
pem.generate_key(crypto.TYPE_RSA, size)
pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, pem, "des3", passphrase)
elif not pem.startswith("-----BEGIN ENCRYPTED PRIVATE KEY-----"):
raise ValueError("On n'accepte que des clef chiffrée PKCS#8 en PEM. Donc la clef doit commencer par -----BEGIN ENCRYPTED PRIVATE KEY-----")
if certificat:
if "privatekey" in certificat:
raise ValueError("Il y a déjà une clef privée, merci d'annuler")
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
certificat.private(pem, encrypted=True)
certificat.validate_changes()
certificat.history_gen()
certificat.save()
self.dialog.msgbox("Clef privée bien ajouté", timeout=self.timeout, title="Ajout d'une clef privée")
raise Continue(cont(certificat=certificat, machine=certificat.machine()))
else:
with self.conn.newCertificat(machine.dn, {}) as certificat:
certificat['hostCert']=unicode(machine['host'][0])
certificat.private(pem, encrypted=True)
certificat.create()
self.dialog.msgbox("Clef privée créée avec succès", timeout=self.timeout, title="Création d'une clef privée")
raise Continue(cont(certificat=certificat, machine=certificat.machine()))
if imp:
(code, pem) = self.handle_dialog(cont, box)
else:
(code, pem) = (self.dialog.DIALOG_OK, "")
self_cont = TailCall(self.create_privatekey, machine=machine, certificat=certificat, cont=cont, imp=imp, size=size)
return self.handle_dialog_result(
code=code,
output=pem,
cancel_cont=cont,
error_cont=self_cont,
codes_todo=[([self.dialog.DIALOG_OK], todo, [machine, certificat, pem, imp, size, cont])]
)
def delete_certificat(self, certificat, cont):
"""Supprime un certificat"""
def todo(certificat, cont):
if self.confirm_item(item=certificat, title="Voulez vous vraiement supprimer le certificat ?"):
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
certificat.delete()
self.dialog.msgbox("Le certificat a bien été supprimé", timeout=self.timeout, title="Suppression d'un certificat")
raise Continue(cont(certificat=None, machine=certificat.machine(refresh=True)))
else:
raise Continue(cont(certificat=certificat))
return self.handle_dialog_result(
code=self.dialog.DIALOG_OK,
output="",
cancel_cont=cont,
error_cont=TailCall(self.delete_certificat, certificat=certificat, cont=cont),
codes_todo=[([self.dialog.DIALOG_OK], todo, [certificat, cont])]
)
def gen_csr(self, certificat, cont):
"""Permet de générer un csr à partir de la clef privée du certificat"""
def todo(certificat, self_cont, cont):
if certificat['encrypted']:
if "machineCrans" in certificat.machine()["objectClass"]:
passphrase = secrets.get('privatekey_passphrase')
else:
self.dialog.msgbox("Mercie de fournir le mot de passe chiffrant la clef privée.\nIl a été choisis lors de la création de la clef.",
title="Génération d'un CSR",
width=70,
height=10, timeout=self.timeout)
passphrase = self.get_password(cont, confirm=False)
else:
passphrase = None
try:
if passphrase:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, str(certificat['privatekey'][0]), passphrase)
else:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, str(certificat['privatekey'][0]))
except crypto.Error as e:
if len(e.message) > 2 and len(e.message[2]) > 2 and e.message[2][2] == 'bad password read':
self.dialog.msgbox("Mauvais mot de passe", timeout=self.timeout)
raise Continue(self_cont)
else:
raise
req = createCertRequest(pkey,
digest="sha1",
subjectAltName=[str(host) for host in certificat['hostCert'][1:]],
C=u"FR",
ST=u"Ile de France",
L=u"Cachan",
O=u"Association Cachan Réseaux A Normal SUP (C.R.A.N.S)",
OU=u"Crans",
CN=unicode(certificat['hostCert'][0]),
)
csr = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
certificat['csr']=unicode(csr)
certificat.validate_changes()
certificat.history_gen()
certificat.save()
self.handle_dialog(cont, box, csr)
if self.dialog.yesno("Remplacer le certificat actuel ?", timeout=self.timeout) == self.dialog.DIALOG_OK:
return self.create_certificat(certificat=certificat, cont=cont(certificat=certificat))
else:
raise Continue(cont(certificat=certificat))
self_cont = TailCall(self.gen_csr, certificat=certificat, cont=cont)
return self.handle_dialog_result(
code=self.dialog.DIALOG_OK,
output="",
cancel_cont=cont,
error_cont=self_cont,
codes_todo=[([self.dialog.DIALOG_OK], todo, [certificat, self_cont, cont])]
)
def get_certificat(self, certificat, cont, privatekey=False, csr=False):
"""Permet d'afficher le certificat courant"""
def box(text):
fp, path = tempfile.mkstemp()
os.write(fp, text)
os.close(fp)
self.dialog.textbox(filename=path, height=0, width=0,
backtitle="Appuyez sur CTRL+MAJ+V pour coller",
title="Récupération d'un certificat",
no_mouse=True, timeout=self.timeout,)
os.remove(path)
return
if privatekey:
self.handle_dialog(cont, box, unicode(certificat['privatekey'][0]))
elif csr:
self.handle_dialog(cont, box, unicode(certificat['csr'][0]))
else:
self.handle_dialog(cont, box, unicode(ssl.DER_cert_to_PEM_cert(str(certificat['certificat'][0]))))
raise Continue(cont)
def create_csr(self, cont, machine=None, certificat=None):
"""Permet d'ajouter un csr à une machine à partir du PEM du csr"""
if machine is None and certificat is None:
raise EnvironmentError("Il faut fournir au moins une machine ou un certificat")
# input multiline en utilisant un editbox
def box():
fp, path = tempfile.mkstemp()
os.close(fp)
cmd = ['--editbox', path, "0", "0"]
(code, output) = self.dialog._perform(*(cmd,),
no_mouse=True, # On désactive la sourie sinon dialog segfault si on clic
backtitle="Appuyez sur CTRL+MAJ+V pour coller",
timeout=self.timeout,
title="Création d'un certificat, entrez le PEM du certificat")
os.remove(path)
if code == self.dialog.DIALOG_OK:
return code, output
else:
return code, None
def todo(machine, certificat, pem, cont):
if certificat:
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
certificat['csr'] = unicode(pem.strip(), 'utf-8')
certificat.validate_changes()
certificat.history_gen()
certificat.save()
else:
with self.conn.newCertificat(machine.dn, {}) as certificat:
certificat['hostCert']=unicode(machine['host'][0])
certificat['csr'] = unicode(pem.strip(), 'utf-8')
certificat.create()
raise Continue(cont(certificat=certificat, machine=certificat.machine()))
(code, pem) = self.handle_dialog(cont, box)
self_cont = TailCall(self.create_csr, machine=machine, certificat=certificat, cont=cont)
return self.handle_dialog_result(
code=code,
output=pem,
cancel_cont=cont,
error_cont=self_cont,
codes_todo=[([self.dialog.DIALOG_OK], todo, [machine, certificat, pem, cont])]
)
def modif_machine_certificat(self, machine, cont, tag=None, certificat=None):
"""Permet l'édition d'un certificat d'une machine"""
self_cont = TailCall(self.modif_machine_certificat, machine=machine, cont=cont, certificat=certificat)
if certificat is None:
certificat_index = self.edit_certificat_select(machine=machine, title="Modification des certificats de %s" % machine['host'][0], cont=cont)
if certificat_index == 'new':
raise Continue(TailCall(self.create_certificat, machine=machine, cont=self_cont))
elif certificat_index == 'priv':
raise Continue(TailCall(self.create_privatekey, machine=machine, cont=self_cont))
elif certificat_index == 'csr':
raise Continue(TailCall(self.create_csr, machine=machine, cont=self_cont))
certificat = machine.certificats()[certificat_index]
a = attributs
menu_droits = {
'Hostname':[a.parent, a.nounou],
'AddPrivateKey':[a.parent, a.nounou],
'AddCertificate':[a.parent, a.nounou],
'SetCertificate':[a.parent, a.nounou],
'TLSA':[a.parent, a.nounou],
'Autre':[a.nounou],
'GetPriv':[a.parent, a.nounou],
'GetCert':[a.parent, a.nounou, a.cableur],
'GetCSR':[a.parent, a.nounou, a.cableur],
'GenCSR':[a.parent, a.nounou],
'Remarque':[a.parent, a.nounou, a.cableur],
'Supprimer':[a.parent, a.nounou],
}
menu = {
'Hostname' : {'text':"Noms d'hôte utilisant le certificat", "help":'Il doivent être inclus dans les host et hostAlias de la machine parente', "attribut":attributs.hostCert},
'AddPrivateKey' : {'text': 'Ajouter la clef privée', 'help':'La clef doit être obligatoirement chiffrée avant envoi', "callback":TailCall(self.create_privatekey, imp=True)},
'AddCertificate' : {'text': 'Ajouter un certificat X509', 'help':'', "callback":self.create_certificat},
'SetCertificate' : {'text': 'Remplacer le certificat X509', 'help':'', "callback":self.create_certificat},
'TLSA' : {'text':"Paramètres pour les champs dns TLSA", 'help':'Permet de configurer DANE pour le certificat X509', "callback":self.certificat_tlsa},
'Autre': {'text' : "Modifier les attribut booléen comme revocked", 'help':'', "callback":self.modif_certificat_boolean},
'GetPriv' : {'text' : 'Récupérer la clef privée', 'help':"Affiche la clef privée telle qu'elle est dans la base de donnée, c'est à dire chiffrée", 'callback':TailCall(self.get_certificat, privatekey=True)},
'GetCert' : {'text' : 'Récupérer le certificat', 'help':"Affiche le certificat au format PEM", 'callback':self.get_certificat},
'GetCSR' : {'text' : 'Récupérer la requête de signature de certificat', 'help':"Affiche le CSR au format PEM", 'callback':TailCall(self.get_certificat, csr=True)},
'GenCSR' : {'text' : 'Générer un CSR, puis remplacer le certificat', 'help':'Généré à partir de la clef privée. Les noms (CN et subjectAltName) sont pris à partir de Hostname (attribut hostCert)', "callback":self.gen_csr},
'Remarque' : {'text': 'Mettre des remarques', 'help':'La première apparait dans la liste des certificats', 'attribut':attributs.info},
'Supprimer' : {'text' : "Supprimer le certificat", 'help':'', "callback":self.delete_certificat},
}
if "privateKey" in certificat["objectClass"]:
menu
menu_order = ['Hostname']
if not "privateKey" in certificat['objectClass']:
menu_order.extend(['AddPrivateKey', 'SetCertificate'])
if not "x509Cert" in certificat['objectClass']:
menu_order.extend([ 'AddCertificate'])
if "x509Cert" in certificat['objectClass']:
menu_order.extend(['TLSA', 'Autre', 'GetCert'])
if certificat['csr']:
menu_order.extend(['GetCSR'])
if "privateKey" in certificat['objectClass']:
if attributs.nounou in self.conn.droits or machine.dn.startswith(self.conn.dn):
menu_order.extend(['GetPriv'])
menu_order.extend(['GenCSR'])
menu_order.extend(['Remarque', 'Supprimer'])
def box(default_item=None):
text="Certificat de %s, xid=%s :\n" % (certificat['hostCert'][0], certificat['xid'][0])
if "x509Cert" in certificat['objectClass']:
text += " * Certificat N°0x%X émis par %s, valable du %s au %s\n" % (
int(str(certificat['serialNumber'][0])),
certificat['issuerCN'][0],
time.strftime("%d/%m/%Y", time.localtime(int(certificat['start'][0]))),
time.strftime("%d/%m/%Y", time.localtime(int(certificat['end'][0])))
)
if "privateKey" in certificat['objectClass']:
text += " * Clef privée\n"
if certificat['csr']:
text += " * Requête de signature de certificat\n"
if certificat['info']:
text += str(certificat['info'][0])
return self.dialog.menu(
text,
width=0,
height=0,
menu_height=0,
item_help=1,
timeout=self.timeout,
default_item=str(default_item),
title="Modification des certificats de %s" % certificat.machine()['host'][0],
scrollbar=True,
cancel_label="Retour",
backtitle=u"Vous êtes connecté en tant que %s" % self.conn.current_login,
choices=[(key, menu[key]['text'], menu[key]['help']) for key in menu_order if self.has_right(menu_droits[key], certificat)])
def todo(tag, menu, certificat, self_cont):
if not tag in menu_order:
raise Continue(self_cont(certificat=certificat))
else:
if 'callback' in menu[tag]:
raise Continue(TailCall(menu[tag]['callback'], certificat=certificat, cont=self_cont(certificat=certificat, tag=tag)))
elif 'attribut' in menu[tag]:
raise Continue(TailCall(self.modif_certificat_attributs, certificat=certificat, cont=self_cont(certificat=certificat, tag=tag), attr=menu[tag]['attribut'].ldap_name))
else:
raise EnvironmentError("Il n'y a ni champ 'attribut' ni 'callback' pour le tag %s" % tag)
cancel_cont = cont(machine=machine) if certificat is None else self_cont(machine=certificat.machine(), certificat=None, tag=tag)
(code, tag) = self.handle_dialog(cancel_cont, box, tag)
return self.handle_dialog_result(
code=code,
output=tag,
cancel_cont=cancel_cont,
error_cont=self_cont,
codes_todo=[([self.dialog.DIALOG_OK], todo, [tag, menu, certificat, self_cont])]
)
@tailcaller
def edit_certificat_select(self, machine, title, cont):
"""Permet de choisir un certificat existant ou nouveau d'une machine"""
a = attributs
menu_droits = {
'new':[a.parent, a.nounou],
'priv':[a.parent, a.nounou],
'csr':[a.parent, a.nounou],
}
menu = {
'new':'Ajouter un nouveau certificat',
'priv':'Générer une nouvelle clef privée',
'csr':'Ajouter une nouvelle requête de certificat',
}
menu_order = ['new', 'priv', 'csr']
menu_special = ['new', 'priv', 'csr']
def box(default_item=None):
index=0
choices = []
for key in menu_order:
if self.has_right(menu_droits[key], machine):
choices.append((key, menu[key]))
for cert in machine.certificats():
if cert['info']:
item = str(cert['info'][0])
elif "x509Cert" in cert['objectClass']:
item = "Emit par %s pour %s du %s au %s" % (cert['issuerCN'][0], ', '.join(str(cn) for cn in cert['hostCert']), time.strftime("%d/%m/%Y", time.localtime(int(cert['start'][0]))), time.strftime("%d/%m/%Y", time.localtime(int(cert['end'][0]))))
elif "privateKey" in cert['objectClass']:
item = "Clef privée de %s, xid=%s" % (cert['hostCert'][0], cert['xid'][0])
elif cert['csr']:
item = "Requête de signature de certificat pour %s, xid=%s" % (cert['hostCert'][0], cert['xid'][0])
choices.append((str(index), item))
index+=1
return self.dialog.menu(
"Modifier ou ajouter un certificat ?",
width=0,
height=0,
timeout=self.timeout,
menu_height=0,
item_help=0,
title="Modification des certificats de %s" % machine['host'][0],
scrollbar=True,
default_item=str(default_item),
cancel_label="Retour",
backtitle=u"Vous êtes connecté en tant que %s" % self.conn.current_login,
choices=choices)
def todo(tag):
if tag in ['new', 'priv', 'csr']:
return tag
else:
return int(tag)
(code, tag) = self.handle_dialog(cont, box)
retry_cont = TailCall(self.edit_certificat_select, machine=machine, title=title, cont=cont)
return self.handle_dialog_result(
code=code,
output=tag,
cancel_cont=cont(machine=machine),
error_cont=retry_cont,
codes_todo=[([self.dialog.DIALOG_OK], todo, [tag])]
)
def modif_certificat_boolean(self, certificat, cont):
"""Juste un raccourci vers edit_boolean_attributs spécifique aux certificats"""
a = attributs
attribs = [a.revocked]
return self.edit_boolean_attributs(
obj=certificat,
attribs=attribs,
title="Édition des attributs booléen d'un certificat de la machine %s" % certificat.machine()['host'][0],
update_obj='certificat',
cont=cont)
def modif_certificat_attributs(self, certificat, attr, cont):
"""Juste un raccourci vers edit_attributs spécifique aux certificats"""
return self.edit_attributs(obj=certificat, update_obj='certificat', attr=attr, title="Modification d'un certificat de la machine %s" % certificat.machine()['host'][0], cont=cont)