scripts/gestion/dialog/CPS.py
2014-12-02 20:41:00 +01:00

420 lines
17 KiB
Python

#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
u"""
Copyright (C) Valentin Samir
Licence : GPLv3
"""
import os
import sys
import time
import ldap
import signal
import inspect
import traceback
if '/usr/scripts' not in sys.path:
sys.path.append('/usr/scripts')
from pythondialog import Dialog as PythonDialog
from pythondialog import DialogTerminatedBySignal, PythonDialogErrorBeforeExecInChildProcess
from pythondialog import error as DialogError
from gestion.affich_tools import get_screen_size, coul
debug_enable = False
debugf = None
def mydebug(txt):
"""Petit fonction pour écrire des messages de débug dans /tmp/gest_crans_lc.log"""
global debugf, debug_enable
if debug_enable:
if debugf is None:
debugf = open('/tmp/gest_crans_lc.log', 'w')
if isinstance(txt, list):
for v in txt:
mydebug(' ' + str(v))
else:
debugf.write(str(txt)+'\n')
debugf.flush()
os.fsync(debugf)
# Implémentation "à la main" de la tail récursion en python
# voir http://kylem.net/programming/tailcall.html
# je trouve ça assez sioux
# En plus, ça nous permet de gérer plus facilement le
# code en CPS (Continuation Passing Style)
class TailCaller(object) :
"""
Classe permetant, en décorant des fonctions avec, d'avoir de la tail récursion
faite "à la main" en python (voir http://kylem.net/programming/tailcall.html)
Parameters
----------
f : function
Fonction décoré
"""
other_callers = {}
def __init__(self, f) :
self.f = f
self.func_name = f.func_name
TailCaller.other_callers[id(self)]=f.func_name
def __del__(self):
del(TailCaller.other_callers[id(self)])
def __call__(self, *args, **kwargs) :
mydebug("***%s calling" % self.func_name)
stacklvl = len(inspect.stack())
try:
ret = self.f(*args, **kwargs)
except Continue as c:
ret = c.tailCall
while isinstance(ret, TailCall):
# Si la continuation a été créer par un TailCaller précédent, on propage
# Cela permet d'assurer qu'un TailCaller ne s'occupe que des TailCall
# Crée après lui.
if stacklvl>=ret.stacklvl:
mydebug("***%s terminate" % self.func_name)
raise Continue(ret)
mydebug("***%s doing %s" % (self.func_name, ret))
try:
ret = ret.handle()
except Continue as c:
ret = c.tailCall
mydebug("***%s returning %s" % (self.func_name, ret))
return ret
def tailcaller(f):
"""
Décorateur retardant la décoration par TailCaller d'une fonction
À utiliser sur les fonctions faisant de la tail récursion
et devant retourner une valeur. On l'utilise de toute
façon sur la fonction d'entrée dans l'application
Une fonction de devrait pas instancier de TailCall sans
être décoré par ce décorteur de façon générale
"""
f.tailCaller = True
return f
class Continue(Exception):
"""Exception pour envoyer des TailCall en les raisant"""
def __init__(self, tailCall):
self.tailCall = tailCall
class TailCall(object) :
"""
Appel tail récursif à une fonction et ses argument
On peut voir un TailCall comme le fait de retarder
l'appel à une fonction (avec ses listes d'arguements)
à un moment futur. La fonction sera appelée dans les
cas suivant :
* Le TailCall est retourné par une fonction qui est un TailCaller
* L'exception Continue(TailCall(...)) est levée et traverse
une fonction qui est un TailCaller
"""
def __init__(self, call, *args, **kwargs) :
self.stacklvl = len(inspect.stack())
if isinstance(call, TailCall):
call.kwargs.update(**kwargs)
kwargs = call.kwargs
args = call.args + args
call = call.call
self.call = call
self.args = args
self.kwargs = kwargs
self.check(self.args, self.kwargs)
def check(self, args, kwargs):
call = self.call
if isinstance(call, TailCaller):
call = call.f
targs = inspect.getargspec(call)
if targs.varargs is not None:
if len(args) + len(kwargs) > len(targs.args):
raise TypeError("%s() takes at most %s arguments (%s given)" % (call.func_name, len(targs.args), len(args) + len(kwargs)))
for key in kwargs:
if key not in targs.args:
raise TypeError("%s() got an unexpected keyword argument '%s'" % (call.func_name, key))
def __str__(self):
return "TailCall<%s(%s%s%s)>" % (
self.call.func_name,
', '.join(repr(a) for a in self.args),
', ' if self.args and self.kwargs else '',
', '.join("%s=%s" % (repr(k),repr(v)) for (k,v) in self.kwargs.items())
)
def copy(self):
'''
Renvois une copie de l'objet courant
attention les elements et args ou kwargs sont juste linké
ça n'est pas gennant dans la mesure où ils ne sont normalement pas
éditer mais remplacé par d'autres éléments
'''
result = TailCall(self.call, *list(self.args), **dict(self.kwargs))
result.stacklvl = self.stacklvl
return result
def __call__(self, *args, **kwargs):
tmpkwargs={}
tmpkwargs.update(self.kwargs)
tmpkwargs.update(kwargs)
self.check(self.args + args, tmpkwargs)
self.kwargs.update(kwargs)
self.args = self.args + args
return self
def handle(self) :
"""
Exécute la fonction call sur sa liste d'argument.
on déréférence les TailCaller le plus possible pour réduire
la taille de la stack
"""
caller = None
call = self.call
while isinstance(call, TailCaller) :
caller = call
call = self.call.f
return call(*self.args, **self.kwargs)
def unicode_of_Error(x):
"""Formatte des exception"""
return u"\n".join(unicode(i, 'utf-8') if type(i) == str
else repr(i) for i in x.args)
def raiseKeyboardInterrupt(x, y):
"""fonction utilisée pour réactiver les Ctrl-C"""
raise KeyboardInterrupt()
class Dialog(object):
"""Interface de gestion des machines et des adhérents du crans, version lc_ldap"""
def __getattribute__(self, attr):
ret = super(Dialog, self).__getattribute__(attr)
# Petit hack pour que ça soit les methodes de l'objet instancié qui soient
# décorée par TailCaller et pas les méthodes statiques de la classe Dialog
if getattr(ret, 'tailCaller', False) and not isinstance(ret, TailCaller):
ret = TailCaller(ret)
ret.__doc__ = ret.f.__doc__
setattr(self, attr, ret)
return ret
def __init__(self, debug_enable=False):
signal.signal(signal.SIGINT, signal.SIG_IGN)
self.debug_enable = debug_enable
# On met un timeout à 10min d'innactivité sur dialog
self.timeout = 600
self.error_to_raise = (Continue, DialogError, ldap.SERVER_DOWN)
_dialog = None
@property
def dialog(self):
"""
Renvois l'objet dialog.
"""
if self._dialog is None:
self._dialog = PythonDialog()
self.dialog_last_access = time.time()
return self._dialog
def nyan(self, cont, *args, **kwargs):
"""
Nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan
"""
(lines, cols) = get_screen_size()
print "\033[48;5;17m"
print " "*(lines * cols)
cols = int(min(cols/2, 65))
lines = int(lines) -1
cmd = "/usr/bin/nyancat -W %s -H %s" % (cols, lines)
os.system(cmd)
raise Continue(cont)
@tailcaller
def handle_dialog(self, cancel_cont, box, *args):
"""
Gère les fonctions appelant une fênetre dialog :
gestion de l'appuie sur Ctrl-C et rattrapage des exception / segfault de dialog.
cancel_cont représente l'endroit où retourner si Ctrl-C
"""
ctrlC=False
ret=None
try:
signal.signal(signal.SIGINT, raiseKeyboardInterrupt) # Ctrl-C
ret = box(*args)
signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C
except KeyboardInterrupt:
signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C
raise Continue(cancel_cont)
except DialogTerminatedBySignal as e:
signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C
if e[1] == 11:
self.dialog.msgbox(
"La fenêtre dialog à été fermée par une erreur de segmentation",
timeout=self.timeout, title="Erreur rencontrée", width=73, height=10
)
raise Continue(cancel_cont)
else:
raise
except PythonDialogErrorBeforeExecInChildProcess:
self.dialog.msgbox(
"La fenêtre dialog à été fermée et a retourné le code 127 : \n" +\
" * peut être que dialog n'a put être éxécuté\n" +\
" * peut être qu'il n'y a plus de mémoire disponible\n" +\
" * peut être que le nombre max de descripteur de fichier a été atteins\n" +\
"ou peut être que dialog fait juste du caca.\n" +\
"Quitter l'interface ?",
title="Erreur rencontrée", width=73, height=12, defaultno=True,
timeout=self.timeout
)
raise Continue(cancel_cont)
finally:
if ret:
return ret
else:
EnvironmentError("Pas de ret ?!? c'est pas possible ")
@tailcaller
def handle_dialog_result(self, code, output, cancel_cont, error_cont, codes_todo=[]):
"""
Gère les fonctions traitant les résultat d'appels à dialog.
s'occupe de gérer les exceptions, Ctrl-C, propagation de certaine exceptions, l'appuis sur annuler.
Le code à exécuté lui ai passé via la liste codes_todo, qui doit contenir une liste de triple :
(code de retour dialog, fonction à exécuter, liste des arguements de la fonction)
la fonction est appelée sur ses arguements si le code retourné par dialog correspond.
codes_todo = [(code, todo, todo_args)]
"""
# Si on a appuyé sur annulé ou ESC, on s'en va via la continuation donnée en argument
if code in (self.dialog.DIALOG_CANCEL, self.dialog.DIALOG_ESC):
raise Continue(cancel_cont)
# Sinon, c'est OK
else:
for (codes, todo, todo_args) in codes_todo:
if code in codes:
try:
signal.signal(signal.SIGINT, raiseKeyboardInterrupt) # Ctrl-C
# On effectue ce qu'il y a a faire dans todo
ret = todo(*todo_args)
signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C
return ret
# On propage les Continue
except self.error_to_raise:
signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C
raise
# En cas d'une autre erreur, on l'affiche et on retourne au menu d'édition
except (Exception, ldap.OBJECT_CLASS_VIOLATION) as e:
signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C
self.dialog.msgbox(traceback.format_exc() if self.debug_enable else "%s" % unicode_of_Error(e), timeout=self.timeout,
title="Erreur rencontrée", width=73, height=10)
raise Continue(error_cont)
except KeyboardInterrupt:
signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C
raise Continue(cancel_cont)
# En cas de code de retour dialog non attendu, on prévient et on retourne au menu d'édition
self.dialog.msgbox("Le code de retour dialog est %s, c'est étrange" % code,
timeout=self.timeout, title="Erreur rencontrée", width=73, height=10)
raise Continue(error_cont)
@tailcaller
def get_comment(self, title, text, cont, init='', force=False):
"""
Fait entrer à l'utilisateur un commentaire et le retourne.
Si force est à True, on oblige le commentaire à être non vide
"""
(code, output) = self.dialog.inputbox(text=text, title=title, timeout=self.timeout, init=init)
retry_cont = TailCall(self.get_comment, title=title, text=text, cont=cont, force=force)
def todo(output, force, title, retry_cont):
if force and not output:
self.dialog.msgbox("Entrée vide, merci d'indiquer quelque chose", timeout=self.timeout, title=title)
raise Continue(retry_cont)
else:
return unicode(output, 'utf-8')
return self.handle_dialog_result(
code=code,
output=output,
cancel_cont=cont,
error_cont=retry_cont,
codes_todo=[([self.dialog.DIALOG_OK], todo, [output, force, title, retry_cont])]
)
@tailcaller
def get_password(self, cont, confirm=True, title="Choix d'un mot de passe", **kwargs):
"""
Affiche une série d'inpuxbox pour faire entrer un mot de passe puis le retourne,
si confirm=True, il y a une confirmation du mot de passe de demandée
"""
def todo(self_cont, cont):
(code, pass1) = self.dialog.passwordbox("Entrez un mot de passe", title=title, timeout=self.timeout, **kwargs)
if code != self.dialog.DIALOG_OK:
raise Continue(cont)
elif not pass1:
raise ValueError("Mot de pass vide !")
if confirm:
(code, pass2) = self.dialog.passwordbox("Comfirmer le mot de passe", timeout=self.timeout, title=title, **kwargs)
if code != self.dialog.DIALOG_OK:
raise Continue(self_cont)
if pass1 != pass2:
raise ValueError("Les deux mots de passe ne concordent pas")
return pass1
self_cont = TailCall(self.get_password, cont=cont)
return self.handle_dialog_result(
code=self.dialog.DIALOG_OK,
output="",
cancel_cont=cont,
error_cont=self_cont,
codes_todo=[([self.dialog.DIALOG_OK], todo, [self_cont, cont])]
)
@tailcaller
def get_timestamp(self, title, text, cont, hour=-1, minute=-1, second=-1, day=0, month=0, year=0):
"""Fait choisir une date et une heure et retourne le tuple (year, month, day, hour, minute, second)"""
retry_cont = TailCall(self.get_timestamp, title=title, text=text, cont=cont, hour=hour,
minute=minute, second=second, day=day, month=month, year=year)
def get_date(day, month, year):
(code, output) = self.dialog.calendar(text, day=day, month=month, year=year,
timeout=self.timeout, title=title)
if code in (self.dialog.DIALOG_CANCEL, self.dialog.DIALOG_ESC):
raise Continue(cont)
elif output:
(day, month, year) = output
return (year, month, day)
else:
raise EnvironmentError("Pourquoi je n'ai pas de date ?")
def get_time(hour, minute, second, day, month, year):
(code, output) = self.dialog.timebox(text, timeout=self.timeout, hour=hour,
minute=minute, second=second)
if code in (self.dialog.DIALOG_CANCEL, self.dialog.DIALOG_ESC):
raise Continue(retry_cont(day=day, month=month, year=year))
elif output:
(hour, minute, second) = output
return (hour, minute, second)
else:
raise EnvironmentError("Pourquoi je n'ai pas d'horaire ?")
(year, month, day) = get_date(day, month, year)
(hour, minute, second) = get_time(hour, minute, second, day, month, year)
return (year, month, day) + (hour, minute, second)
def confirm(self, text, title, defaultno=False, width=0, height=0):
"""wrapper autour du widget yesno"""
return self.dialog.yesno(
text,
no_collapse=True,
colors=True,
no_mouse=True,
timeout=self.timeout,
title=title,
defaultno=defaultno,
width=width, height=height,
backtitle="Appuyez sur MAJ pour selectionner du texte"
) == self.dialog.DIALOG_OK