Implémentation custom du parseur de gpg --list-keys parce que le package python-gnupg fait pas ce qu'il faut.
This commit is contained in:
parent
b3c0aac0f3
commit
8e3a76919c
2 changed files with 167 additions and 39 deletions
205
client.py
205
client.py
|
@ -10,6 +10,7 @@ Authors : Daniel Stan <daniel.stan@crans.org>
|
|||
|
||||
from __future__ import print_function
|
||||
|
||||
# Import builtins
|
||||
import sys
|
||||
import subprocess
|
||||
import json
|
||||
|
@ -20,12 +21,10 @@ import argparse
|
|||
import re
|
||||
import random
|
||||
import string
|
||||
import time
|
||||
import datetime
|
||||
try:
|
||||
import gnupg #disponible seulement sous wheezy
|
||||
except ImportError:
|
||||
if sys.stderr.isatty() and not any([opt in sys.argv for opt in ["-q", "--quiet"]]):
|
||||
sys.stderr.write(u"Package python-gnupg introuvable, vous ne pourrez pas vérifiez les clés.\n".encode("utf-8"))
|
||||
|
||||
# Import de la config
|
||||
try:
|
||||
# Oui, le nom de la commande est dans la config, mais on n'a pas encore accès à la config
|
||||
bootstrap_cmd_name = os.path.split(sys.argv[0])[1]
|
||||
|
@ -40,27 +39,7 @@ except ImportError:
|
|||
PASS = re.compile('[\t ]*pass(?:word)?[\t ]*:[\t ]*(.*)\r?\n?$',
|
||||
flags=re.IGNORECASE)
|
||||
|
||||
## GPG Definitions
|
||||
#: path gu binaire gpg
|
||||
GPG = '/usr/bin/gpg'
|
||||
#: paramètres à fournir à gpg en fonction de l'action désirée
|
||||
GPG_ARGS = {
|
||||
'decrypt' : ['-d'],
|
||||
'encrypt' : ['--armor', '-es'],
|
||||
'fingerprint' : ['--fingerprint'],
|
||||
'receive-keys' : ['--recv-keys'],
|
||||
}
|
||||
#: map lettre de trustlevel -> (signification, faut-il faire confiance à la clé)
|
||||
GPG_TRUSTLEVELS = {
|
||||
u"-" : (u"inconnue", False),
|
||||
u"n" : (u"nulle", False),
|
||||
u"m" : (u"marginale", True),
|
||||
u"f" : (u"entière", True),
|
||||
u"u" : (u"ultime", True),
|
||||
u"r" : (u"révoquée", False),
|
||||
u"e" : (u"expirée", False),
|
||||
u"q" : (u"/données insuffisantes/", False),
|
||||
}
|
||||
## Conf qu'il faudrait éliminer en passant ``parsed`` aux fonctions
|
||||
#: Mode verbeux
|
||||
VERB = False
|
||||
#: Par défaut, place-t-on le mdp dans le presse-papier ?
|
||||
|
@ -72,14 +51,41 @@ NEWROLES = None
|
|||
#: Serveur à interroger (peuplée à l'exécution)
|
||||
SERVER = None
|
||||
|
||||
def gpg(command, args = None):
|
||||
## GPG Definitions
|
||||
#: Path to gpg binary
|
||||
GPG = '/usr/bin/gpg'
|
||||
|
||||
#: Paramètres à fournir à gpg en fonction de l'action désirée
|
||||
GPG_ARGS = {
|
||||
'decrypt' : ['-d'],
|
||||
'encrypt' : ['--armor', '-es'],
|
||||
'receive-keys' : ['--recv-keys'],
|
||||
'list-keys' : ['--list-keys', '--with-colons', '--fixed-list-mode',
|
||||
'--with-fingerprint', '--with-fingerprint'], # Ce n'est pas une erreur. Il faut 2 --with-fingerprint pour avoir les fingerprints des subkeys.
|
||||
}
|
||||
|
||||
#: Mapping (lettre de trustlevel) -> (signification, faut-il faire confiance à la clé)
|
||||
GPG_TRUSTLEVELS = {
|
||||
u"-" : (u"inconnue (pas de valeur assignée)", False),
|
||||
u"o" : (u"inconnue (nouvelle clé)", False),
|
||||
u"i" : (u"invalide (self-signature manquante ?)", False),
|
||||
u"n" : (u"nulle", False),
|
||||
u"m" : (u"marginale", True),
|
||||
u"f" : (u"entière", True),
|
||||
u"u" : (u"ultime", True),
|
||||
u"r" : (u"révoquée", False),
|
||||
u"e" : (u"expirée", False),
|
||||
u"q" : (u"non définie", False),
|
||||
}
|
||||
|
||||
def gpg(command, args=None, verbose=False):
|
||||
"""Lance gpg pour la commande donnée avec les arguments
|
||||
donnés. Renvoie son entrée standard et sa sortie standard."""
|
||||
donnés. Renvoie son entrée standard et sa sortie standard."""
|
||||
full_command = [GPG]
|
||||
full_command.extend(GPG_ARGS[command])
|
||||
if args:
|
||||
full_command.extend(args)
|
||||
if VERB:
|
||||
if verbose:
|
||||
stderr = sys.stderr
|
||||
else:
|
||||
stderr = subprocess.PIPE
|
||||
|
@ -89,10 +95,134 @@ def gpg(command, args = None):
|
|||
stdout = subprocess.PIPE,
|
||||
stderr = stderr,
|
||||
close_fds = True)
|
||||
if not VERB:
|
||||
if not verbose:
|
||||
proc.stderr.close()
|
||||
return proc.stdin, proc.stdout
|
||||
|
||||
def _parse_timestamp(string, canbenone=False):
|
||||
"""Interprète ``string`` comme un timestamp depuis l'Epoch."""
|
||||
if string == u'' and canbenone:
|
||||
return None
|
||||
return datetime.datetime(*time.localtime(int(string))[:7])
|
||||
|
||||
def _parse_pub(data):
|
||||
"""Interprète une ligne ``pub:``"""
|
||||
d = {
|
||||
u'trustletter' : data[1],
|
||||
u'length' : int(data[2]),
|
||||
u'algorithm' : int(data[3]),
|
||||
u'longid' : data[4],
|
||||
u'signdate' : _parse_timestamp(data[5]),
|
||||
u'expiredate' : _parse_timestamp(data[6], canbenone=True),
|
||||
u'ownertrustletter' : data[8],
|
||||
u'capabilities' : data[11],
|
||||
}
|
||||
return d
|
||||
|
||||
def _parse_uid(data):
|
||||
"""Interprète une ligne ``uid:``"""
|
||||
d = {
|
||||
u'trustletter' : data[1],
|
||||
u'signdate' : _parse_timestamp(data[5], canbenone=True),
|
||||
u'hash' : data[7],
|
||||
u'uid' : data[9],
|
||||
}
|
||||
return d
|
||||
|
||||
def _parse_fpr(data):
|
||||
"""Interprète une ligne ``fpr:``"""
|
||||
d = {
|
||||
u'fpr' : data[9],
|
||||
}
|
||||
return d
|
||||
|
||||
def _parse_sub(data):
|
||||
"""Interprète une ligne ``sub:``"""
|
||||
d = {
|
||||
u'trustletter' : data[1],
|
||||
u'length' : int(data[2]),
|
||||
u'algorithm' : int(data[3]),
|
||||
u'longid' : data[4],
|
||||
u'signdate' : _parse_timestamp(data[5]),
|
||||
u'expiredate' : _parse_timestamp(data[6], canbenone=True),
|
||||
u'capabilities' : data[11],
|
||||
}
|
||||
return d
|
||||
|
||||
#: Functions to parse the recognized fields
|
||||
GPG_PARSERS = {
|
||||
u'pub' : _parse_pub,
|
||||
u'uid' : _parse_uid,
|
||||
u'fpr' : _parse_fpr,
|
||||
u'sub' : _parse_sub,
|
||||
}
|
||||
|
||||
def _gpg_printdebug(d):
|
||||
print("current_pub : %r" % d.get("current_pub", None))
|
||||
print("current_sub : %r" % d.get("current_sub", None))
|
||||
|
||||
def parse_keys(gpgout, debug=False):
|
||||
"""Parse l'output d'un listing de clés gpg."""
|
||||
ring = {}
|
||||
init_value = u"initialize" # Valeur utilisée pour dire "cet objet n'a pas encore été rencontré pendant le parsing"
|
||||
current_pub = init_value
|
||||
current_sub = init_value
|
||||
for line in iter(gpgout.readline, ''):
|
||||
# La doc dit que l'output est en UTF-8 «regardless of any --display-charset setting»
|
||||
line = line.decode("utf-8")
|
||||
line = line.split(":")
|
||||
field = line[0]
|
||||
if field in GPG_PARSERS.keys():
|
||||
if debug:
|
||||
print("\nbegin loop. met %s :" % (field))
|
||||
_gpg_printdebug(locals())
|
||||
try:
|
||||
content = GPG_PARSERS[field](line)
|
||||
except:
|
||||
print("*** FAILED ***")
|
||||
print(line)
|
||||
raise
|
||||
if field == u"pub":
|
||||
# Nouvelle clé
|
||||
# On sauvegarde d'abord le dernier sub (si il y en a un) dans son pub parent
|
||||
if current_sub != init_value:
|
||||
current_pub["subkeys"].append(current_sub)
|
||||
# Ensuite on sauve le pub précédent (si il y en a un) dans le ring
|
||||
if current_pub != init_value:
|
||||
ring[current_pub[u"fpr"]] = current_pub
|
||||
# On place le nouveau comme pub courant
|
||||
current_pub = content
|
||||
# Par défaut, il n'a ni subkeys, ni uids
|
||||
current_pub[u"subkeys"] = []
|
||||
current_pub[u"uids"] = []
|
||||
# On oublié l'éventuel dernier sub rencontré
|
||||
current_sub = init_value
|
||||
elif field == u"fpr":
|
||||
if current_sub != init_value:
|
||||
# On a lu un sub depuis le dernier pub, donc le fingerprint est celui du dernier sub rencontré
|
||||
current_sub[u"fpr"] = content[u"fpr"]
|
||||
else:
|
||||
# Alors c'est le fingerprint du pub
|
||||
current_pub[u"fpr"] = content[u"fpr"]
|
||||
elif field == u"uid":
|
||||
current_pub[u"uids"].append(content)
|
||||
elif field == u"sub":
|
||||
# Nouvelle sous-clé
|
||||
# D'abord on sauvegarde la précédente (si il y en a une) dans son pub parent
|
||||
if current_sub != init_value:
|
||||
current_pub[u"subkeys"].append(current_sub)
|
||||
# On place la nouvelle comme sub courant
|
||||
current_sub = content
|
||||
if debug:
|
||||
_gpg_printdebug(locals())
|
||||
print("parsed object : %r" % content)
|
||||
# À la fin, il faut sauvegarder les derniers (sub, pub) rencontrés,
|
||||
# parce que leur sauvegarde n'a pas encore été déclenchée
|
||||
if current_sub != init_value:
|
||||
current_pub["subkeys"].append(current_sub)
|
||||
if current_pub != init_value:
|
||||
ring[current_pub[u"fpr"]] = current_pub
|
||||
return ring
|
||||
|
||||
class simple_memoize(object):
|
||||
""" Memoization/Lazy """
|
||||
|
@ -190,22 +320,21 @@ def check_keys():
|
|||
if VERB:
|
||||
print("M : l'uid correspond au mail du fingerprint\nC : confiance OK (inclu la vérification de non expiration).\n")
|
||||
keys = all_keys()
|
||||
gpg = gnupg.GPG()
|
||||
localkeys = gpg.list_keys()
|
||||
_, gpgout = gpg('list-keys')
|
||||
localring = parse_keys(gpgout)
|
||||
failed = False
|
||||
for (mail, fpr) in keys.values():
|
||||
if fpr:
|
||||
if VERB:
|
||||
print((u"Checking %s… " % (mail)).encode("utf-8"), end="")
|
||||
corresponds = [key for key in localkeys if key["fingerprint"] == fpr]
|
||||
key = localring.get(fpr, None)
|
||||
# On vérifie qu'on possède la clé…
|
||||
if len(corresponds) == 1:
|
||||
correspond = corresponds[0]
|
||||
if not key is None:
|
||||
# …qu'elle correspond au mail…
|
||||
if mail.lower() in sum([re.findall("<(.*)>", uid.lower()) for uid in correspond["uids"]], []):
|
||||
if any([u"<%s>" % (mail,) in u["uid"] for u in key["uids"]]):
|
||||
if VERB:
|
||||
print("M ", end="")
|
||||
meaning, trustvalue = GPG_TRUSTLEVELS[correspond["trust"]]
|
||||
meaning, trustvalue = GPG_TRUSTLEVELS[key["trustletter"]]
|
||||
# … et qu'on lui fait confiance
|
||||
if not trustvalue:
|
||||
print((u"--> Fail on %s:%s\nLa confiance en la clé est : %s" % (fpr, mail, meaning,)).encode("utf-8"))
|
||||
|
@ -504,7 +633,7 @@ def recrypt_files():
|
|||
results = put_files(to_put)
|
||||
# On affiche les messages de retour
|
||||
for i in range(len(results)):
|
||||
print (u"%s : %s" % (to_put[i]['filename'], results[i][1]))
|
||||
print(u"%s : %s" % (to_put[i]['filename'], results[i][1]))
|
||||
else:
|
||||
print(u"Aucun fichier n'a besoin d'être rechiffré".encode("utf-8"))
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue