552 lines
28 KiB
Python
552 lines
28 KiB
Python
#!/bin/bash /usr/scripts/python.sh
|
|
# -*- coding: utf-8 -*-
|
|
|
|
u"""
|
|
Copyright (C) Valentin Samir
|
|
Licence : GPLv3
|
|
|
|
"""
|
|
import os
|
|
import sys
|
|
import ssl
|
|
import time
|
|
import inspect
|
|
import tempfile
|
|
import traceback
|
|
if '/usr/scripts' not in sys.path:
|
|
sys.path.append('/usr/scripts')
|
|
import gestion.secrets_new as secrets
|
|
|
|
from OpenSSL import crypto, SSL
|
|
|
|
from gestion.cert_utils import createCertRequest
|
|
|
|
import lc_ldap.objets as objets
|
|
import lc_ldap.attributs as attributs
|
|
|
|
import lc
|
|
from CPS import TailCall, tailcaller, Continue
|
|
|
|
class Dialog(lc.Dialog):
|
|
def certificat_tlsa(self, certificat, cont, values=None):
|
|
"""Menu d'éditions des paramètres TLSA d'un certificat"""
|
|
separateur = ' '
|
|
form = {
|
|
'Type de certificat' : {'ldap_name' : 'certificatUsage', 'text':"".join(str(s) for s in certificat.get('certificatUsage', [])), 'len':1},
|
|
'Type de correspondance' : {'ldap_name' : 'matchingType', 'text':"".join(str(s) for s in certificat.get('matchingType', [])), 'len':1},
|
|
'Ports TCP' : {'ldap_name' : 'portTCPin', 'text':separateur.join(str(p) for p in certificat.get('portTCPin', [])), 'len':30},
|
|
'Ports UDP' : {'ldap_name' : 'portUDPin', 'text':separateur.join(str(p) for p in certificat.get('portUDPin', [])), 'len':30},
|
|
}
|
|
form_order = ['Type de certificat', 'Type de correspondance', 'Ports TCP', 'Ports UDP']
|
|
def box(fields_values=None):
|
|
fields = [("%s : " % k, form[k]['text'], form[k]['len'] + 1, form[k]['len']) for k in form_order]
|
|
return self.dialog.form(
|
|
text="""Type de certificat : Type de correspondance :
|
|
* 0 - CA pinning * 0 - certificat entier
|
|
* 1 - Cert pinning * 1 - sha256
|
|
* 2 - CA auto signé * 2 - sha512
|
|
* 3 - Cert autosigné""",
|
|
no_collapse=True,
|
|
height=0, width=0, form_height=0,
|
|
timeout=self.timeout,
|
|
fields=fields_values if fields_values else fields,
|
|
title="Paramètres TLS d'un certificat de la machine %s" % certificat.machine()['host'][0],
|
|
backtitle="Gestion des certificats des machines du Crans")
|
|
|
|
def todo(form, values, certificat, cont):
|
|
if not values['certificatUsage'] in ['0', '1', '2', '3']:
|
|
raise ValueError("""Type de certificat invalide :
|
|
les valeurs valident sont :
|
|
* 0 pour du CA pinning
|
|
(le certificat doit être une autorité de certification valide)
|
|
* 1 pour du certificat pinning
|
|
(le certificat doit déjà être validé par les navigateur)
|
|
* 2 pour ajouter un CA
|
|
(pour les autorité de certification autosigné)
|
|
* 3 pour les certificats autosigné"""
|
|
)
|
|
if not values['matchingType'] in ['0', '1', '2']:
|
|
raise ValueError("""Type de correspondance invalide :
|
|
les valeurs valident sont :
|
|
* 0 le certificat sera mis entièrement dans le dns
|
|
* 1 le sha256 du certificat sera mis dans le dns
|
|
* 2 le sha512 du certificat sera mis dans le dns"""
|
|
)
|
|
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
|
|
if "TLSACert" in certificat['objectClass']:
|
|
certificat['certificatUsage'] = unicode(values['certificatUsage'])
|
|
certificat['matchingType'] = unicode(values['matchingType'])
|
|
else:
|
|
certificat.tlsa(values['certificatUsage'], values['matchingType'])
|
|
certificat['portTCPin'] = [unicode(s, 'utf-8') for s in values['portTCPin'].split(separateur) if s]
|
|
certificat['portUDPin'] = [unicode(s, 'utf-8') for s in values['portUDPin'].split(separateur) if s]
|
|
certificat.validate_changes()
|
|
certificat.history_gen()
|
|
certificat.save()
|
|
raise Continue(cont(certificat=certificat))
|
|
|
|
(code, output) = self.handle_dialog(cont, box, values)
|
|
values = dict(zip([form[k]['ldap_name'] for k in form_order], output))
|
|
fields_values = [("%s : " % k, values.get(form[k]['ldap_name'], ""), form[k]['len'] + 1, form[k]['len']) for k in form_order]
|
|
self_cont=TailCall(self.certificat_tlsa, certificat=certificat, cont=cont, values=fields_values)
|
|
return self.handle_dialog_result(
|
|
code=code,
|
|
output=output,
|
|
cancel_cont=cont,
|
|
error_cont=self_cont,
|
|
codes_todo=[([self.dialog.DIALOG_OK], todo, [form, values, certificat, cont])]
|
|
)
|
|
|
|
def create_certificat(self, cont, machine=None, certificat=None):
|
|
"""Permet d'ajouter un certificat à une machine à partir du PEM du certificat"""
|
|
if machine is None and certificat is None:
|
|
raise EnvironmentError("Il faut fournir au moins une machine ou un certificat")
|
|
# input multiline en utilisant un editbox
|
|
def box():
|
|
fp, path = tempfile.mkstemp()
|
|
os.close(fp)
|
|
cmd = ['--editbox', path, "0", "0"]
|
|
(code, output) = self.dialog._perform(*(cmd,),
|
|
no_mouse=True, # On désactive la sourie sinon dialog segfault si on clic
|
|
backtitle="Appuyez sur CTRL+MAJ+V pour coller",
|
|
timeout=self.timeout,
|
|
title="Création d'un certificat, entrez le PEM du certificat")
|
|
os.remove(path)
|
|
if code == self.dialog.DIALOG_OK:
|
|
return code, output
|
|
else:
|
|
return code, None
|
|
|
|
def todo(machine, certificat, cont):
|
|
if certificat:
|
|
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
|
|
certificat['certificat'] = unicode(pem.strip(), 'utf-8')
|
|
certificat.validate_changes()
|
|
certificat.history_gen()
|
|
certificat.save()
|
|
else:
|
|
with self.conn.newCertificat(machine.dn, {}) as certificat:
|
|
certificat['certificat'] = unicode(pem.strip(), 'utf-8')
|
|
certificat.create()
|
|
raise Continue(cont(certificat=certificat, machine=certificat.machine()))
|
|
|
|
(code, pem) = self.handle_dialog(cont, box)
|
|
self_cont = TailCall(self.create_certificat, machine=machine, certificat=certificat, cont=cont)
|
|
return self.handle_dialog_result(
|
|
code=code,
|
|
output=pem,
|
|
cancel_cont=cont,
|
|
error_cont=self_cont,
|
|
codes_todo=[([self.dialog.DIALOG_OK], todo, [machine, certificat, cont])]
|
|
)
|
|
|
|
|
|
|
|
|
|
def create_privatekey(self, cont, machine=None, certificat=None, imp=False, size=4096):
|
|
"""Permet de générer ou importer une clef privée à une machine"""
|
|
if machine is None and certificat is None:
|
|
raise EnvironmentError("Il faut fournir au moins une machine ou un certificat")
|
|
# input multiline en utilisant un editbox
|
|
def box():
|
|
fp, path = tempfile.mkstemp()
|
|
os.close(fp)
|
|
cmd = ['--editbox', path, "0", "0"]
|
|
(code, output) = self.dialog._perform(*(cmd,),
|
|
no_mouse=True, # On désactive la sourie sinon dialog segfault si on clic
|
|
backtitle="Appuyez sur CTRL+MAJ+V pour coller",
|
|
timeout=self.timeout,
|
|
title="Création d'un certificat, entrez le PEM du certificat")
|
|
os.remove(path)
|
|
if code == self.dialog.DIALOG_OK:
|
|
return code, output
|
|
else:
|
|
return code, None
|
|
|
|
def todo(machine, certificat, pem, imp, size, cont):
|
|
if not imp:
|
|
if not machine:
|
|
machine=certificat.machine()
|
|
if "machineCrans" in machine['objectClass']:
|
|
passphrase = secrets.get('privatekey_passphrase')
|
|
else:
|
|
self.dialog.msgbox("Vous aller être inviter à entrez un mot de passe. Ce mot de passe est utilisé pour chiffrer la clef privée qui va être générée dans la base de donnée du crans.\n\nCe mot de passe n'est pas conservé, sous quelque forme que se soit par le crans.\nAussi, en cas de perte, la clef privée deviendrait inutilisable.\n Pensez à le sauvegarder quelque part",
|
|
title="Génération d'une clée privée",
|
|
width=70, timeout=self.timeout,
|
|
height=12)
|
|
passphrase = self.get_password(cont)
|
|
self.dialog.infobox("Génération d'une clef privée RSA de taille %s en cours.\nMerci de patienter" % size)
|
|
pem = crypto.PKey()
|
|
pem.generate_key(crypto.TYPE_RSA, size)
|
|
pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, pem, "des3", passphrase)
|
|
elif not pem.startswith("-----BEGIN ENCRYPTED PRIVATE KEY-----"):
|
|
raise ValueError("On n'accepte que des clef chiffrée PKCS#8 en PEM. Donc la clef doit commencer par -----BEGIN ENCRYPTED PRIVATE KEY-----")
|
|
if certificat:
|
|
if "privatekey" in certificat:
|
|
raise ValueError("Il y a déjà une clef privée, merci d'annuler")
|
|
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
|
|
certificat.private(pem, encrypted=True)
|
|
certificat.validate_changes()
|
|
certificat.history_gen()
|
|
certificat.save()
|
|
self.dialog.msgbox("Clef privée bien ajouté", timeout=self.timeout, title="Ajout d'une clef privée")
|
|
raise Continue(cont(certificat=certificat, machine=certificat.machine()))
|
|
else:
|
|
with self.conn.newCertificat(machine.dn, {}) as certificat:
|
|
certificat['hostCert']=unicode(machine['host'][0])
|
|
certificat.private(pem, encrypted=True)
|
|
certificat.create()
|
|
self.dialog.msgbox("Clef privée créée avec succès", timeout=self.timeout, title="Création d'une clef privée")
|
|
raise Continue(cont(certificat=certificat, machine=certificat.machine()))
|
|
|
|
if imp:
|
|
(code, pem) = self.handle_dialog(cont, box)
|
|
else:
|
|
(code, pem) = (self.dialog.DIALOG_OK, "")
|
|
self_cont = TailCall(self.create_privatekey, machine=machine, certificat=certificat, cont=cont, imp=imp, size=size)
|
|
return self.handle_dialog_result(
|
|
code=code,
|
|
output=pem,
|
|
cancel_cont=cont,
|
|
error_cont=self_cont,
|
|
codes_todo=[([self.dialog.DIALOG_OK], todo, [machine, certificat, pem, imp, size, cont])]
|
|
)
|
|
|
|
def delete_certificat(self, certificat, cont):
|
|
"""Supprime un certificat"""
|
|
def todo(certificat, cont):
|
|
if self.confirm_item(item=certificat, title="Voulez vous vraiement supprimer le certificat ?"):
|
|
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
|
|
certificat.delete()
|
|
self.dialog.msgbox("Le certificat a bien été supprimé", timeout=self.timeout, title="Suppression d'un certificat")
|
|
raise Continue(cont(certificat=None, machine=certificat.machine(refresh=True)))
|
|
else:
|
|
raise Continue(cont(certificat=certificat))
|
|
|
|
return self.handle_dialog_result(
|
|
code=self.dialog.DIALOG_OK,
|
|
output="",
|
|
cancel_cont=cont,
|
|
error_cont=TailCall(self.delete_certificat, certificat=certificat, cont=cont),
|
|
codes_todo=[([self.dialog.DIALOG_OK], todo, [certificat, cont])]
|
|
)
|
|
|
|
|
|
def gen_csr(self, certificat, cont):
|
|
"""Permet de générer un csr à partir de la clef privée du certificat"""
|
|
def box(text):
|
|
fp, path = tempfile.mkstemp()
|
|
os.write(fp, text)
|
|
os.close(fp)
|
|
self.dialog.textbox(filename=path, height=0, width=0,
|
|
backtitle="Appuyez sur CTRL+MAJ+V pour coller",
|
|
title="Récupération d'un certificat",
|
|
no_mouse=True, timeout=self.timeout,)
|
|
os.remove(path)
|
|
return
|
|
|
|
def todo(certificat, self_cont, cont):
|
|
if certificat['encrypted']:
|
|
if "machineCrans" in certificat.machine()["objectClass"]:
|
|
passphrase = secrets.get('privatekey_passphrase')
|
|
else:
|
|
self.dialog.msgbox("Mercie de fournir le mot de passe chiffrant la clef privée.\nIl a été choisis lors de la création de la clef.",
|
|
title="Génération d'un CSR",
|
|
width=70,
|
|
height=10, timeout=self.timeout)
|
|
passphrase = self.get_password(cont, confirm=False)
|
|
else:
|
|
passphrase = None
|
|
|
|
try:
|
|
if passphrase:
|
|
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, str(certificat['privatekey'][0]), passphrase)
|
|
else:
|
|
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, str(certificat['privatekey'][0]))
|
|
except crypto.Error as e:
|
|
if len(e.message) > 2 and len(e.message[2]) > 2 and e.message[2][2] == 'bad password read':
|
|
self.dialog.msgbox("Mauvais mot de passe", timeout=self.timeout)
|
|
raise Continue(self_cont)
|
|
else:
|
|
raise
|
|
|
|
req = createCertRequest(pkey,
|
|
digest="sha1",
|
|
subjectAltName=[str(host) for host in certificat['hostCert'][1:]],
|
|
C=u"FR",
|
|
ST=u"Ile de France",
|
|
L=u"Cachan",
|
|
O=u"Association Cachan Réseaux A Normal SUP (C.R.A.N.S)",
|
|
OU=u"Crans",
|
|
CN=unicode(certificat['hostCert'][0]),
|
|
)
|
|
csr = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)
|
|
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
|
|
certificat['csr']=unicode(csr)
|
|
certificat.validate_changes()
|
|
certificat.history_gen()
|
|
certificat.save()
|
|
self.handle_dialog(cont, box, csr)
|
|
if self.dialog.yesno("Remplacer le certificat actuel ?", timeout=self.timeout) == self.dialog.DIALOG_OK:
|
|
return self.create_certificat(certificat=certificat, cont=cont(certificat=certificat))
|
|
else:
|
|
raise Continue(cont(certificat=certificat))
|
|
|
|
self_cont = TailCall(self.gen_csr, certificat=certificat, cont=cont)
|
|
return self.handle_dialog_result(
|
|
code=self.dialog.DIALOG_OK,
|
|
output="",
|
|
cancel_cont=cont,
|
|
error_cont=self_cont,
|
|
codes_todo=[([self.dialog.DIALOG_OK], todo, [certificat, self_cont, cont])]
|
|
)
|
|
|
|
def get_certificat(self, certificat, cont, privatekey=False, csr=False):
|
|
"""Permet d'afficher le certificat courant"""
|
|
def box(text):
|
|
fp, path = tempfile.mkstemp()
|
|
os.write(fp, text)
|
|
os.close(fp)
|
|
self.dialog.textbox(filename=path, height=0, width=0,
|
|
backtitle="Appuyez sur CTRL+MAJ+V pour coller",
|
|
title="Récupération d'un certificat",
|
|
no_mouse=True, timeout=self.timeout,)
|
|
os.remove(path)
|
|
return
|
|
if privatekey:
|
|
self.handle_dialog(cont, box, unicode(certificat['privatekey'][0]))
|
|
elif csr:
|
|
self.handle_dialog(cont, box, unicode(certificat['csr'][0]))
|
|
else:
|
|
self.handle_dialog(cont, box, unicode(ssl.DER_cert_to_PEM_cert(str(certificat['certificat'][0]))))
|
|
raise Continue(cont)
|
|
|
|
def create_csr(self, cont, machine=None, certificat=None):
|
|
"""Permet d'ajouter un csr à une machine à partir du PEM du csr"""
|
|
if machine is None and certificat is None:
|
|
raise EnvironmentError("Il faut fournir au moins une machine ou un certificat")
|
|
# input multiline en utilisant un editbox
|
|
def box():
|
|
fp, path = tempfile.mkstemp()
|
|
os.close(fp)
|
|
cmd = ['--editbox', path, "0", "0"]
|
|
(code, output) = self.dialog._perform(*(cmd,),
|
|
no_mouse=True, # On désactive la sourie sinon dialog segfault si on clic
|
|
backtitle="Appuyez sur CTRL+MAJ+V pour coller",
|
|
timeout=self.timeout,
|
|
title="Création d'un certificat, entrez le PEM du certificat")
|
|
os.remove(path)
|
|
if code == self.dialog.DIALOG_OK:
|
|
return code, output
|
|
else:
|
|
return code, None
|
|
|
|
def todo(machine, certificat, pem, cont):
|
|
if certificat:
|
|
with self.conn.search(dn=certificat.dn, scope=0, mode='rw')[0] as certificat:
|
|
certificat['csr'] = unicode(pem.strip(), 'utf-8')
|
|
certificat.validate_changes()
|
|
certificat.history_gen()
|
|
certificat.save()
|
|
else:
|
|
with self.conn.newCertificat(machine.dn, {}) as certificat:
|
|
certificat['hostCert']=unicode(machine['host'][0])
|
|
certificat['csr'] = unicode(pem.strip(), 'utf-8')
|
|
certificat.create()
|
|
raise Continue(cont(certificat=certificat, machine=certificat.machine()))
|
|
|
|
(code, pem) = self.handle_dialog(cont, box)
|
|
self_cont = TailCall(self.create_csr, machine=machine, certificat=certificat, cont=cont)
|
|
return self.handle_dialog_result(
|
|
code=code,
|
|
output=pem,
|
|
cancel_cont=cont,
|
|
error_cont=self_cont,
|
|
codes_todo=[([self.dialog.DIALOG_OK], todo, [machine, certificat, pem, cont])]
|
|
)
|
|
|
|
def modif_machine_certificat(self, machine, cont, tag=None, certificat=None):
|
|
"""Permet l'édition d'un certificat d'une machine"""
|
|
self_cont = TailCall(self.modif_machine_certificat, machine=machine, cont=cont, certificat=certificat)
|
|
if certificat is None:
|
|
certificat_index = self.edit_certificat_select(machine=machine, title="Modification des certificats de %s" % machine['host'][0], cont=cont)
|
|
if certificat_index == 'new':
|
|
raise Continue(TailCall(self.create_certificat, machine=machine, cont=self_cont))
|
|
elif certificat_index == 'priv':
|
|
raise Continue(TailCall(self.create_privatekey, machine=machine, cont=self_cont))
|
|
elif certificat_index == 'csr':
|
|
raise Continue(TailCall(self.create_csr, machine=machine, cont=self_cont))
|
|
certificat = machine.certificats()[certificat_index]
|
|
a = attributs
|
|
menu_droits = {
|
|
'Hostname':[a.parent, a.nounou],
|
|
'AddPrivateKey':[a.parent, a.nounou],
|
|
'AddCertificate':[a.parent, a.nounou],
|
|
'SetCertificate':[a.parent, a.nounou],
|
|
'TLSA':[a.parent, a.nounou],
|
|
'Autre':[a.nounou],
|
|
'GetPriv':[a.parent, a.nounou],
|
|
'GetCert':[a.parent, a.nounou, a.cableur],
|
|
'GetCSR':[a.parent, a.nounou, a.cableur],
|
|
'GenCSR':[a.parent, a.nounou],
|
|
'Remarque':[a.parent, a.nounou, a.cableur],
|
|
'Supprimer':[a.parent, a.nounou],
|
|
}
|
|
menu = {
|
|
'Hostname' : {'text':"Noms d'hôte utilisant le certificat", "help":'Il doivent être inclus dans les host et hostAlias de la machine parente', "attribut":attributs.hostCert},
|
|
'AddPrivateKey' : {'text': 'Ajouter la clef privée', 'help':'La clef doit être obligatoirement chiffrée avant envoi', "callback":TailCall(self.create_privatekey, imp=True)},
|
|
'AddCertificate' : {'text': 'Ajouter un certificat X509', 'help':'', "callback":self.create_certificat},
|
|
'SetCertificate' : {'text': 'Remplacer le certificat X509', 'help':'', "callback":self.create_certificat},
|
|
'TLSA' : {'text':"Paramètres pour les champs dns TLSA", 'help':'Permet de configurer DANE pour le certificat X509', "callback":self.certificat_tlsa},
|
|
'Autre': {'text' : "Modifier les attribut booléen comme revocked", 'help':'', "callback":self.modif_certificat_boolean},
|
|
'GetPriv' : {'text' : 'Récupérer la clef privée', 'help':"Affiche la clef privée telle qu'elle est dans la base de donnée, c'est à dire chiffrée", 'callback':TailCall(self.get_certificat, privatekey=True)},
|
|
'GetCert' : {'text' : 'Récupérer le certificat', 'help':"Affiche le certificat au format PEM", 'callback':self.get_certificat},
|
|
'GetCSR' : {'text' : 'Récupérer la requête de signature de certificat', 'help':"Affiche le CSR au format PEM", 'callback':TailCall(self.get_certificat, csr=True)},
|
|
'GenCSR' : {'text' : 'Générer un CSR, puis remplacer le certificat', 'help':'Généré à partir de la clef privée. Les noms (CN et subjectAltName) sont pris à partir de Hostname (attribut hostCert)', "callback":self.gen_csr},
|
|
'Remarque' : {'text': 'Mettre des remarques', 'help':'La première apparait dans la liste des certificats', 'attribut':attributs.info},
|
|
'Supprimer' : {'text' : "Supprimer le certificat", 'help':'', "callback":self.delete_certificat},
|
|
}
|
|
if "privateKey" in certificat["objectClass"]:
|
|
menu
|
|
menu_order = ['Hostname']
|
|
if not "privateKey" in certificat['objectClass']:
|
|
menu_order.extend(['AddPrivateKey', 'SetCertificate'])
|
|
if not "x509Cert" in certificat['objectClass']:
|
|
menu_order.extend([ 'AddCertificate'])
|
|
if "x509Cert" in certificat['objectClass']:
|
|
menu_order.extend(['TLSA', 'Autre', 'GetCert'])
|
|
if certificat['csr']:
|
|
menu_order.extend(['GetCSR'])
|
|
if "privateKey" in certificat['objectClass']:
|
|
if attributs.nounou in self.conn.droits or machine.dn.startswith(self.conn.dn):
|
|
menu_order.extend(['GetPriv'])
|
|
menu_order.extend(['GenCSR'])
|
|
menu_order.extend(['Remarque', 'Supprimer'])
|
|
def box(default_item=None):
|
|
text="Certificat de %s, xid=%s :\n" % (certificat['hostCert'][0], certificat['xid'][0])
|
|
if "x509Cert" in certificat['objectClass']:
|
|
text += " * Certificat N°0x%X émis par %s, valable du %s au %s\n" % (
|
|
int(str(certificat['serialNumber'][0])),
|
|
certificat['issuerCN'][0],
|
|
time.strftime("%d/%m/%Y", time.localtime(int(certificat['start'][0]))),
|
|
time.strftime("%d/%m/%Y", time.localtime(int(certificat['end'][0])))
|
|
)
|
|
if "privateKey" in certificat['objectClass']:
|
|
text += " * Clef privée\n"
|
|
if certificat['csr']:
|
|
text += " * Requête de signature de certificat\n"
|
|
if certificat['info']:
|
|
text += str(certificat['info'][0])
|
|
return self.dialog.menu(
|
|
text,
|
|
width=0,
|
|
height=0,
|
|
menu_height=0,
|
|
item_help=1,
|
|
timeout=self.timeout,
|
|
default_item=str(default_item),
|
|
title="Modification des certificats de %s" % certificat.machine()['host'][0],
|
|
scrollbar=True,
|
|
cancel_label="Retour",
|
|
backtitle=u"Vous êtes connecté en tant que %s" % self.conn.current_login,
|
|
choices=[(key, menu[key]['text'], menu[key]['help']) for key in menu_order if self.has_right(menu_droits[key], certificat)])
|
|
|
|
def todo(tag, menu, certificat, self_cont):
|
|
if not tag in menu_order:
|
|
raise Continue(self_cont(certificat=certificat))
|
|
else:
|
|
if 'callback' in menu[tag]:
|
|
raise Continue(TailCall(menu[tag]['callback'], certificat=certificat, cont=self_cont(certificat=certificat, tag=tag)))
|
|
elif 'attribut' in menu[tag]:
|
|
raise Continue(TailCall(self.modif_certificat_attributs, certificat=certificat, cont=self_cont(certificat=certificat, tag=tag), attr=menu[tag]['attribut'].ldap_name))
|
|
else:
|
|
raise EnvironmentError("Il n'y a ni champ 'attribut' ni 'callback' pour le tag %s" % tag)
|
|
cancel_cont = cont(machine=machine) if certificat is None else self_cont(machine=certificat.machine(), certificat=None, tag=tag)
|
|
(code, tag) = self.handle_dialog(cancel_cont, box, tag)
|
|
return self.handle_dialog_result(
|
|
code=code,
|
|
output=tag,
|
|
cancel_cont=cancel_cont,
|
|
error_cont=self_cont,
|
|
codes_todo=[([self.dialog.DIALOG_OK], todo, [tag, menu, certificat, self_cont])]
|
|
)
|
|
|
|
@tailcaller
|
|
def edit_certificat_select(self, machine, title, cont):
|
|
"""Permet de choisir un certificat existant ou nouveau d'une machine"""
|
|
a = attributs
|
|
menu_droits = {
|
|
'new':[a.parent, a.nounou],
|
|
'priv':[a.parent, a.nounou],
|
|
'csr':[a.parent, a.nounou],
|
|
}
|
|
menu = {
|
|
'new':'Ajouter un nouveau certificat',
|
|
'priv':'Générer une nouvelle clef privée',
|
|
'csr':'Ajouter une nouvelle requête de certificat',
|
|
}
|
|
menu_order = ['new', 'priv', 'csr']
|
|
menu_special = ['new', 'priv', 'csr']
|
|
def box(default_item=None):
|
|
index=0
|
|
choices = []
|
|
for key in menu_order:
|
|
if self.has_right(menu_droits[key], machine):
|
|
choices.append((key, menu[key]))
|
|
for cert in machine.certificats():
|
|
if cert['info']:
|
|
item = str(cert['info'][0])
|
|
elif "x509Cert" in cert['objectClass']:
|
|
item = "Emit par %s pour %s du %s au %s" % (cert['issuerCN'][0], ', '.join(str(cn) for cn in cert['hostCert']), time.strftime("%d/%m/%Y", time.localtime(int(cert['start'][0]))), time.strftime("%d/%m/%Y", time.localtime(int(cert['end'][0]))))
|
|
elif "privateKey" in cert['objectClass']:
|
|
item = "Clef privée de %s, xid=%s" % (cert['hostCert'][0], cert['xid'][0])
|
|
elif cert['csr']:
|
|
item = "Requête de signature de certificat pour %s, xid=%s" % (cert['hostCert'][0], cert['xid'][0])
|
|
choices.append((str(index), item))
|
|
index+=1
|
|
return self.dialog.menu(
|
|
"Modifier ou ajouter un certificat ?",
|
|
width=0,
|
|
height=0,
|
|
timeout=self.timeout,
|
|
menu_height=0,
|
|
item_help=0,
|
|
title="Modification des certificats de %s" % machine['host'][0],
|
|
scrollbar=True,
|
|
default_item=str(default_item),
|
|
cancel_label="Retour",
|
|
backtitle=u"Vous êtes connecté en tant que %s" % self.conn.current_login,
|
|
choices=choices)
|
|
|
|
def todo(tag):
|
|
if tag in ['new', 'priv', 'csr']:
|
|
return tag
|
|
else:
|
|
return int(tag)
|
|
|
|
(code, tag) = self.handle_dialog(cont, box)
|
|
retry_cont = TailCall(self.edit_certificat_select, machine=machine, title=title, cont=cont)
|
|
return self.handle_dialog_result(
|
|
code=code,
|
|
output=tag,
|
|
cancel_cont=cont(machine=machine),
|
|
error_cont=retry_cont,
|
|
codes_todo=[([self.dialog.DIALOG_OK], todo, [tag])]
|
|
)
|
|
|
|
|
|
def modif_certificat_boolean(self, certificat, cont):
|
|
"""Juste un raccourci vers edit_boolean_attributs spécifique aux certificats"""
|
|
a = attributs
|
|
attribs = [a.revocked]
|
|
return self.edit_boolean_attributs(
|
|
obj=certificat,
|
|
attribs=attribs,
|
|
title="Édition des attributs booléen d'un certificat de la machine %s" % certificat.machine()['host'][0],
|
|
update_obj='certificat',
|
|
cont=cont)
|
|
|
|
def modif_certificat_attributs(self, certificat, attr, cont):
|
|
"""Juste un raccourci vers edit_attributs spécifique aux certificats"""
|
|
return self.edit_attributs(obj=certificat, update_obj='certificat', attr=attr, title="Modification d'un certificat de la machine %s" % certificat.machine()['host'][0], cont=cont)
|
|
|