
* Comme ça c'est plus clair que c'est un truc custom crans * Le lien symbolique /usr/scripts/gestion/crans/ est retiré. D'autres suivront.
220 lines
7.1 KiB
Python
220 lines
7.1 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright (C) 2009 Antoine Durand-Gasselin
|
|
# Author: Antoine Durand-Gasselin <adg@crans.org>
|
|
#
|
|
|
|
class TryAgain(Exception):
    """Signal that the current step has to be executed once more.

    Raised by a step function (or by an error handler); the runner
    catches it and leaves the list of pending steps untouched.
    """
|
|
|
|
class PreviousStep(Exception):
    """Signal that the runner must go back to the previous step.

    Raised by a step function (or by an error handler) to request
    backtracking.
    """
|
|
|
|
class EndScenario(Exception):
    """Exception raised when the scenario should be halted.

    Attributes:
        msg: the error message.
        data: the updated state object (``None`` when not provided).
    """

    def __init__(self, msg, data=None):
        # Forward the message to Exception so that str(exc), exc.args and
        # tracebacks actually show it instead of an empty string.
        super(EndScenario, self).__init__(msg)
        self.msg = msg
        self.data = data
|
|
|
|
class Step:
    u"""A single step of a scenario.

    A step wraps a function which receives the current environment and
    the expected (default) answers, and returns the update to apply to
    the environment, as a dictionary.
    """

    def __init__(self, update_fn):
        # Stored as-is; it is only invoked from run().
        self.update = update_fn

    def run(self, env, default):
        """Call the wrapped function with (env, default) and return its result."""
        update_fn = self.update
        return update_fn(env, default)
|
|
|
|
class Scenario:
    u"""An ordered collection of steps that can be executed by a runner.

    Steps are stored in ``self.steps`` as a linked list of nested tuples
    ``(kind, payload, rest)`` where the most recently added step comes
    first and ``rest`` is the previous list (``None`` when empty).
    ``kind`` is either ``'NEST'`` (payload: a Step) or ``'CASE'``
    (payload: ``(switch, cases, fallback)``).
    """

    def __init__(self):
        u"""Create an empty scenario."""
        self.steps = None

    def nest(self, step):
        u"""Append a step to the scenario.

        Raises TypeError if `step` is not a Step.
        """
        if not isinstance(step, Step):
            raise TypeError("Can only bind steps")
        self.steps = ('NEST', step, self.steps)

    def case(self, switch, cases, fallback=None):
        u"""Append a multi-way choice to the scenario.

        `switch` is called with the environment; its result is looked up
        in the dict `cases` and the matching scenario is processed.
        When no key matches, `fallback` is used (nothing happens if it
        is None).

        Raises TypeError if `switch` is not callable, if `cases` is not
        a dict of Scenarios, or if `fallback` is neither None nor a
        Scenario.
        """
        if not callable(switch):
            raise TypeError("switch must be callable")
        if not isinstance(cases, dict):
            raise TypeError("cases must be a dict")
        for case in cases.values():
            if not isinstance(case, Scenario):
                raise TypeError("cases must all be Scenarios")
        # Validate the fallback like the cases, so that a bad value is
        # reported here rather than when the scenario is being run.
        if fallback is not None and not isinstance(fallback, Scenario):
            raise TypeError("fallback must be a Scenario or None")
        self.steps = ('CASE', (switch, cases, fallback), self.steps)

    def branch(self, cond, plan_A, plan_B):
        u"""Append a two-way choice to the scenario.

        `cond` is called with the environment; `plan_A` is processed
        when its result is true, `plan_B` otherwise.

        Raises TypeError if `cond` is not callable or if either plan is
        not a Scenario.
        """
        if not callable(cond):
            raise TypeError("cond must be callable")
        if not isinstance(plan_A, Scenario) or not isinstance(plan_B, Scenario):
            raise TypeError("Can only branch on scenarios")

        # The result is used as a dict key, so it is normalised to a real
        # boolean: otherwise a truthy value such as "yes" would match
        # neither True nor False and no plan would be processed at all.
        def truth(env):
            return bool(cond(env))

        self.case(truth, {True: plan_A, False: plan_B})

    def quote(self, scenario):
        u"""Append a whole scenario, run as one single step.

        The quoted scenario is run from an empty environment and the
        environment it returns becomes the update produced by the step.
        If it ends with EndScenario, the enclosing scenario is asked to
        go back to its previous step.

        Raises TypeError if `scenario` is not a Scenario.
        """
        if not isinstance(scenario, Scenario):
            raise TypeError("scenario must be a scenario")

        def quote_scenar(env, default):
            try:
                # Pass an explicit fresh environment so that every run of
                # the quoted scenario starts empty, whatever the default
                # value of Running's `env` argument is.
                return Running(scenario, {}).run()
            except EndScenario:
                raise PreviousStep

        self.nest(Step(quote_scenar))
|
|
|
|
def step_scenario(step):
    u"""Return a new scenario whose only step is `step`."""
    scenario = Scenario()
    scenario.nest(step)
    return scenario
|
|
|
|
def unit_scenario():
    u"""Return a new scenario that contains no step."""
    empty = Scenario()
    return empty
|
|
|
|
def _relance(exc): raise exc
|
|
|
|
class Running:
    u"""Runs a Scenario step by step, keeping a history for backtracking.

    Attributes:
        env: the current environment (a dict), updated by each step.
        defaults: dict handed to the next step as expected answers; after
            a backtrack it holds what the undone step had returned.
        steps: the pending steps, as nested tuples (kind, payload, rest)
            with the next step first, or None when nothing is left.
        stack: the history, as nested tuples (environment before the
            step, result of the step, pending steps at that step, older
            history), or None when there is nothing to go back to.
        handle: callable given any other exception raised by a step.
    """

    def __init__(self, scenario, env=None, handle=_relance):
        u"""Prepare the execution of `scenario`.

        `env` is the initial environment; a new empty dict is used when
        it is omitted. `handle` is called with any exception raised by a
        step other than PreviousStep and TryAgain; it may itself raise
        one of those two to request backtracking or a retry.

        Raises TypeError if `scenario` is not a Scenario.
        """
        if not isinstance(scenario, Scenario):
            raise TypeError("Can only run Scenarios")
        # The environment is updated in place while running, so a dict
        # used as default argument would be shared by every runner.
        if env is None:
            env = {}
        self.env = env
        self.handle = handle
        # Per-instance state (no mutable object shared through the class).
        self.defaults = {}
        self.steps = None
        self.stack = None

        self._push_steps(scenario.steps)

    def _push_steps(self, chain):
        u"""Put the steps of `chain` in front of the pending steps.

        A scenario keeps its steps most recent first; pushing them one
        by one reverses them, so the first added step is run first.
        """
        while chain:
            self.steps = chain[0], chain[1], self.steps
            chain = chain[2]

    def _backtrack(self):
        u"""Restore the state saved before the last completed step.

        Raises EndScenario (carrying the current environment) when the
        history is empty.
        """
        if not self.stack:
            raise EndScenario("No previous step", self.env)
        self.env, self.defaults, self.steps, self.stack = self.stack

    def step(self):
        u"""Process the next pending step; do nothing if none is left.

        Raises EndScenario when a step asks to go back but there is no
        previous step, and ValueError on an unknown kind of step.
        """
        if not self.steps:
            return

        kind = self.steps[0]

        if kind == 'CASE':
            # A choice: the scenario selected by the result of the switch
            # function replaces the CASE entry. If the result is not
            # handled and there is no fallback, nothing is added.
            switch, cases, fallback = self.steps[1]
            self.steps = self.steps[2]
            scenar = cases.get(switch(self.env), fallback)
            if scenar:
                self._push_steps(scenar.steps)

        elif kind == 'NEST':
            try:
                this_step = self.steps[1]
                new_env = this_step.run(self.env, self.defaults)

                # Record the history entry only once the update has been
                # applied, so a failing update leaves the history intact.
                snapshot = self.env.copy()
                self.env.update(new_env)
                self.stack = (snapshot, new_env, self.steps, self.stack)
                self.defaults = {}
                self.steps = self.steps[2]

            except PreviousStep:
                self._backtrack()

            except TryAgain:
                # Pending steps are unchanged: the step will be run again.
                pass

            except Exception as e:
                # The handler decides: it may re-raise, or ask to go back
                # or to retry. If it returns, the step is run again.
                try:
                    self.handle(e)
                except PreviousStep:
                    self._backtrack()
                except TryAgain:
                    pass

        else:
            # Should not happen: Scenario only produces CASE and NEST.
            raise ValueError("invalid step")

    def run(self):
        u"""Process steps until none is left and return the environment."""
        while self.steps:
            self.step()
        return self.env
|
|
|
|
|
|
# For testing issues
|
|
def prompt(var):
    u"""Build a Step that asks on the console for the value of `var`.

    The default shown is the entry for `var` in the defaults given to
    the step. Answering 'n' raises TryAgain, answering 'b' raises
    PreviousStep; any other answer is returned as {var: answer}.
    """
    def ask(env, defaults):
        shown_default = defaults.get(var, '<no default>')
        answer = raw_input(u"%s (%s):= " % (var, shown_default))
        if answer == 'n':
            raise TryAgain
        if answer == 'b':
            raise PreviousStep
        return {var: answer}

    return Step(ask)
|
|
|
|
if __name__ == "__main__":
    # Interactive demonstration: a scenario asking for three numbers,
    # then running a quoted sub-scenario, then asking for three more.
    words_scenario = Scenario()
    letters_scenario = Scenario()
    main_scenario = Scenario()

    for word in ['toto', 'tata', 'titi', 'tutu']:
        words_scenario.nest(prompt(word))
        # The sub-scenario asks for the second letter of each word.
        letters_scenario.nest(prompt(word[1]))

    for number in range(6, 9):
        main_scenario.nest(prompt(str(number)))

    main_scenario.quote(letters_scenario)

    for number in range(12, 15):
        main_scenario.nest(prompt(str(number)))

    runner = Running(main_scenario)
    print(runner.run())
|