#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright (C) 2009 Antoine Durand-Gasselin # Author: Antoine Durand-Gasselin # class TryAgain(Exception): """Exception raised when the step should be taken again.""" class PreviousStep(Exception): """Exception raised when should be backtracked to previous step.""" class EndScenario(Exception): """Exception raised when the scenario should be halted. 'msg' is the error message 'data' is the updated state object""" def __init__(self, msg, data=None): self.msg = msg self.data = data class Step: u"""This class defines a step. A step is defined by providing a function that, given an environment and an expected answer will return an update of the environment (as dictionnary).""" def __init__(self, update_fn): self.update = update_fn def run(self, env, default): """This function makes the call""" return self.update(env, default) class Scenario: u"""This class allows us to define scenarios.""" def __init__(self): u"""empty scenario, with an error handler (that takes an exception as argument)""" self.steps = None def nest(self, step): u"""Adds a step to a scenario""" if not isinstance(step, Step): raise TypeError("Can only bind steps") self.steps = ('NEST', step, self.steps) def case(self, switch, cases, fallback = None): #revoir le fallback u"""Calls a function (passing to it the environment), and will call the corresponding scenario in the second arg.""" if not callable(switch): raise TypeError("switch must be callable") if not isinstance(cases, dict): raise TypeError("cases must be a dict") for case in cases.values(): if not isinstance(case, Scenario): raise TypeError("cases must all be Scenarios") self.steps = ('CASE', (switch, cases, fallback), self.steps) def branch(self, cond, plan_A, plan_B): u"""Makes a test (will call it passing the environnement), and depending on the result, will process one of the two scenarios""" if not callable(cond): raise TypeError("cond must be callable") if not isinstance(plan_A, Scenario) or not isinstance(plan_B, Scenario): raise TypeError("Can only branch on scenarios") # self.steps = ('BRANCH', (cond, plan_A, plan_B) , self.steps) self.case(cond, { True: plan_A, False: plan_B }) def quote(self, scenario): u"""Runs a scenario as a single scenario step""" if not isinstance(scenario, Scenario): raise TypeError("scenario must be a scenario") def quote_scenar (dict1, dict2): try: return Running(scenario).run() except EndScenario: raise PreviousStep self.nest(Step(quote_scenar)) def step_scenario(step): s = Scenario() s.nest(step) return s def unit_scenario(): return ( Scenario()) def _relance(exc): raise exc class Running: u"""To run scenarios""" defaults = {} steps = None stack = None def __init__(self, scenario, env = {}, handle = _relance): if not isinstance(scenario, Scenario): raise TypeError("Can only run Scenarios") accu = scenario.steps self.env = env self.handle = handle # To avoid brain spots on the walls, we shall reverse the list # of steps. while accu: self.steps = accu[0], accu[1], self.steps accu = accu[2] def step(self): if self.steps: # Case of a Case, as a list of possible choices depending on # the result of a function call, if it is not handled, then # it does nothing. if self.steps[0] == 'CASE' : switch, cases, fallback = self.steps[1] self.steps = self.steps[2] scenar = cases.get(switch(self.env), fallback) if scenar: plan_steps = scenar.steps else: plan_steps = None while plan_steps: self.steps = plan_steps[0], plan_steps[1], self.steps plan_steps = plan_steps[2] ## # Case of a Branching ## if self.steps[0] == 'BRANCH' : ## # As it is (should be) an epsilon-test we won't ## # backtrack it. ## cond, plan_A, plan_B = self.steps[1] ## self.steps = self.steps[2] ## if cond(self.env): ## plan_steps = plan_A.steps ## else: ## plan_steps = plan_B.steps ## # Let's not forget we need to reverse the steps lists. ## while plan_steps: ## self.steps = plan_steps[0], plan_steps[1], self.steps ## plan_steps = plan_steps[2] # Case of nesting elif self.steps[0] == 'NEST': try: this_step = self.steps[1] new_env = this_step.run(self.env, self.defaults) # Should we perform sanity checks on new_env ? and raise # TryAgain if it fails ? After updating defaults ? self.stack = (self.env.copy(), new_env, self.steps, self.stack) self.env.update(new_env) self.defaults = {} self.steps = self.steps[2] except PreviousStep: if self.stack: self.env, self.defaults, self.steps, self.stack = self.stack else: raise EndScenario("No previous step", self.env) except TryAgain: # We can update defaults pass except Exception, e: try: self.handle(e) except PreviousStep: if self.stack: self.env, self.defaults, self.steps, self.stack = self.stack else: raise EndScenario("No previous step", self.env) except TryAgain: # We can update defaults pass else: # Should not be called raise "invalid step" def run(self): while self.steps: self.step() return (self.env) # For testing issues def prompt(var): def fn(dict, default): a = raw_input(u"%s (%s):= " % (var, default.get(var, ''))) if a == 'n': raise TryAgain elif a == 'b': raise PreviousStep else: return { var : a } return Step(fn) if __name__ == "__main__": s = Scenario() t = Scenario() u = Scenario() for i in ['toto', 'tata', 'titi', 'tutu']: s.nest(prompt(i)) t.nest(prompt(i[1])) for i in range(6,9): u.nest(prompt(str(i))) u.quote(t) for i in range(12,15): u.nest(prompt(str(i))) print (Running(u).run())