#!/bin/bash /usr/scripts/python.sh # -*- coding: utf-8 -*- u""" Copyright (C) Valentin Samir Licence : GPLv3 """ import os import sys import time import ldap import signal import inspect import traceback if '/usr/scripts' not in sys.path: sys.path.append('/usr/scripts') from pythondialog import Dialog as PythonDialog from pythondialog import DialogTerminatedBySignal, PythonDialogErrorBeforeExecInChildProcess from pythondialog import error as DialogError from gestion.affich_tools import get_screen_size, coul debug_enable = False debugf = None def mydebug(txt): """Petit fonction pour écrire des messages de débug dans /tmp/gest_crans_lc.log""" global debugf, debug_enable if debug_enable: if debugf is None: debugf = open('/tmp/gest_crans_lc.log', 'w') if isinstance(txt, list): for v in txt: mydebug(' ' + str(v)) else: debugf.write(str(txt)+'\n') debugf.flush() os.fsync(debugf) # Implémentation "à la main" de la tail récursion en python # voir http://kylem.net/programming/tailcall.html # je trouve ça assez sioux # En plus, ça nous permet de gérer plus facilement le # code en CPS (Continuation Passing Style) class TailCaller(object) : """ Classe permetant, en décorant des fonctions avec, d'avoir de la tail récursion faite "à la main" en python (voir http://kylem.net/programming/tailcall.html) Parameters ---------- f : function Fonction décoré """ other_callers = {} def __init__(self, f) : self.f = f self.func_name = f.func_name TailCaller.other_callers[id(self)]=f.func_name def __del__(self): del(TailCaller.other_callers[id(self)]) def __call__(self, *args, **kwargs) : mydebug("***%s calling" % self.func_name) stacklvl = len(inspect.stack()) try: ret = self.f(*args, **kwargs) except Continue as c: ret = c.tailCall while isinstance(ret, TailCall): # Si la continuation a été créer par un TailCaller précédent, on propage # Cela permet d'assurer qu'un TailCaller ne s'occupe que des TailCall # Crée après lui. if stacklvl>=ret.stacklvl: mydebug("***%s terminate" % self.func_name) raise Continue(ret) mydebug("***%s doing %s" % (self.func_name, ret)) try: ret = ret.handle() except Continue as c: ret = c.tailCall mydebug("***%s returning %s" % (self.func_name, ret)) return ret def tailcaller(f): """ Décorateur retardant la décoration par TailCaller d'une fonction À utiliser sur les fonctions faisant de la tail récursion et devant retourner une valeur. On l'utilise de toute façon sur la fonction d'entrée dans l'application Une fonction de devrait pas instancier de TailCall sans être décoré par ce décorteur de façon générale """ f.tailCaller = True return f class Continue(Exception): """Exception pour envoyer des TailCall en les raisant""" def __init__(self, tailCall): self.tailCall = tailCall class TailCall(object) : """ Appel tail récursif à une fonction et ses argument On peut voir un TailCall comme le fait de retarder l'appel à une fonction (avec ses listes d'arguements) à un moment futur. La fonction sera appelée dans les cas suivant : * Le TailCall est retourné par une fonction qui est un TailCaller * L'exception Continue(TailCall(...)) est levée et traverse une fonction qui est un TailCaller """ def __init__(self, call, *args, **kwargs) : self.stacklvl = len(inspect.stack()) if isinstance(call, TailCall): call.kwargs.update(**kwargs) kwargs = call.kwargs args = call.args + args call = call.call self.call = call self.args = args self.kwargs = kwargs self.check(self.args, self.kwargs) def check(self, args, kwargs): call = self.call if isinstance(call, TailCaller): call = call.f targs = inspect.getargspec(call) if targs.varargs is not None: if len(args) + len(kwargs) > len(targs.args): raise TypeError("%s() takes at most %s arguments (%s given)" % (call.func_name, len(targs.args), len(args) + len(kwargs))) for key in kwargs: if key not in targs.args: raise TypeError("%s() got an unexpected keyword argument '%s'" % (call.func_name, key)) def __str__(self): return "TailCall<%s(%s%s%s)>" % ( self.call.func_name, ', '.join(repr(a) for a in self.args), ', ' if self.args and self.kwargs else '', ', '.join("%s=%s" % (repr(k),repr(v)) for (k,v) in self.kwargs.items()) ) def copy(self): ''' Renvois une copie de l'objet courant attention les elements et args ou kwargs sont juste linké ça n'est pas gennant dans la mesure où ils ne sont normalement pas éditer mais remplacé par d'autres éléments ''' result = TailCall(self.call, *list(self.args), **dict(self.kwargs)) result.stacklvl = self.stacklvl return result def __call__(self, *args, **kwargs): tmpkwargs={} tmpkwargs.update(self.kwargs) tmpkwargs.update(kwargs) self.check(self.args + args, tmpkwargs) self.kwargs.update(kwargs) self.args = self.args + args return self def handle(self) : """ Exécute la fonction call sur sa liste d'argument. on déréférence les TailCaller le plus possible pour réduire la taille de la stack """ caller = None call = self.call while isinstance(call, TailCaller) : caller = call call = self.call.f return call(*self.args, **self.kwargs) def unicode_of_Error(x): """Formatte des exception""" return u"\n".join(unicode(i, 'utf-8') if type(i) == str else repr(i) for i in x.args) def raiseKeyboardInterrupt(x, y): """fonction utilisée pour réactiver les Ctrl-C""" raise KeyboardInterrupt() class Dialog(object): """Interface de gestion des machines et des adhérents du crans, version lc_ldap""" def __getattribute__(self, attr): ret = super(Dialog, self).__getattribute__(attr) # Petit hack pour que ça soit les methodes de l'objet instancié qui soient # décorée par TailCaller et pas les méthodes statiques de la classe Dialog if getattr(ret, 'tailCaller', False) and not isinstance(ret, TailCaller): ret = TailCaller(ret) ret.__doc__ = ret.f.__doc__ setattr(self, attr, ret) return ret def __init__(self, debug_enable=False, dialogrc=False): signal.signal(signal.SIGINT, signal.SIG_IGN) self.debug_enable = debug_enable # On met un timeout à 10min d'innactivité sur dialog self.timeout = 600 self.error_to_raise = (Continue, DialogError, ldap.SERVER_DOWN) self.dialogrc = dialogrc _dialog = None @property def dialog(self): """ Renvois l'objet dialog. """ if self.dialogrc: self._dialog = PythonDialog(DIALOGRC=self.dialogrc) else: self._dialog = PythonDialog() self.dialog_last_access = time.time() return self._dialog def nyan(self, cont, *args, **kwargs): """ Nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan nyan """ (lines, cols) = get_screen_size() print "\033[48;5;17m" print " "*(lines * cols) cols = int(min(cols/2, 65)) lines = int(lines) -1 cmd = "/usr/bin/nyancat -W %s -H %s" % (cols, lines) os.system(cmd) raise Continue(cont) @tailcaller def handle_dialog(self, cancel_cont, box, *args): """ Gère les fonctions appelant une fênetre dialog : gestion de l'appuie sur Ctrl-C et rattrapage des exception / segfault de dialog. cancel_cont représente l'endroit où retourner si Ctrl-C """ ctrlC=False ret=None try: signal.signal(signal.SIGINT, raiseKeyboardInterrupt) # Ctrl-C ret = box(*args) signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C except KeyboardInterrupt: signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C raise Continue(cancel_cont) except DialogTerminatedBySignal as e: signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C if e[1] == 11: self.dialog.msgbox( "La fenêtre dialog à été fermée par une erreur de segmentation", timeout=self.timeout, title="Erreur rencontrée", width=73, height=10 ) raise Continue(cancel_cont) else: raise except PythonDialogErrorBeforeExecInChildProcess: self.dialog.msgbox( "La fenêtre dialog à été fermée et a retourné le code 127 : \n" +\ " * peut être que dialog n'a put être éxécuté\n" +\ " * peut être qu'il n'y a plus de mémoire disponible\n" +\ " * peut être que le nombre max de descripteur de fichier a été atteins\n" +\ "ou peut être que dialog fait juste du caca.\n" +\ "Quitter l'interface ?", title="Erreur rencontrée", width=73, height=12, defaultno=True, timeout=self.timeout ) raise Continue(cancel_cont) finally: if ret: return ret else: EnvironmentError("Pas de ret ?!? c'est pas possible ") @tailcaller def handle_dialog_result(self, code, output, cancel_cont, error_cont, codes_todo=[]): """ Gère les fonctions traitant les résultat d'appels à dialog. s'occupe de gérer les exceptions, Ctrl-C, propagation de certaine exceptions, l'appuis sur annuler. Le code à exécuté lui ai passé via la liste codes_todo, qui doit contenir une liste de triple : (code de retour dialog, fonction à exécuter, liste des arguements de la fonction) la fonction est appelée sur ses arguements si le code retourné par dialog correspond. codes_todo = [(code, todo, todo_args)] """ # Si on a appuyé sur annulé ou ESC, on s'en va via la continuation donnée en argument if code in (self.dialog.DIALOG_CANCEL, self.dialog.DIALOG_ESC): raise Continue(cancel_cont) # Sinon, c'est OK else: for (codes, todo, todo_args) in codes_todo: if code in codes: try: signal.signal(signal.SIGINT, raiseKeyboardInterrupt) # Ctrl-C # On effectue ce qu'il y a a faire dans todo ret = todo(*todo_args) signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C return ret # On propage les Continue except self.error_to_raise: signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C raise # En cas d'une autre erreur, on l'affiche et on retourne au menu d'édition except (Exception, ldap.OBJECT_CLASS_VIOLATION) as e: signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C self.dialog.msgbox(traceback.format_exc() if self.debug_enable else "%s" % unicode_of_Error(e), timeout=self.timeout, title="Erreur rencontrée", width=73, height=10) raise Continue(error_cont) except KeyboardInterrupt: signal.signal(signal.SIGINT, signal.SIG_IGN) # Pas de Ctrl-C raise Continue(cancel_cont) # En cas de code de retour dialog non attendu, on prévient et on retourne au menu d'édition self.dialog.msgbox("Le code de retour dialog est %s, c'est étrange" % code, timeout=self.timeout, title="Erreur rencontrée", width=73, height=10) raise Continue(error_cont) @tailcaller def get_comment(self, title, text, cont, init='', force=False): """ Fait entrer à l'utilisateur un commentaire et le retourne. Si force est à True, on oblige le commentaire à être non vide """ (code, output) = self.dialog.inputbox(text=text, title=title, timeout=self.timeout, init=init) retry_cont = TailCall(self.get_comment, title=title, text=text, cont=cont, force=force) def todo(output, force, title, retry_cont): if force and not output: self.dialog.msgbox("Entrée vide, merci d'indiquer quelque chose", timeout=self.timeout, title=title) raise Continue(retry_cont) else: return unicode(output, 'utf-8') return self.handle_dialog_result( code=code, output=output, cancel_cont=cont, error_cont=retry_cont, codes_todo=[([self.dialog.DIALOG_OK], todo, [output, force, title, retry_cont])] ) @tailcaller def get_password(self, cont, confirm=True, title="Choix d'un mot de passe", **kwargs): """ Affiche une série d'inpuxbox pour faire entrer un mot de passe puis le retourne, si confirm=True, il y a une confirmation du mot de passe de demandée """ def todo(self_cont, cont): (code, pass1) = self.dialog.passwordbox("Entrez un mot de passe", title=title, timeout=self.timeout, **kwargs) if code != self.dialog.DIALOG_OK: raise Continue(cont) elif not pass1: raise ValueError("Mot de pass vide !") if confirm: (code, pass2) = self.dialog.passwordbox("Comfirmer le mot de passe", timeout=self.timeout, title=title, **kwargs) if code != self.dialog.DIALOG_OK: raise Continue(self_cont) if pass1 != pass2: raise ValueError("Les deux mots de passe ne concordent pas") return pass1 self_cont = TailCall(self.get_password, cont=cont) return self.handle_dialog_result( code=self.dialog.DIALOG_OK, output="", cancel_cont=cont, error_cont=self_cont, codes_todo=[([self.dialog.DIALOG_OK], todo, [self_cont, cont])] ) @tailcaller def get_timestamp(self, title, text, cont, hour=-1, minute=-1, second=-1, day=0, month=0, year=0): """Fait choisir une date et une heure et retourne le tuple (year, month, day, hour, minute, second)""" retry_cont = TailCall(self.get_timestamp, title=title, text=text, cont=cont, hour=hour, minute=minute, second=second, day=day, month=month, year=year) def get_date(day, month, year): (code, output) = self.dialog.calendar(text, day=day, month=month, year=year, timeout=self.timeout, title=title) if code in (self.dialog.DIALOG_CANCEL, self.dialog.DIALOG_ESC): raise Continue(cont) elif output: (day, month, year) = output return (year, month, day) else: raise EnvironmentError("Pourquoi je n'ai pas de date ?") def get_time(hour, minute, second, day, month, year): (code, output) = self.dialog.timebox(text, timeout=self.timeout, hour=hour, minute=minute, second=second) if code in (self.dialog.DIALOG_CANCEL, self.dialog.DIALOG_ESC): raise Continue(retry_cont(day=day, month=month, year=year)) elif output: (hour, minute, second) = output return (hour, minute, second) else: raise EnvironmentError("Pourquoi je n'ai pas d'horaire ?") (year, month, day) = get_date(day, month, year) (hour, minute, second) = get_time(hour, minute, second, day, month, year) return (year, month, day) + (hour, minute, second) def confirm(self, text, title, defaultno=False, width=0, height=0): """wrapper autour du widget yesno""" return self.dialog.yesno( text, no_collapse=True, colors=True, no_mouse=True, timeout=self.timeout, title=title, defaultno=defaultno, width=width, height=height, backtitle="Appuyez sur MAJ pour selectionner du texte" ) == self.dialog.DIALOG_OK