# -*- python -*- # -*- coding: iso-8859-15 -*- """ Fichier de base d'interface avec les switchs manageables. Donne la classe switch qui permet d'effectuer les opérations élémentaires sur les switchs manageable HP 26xx. Frédéric PAUGET """ from time import sleep from popen2 import popen3 from sys import stderr, path from commands import getstatusoutput from annuaires import chbre_prises import threading try : from secrets import config_snmp_secrete except : # Si a pas le droit de lire config_snmp_secrete def rien(truc) : pass def config_snmp_secrete(a,b) : return (rien, rien, rien) ############################################################################################# ### Définitions de classes utiles # Quelques exceptions class ConnectionTimout(Exception) : pass class ConnectionClosed(Exception) : pass class ConversationError(Exception) : pass # Classes de base pour comminiquer (ssh et snmp) class ssh : """ Ouverture d'une connexion ssh, envoi de commandes et récupération du résultat """ __debug = 1 __logDest = stderr __ssh_out = '' # Retour de la connexion ssh def __init__(self,host) : """ Ouverture d'une connexion ssh vers le switch choisi """ self.switch = host if self.__debug : self.__logDest.write("SSH DEBUG : __init__(host=%s)\n" % host) self.__host = host self.__retour, self.__input, self.__err = popen3("/usr/bin/ssh -tt %s" % host) sleep(1) # Création de threads de lecture de la connexion r = threading.Thread(target=self.__read_retour,args=(threading.currentThread(),)) r.start() e = threading.Thread(target=self.__read_err,args=(threading.currentThread(),)) e.start() self.log=open('/tmp/test_ssh','w') # On passe l'intro, passe manager et en environnement de configuration self.send_cmd('\nenable\nconfigure') def __read_retour(self,parent) : while parent.isAlive() : c = self.__retour.read(1) self.__ssh_out += c self.log.write(c) self.log.flush() def __read_err(self,parent) : err = '' while parent.isAlive : char = self.__err.read(1) if char == '\n' : # Il y a eu une erreur if err.lower() in [ 'connection to %s closed by remote host.' % self.switch , 'connection to %s closed.' % self.switch ] : raise ConnectionClosed else : print err err = '' else : err += char def __del__(self) : """Ferme la connexion : envoi logout et attend """ if self.__debug : self.__logDest.write("SSH DEBUG : __del__()\n") try : self.send_cmd('\nlogout') except ConnectionClosed : # C'est le but pass def send_cmd(self,cmd,timeout=15): """ Envoi une commande, attend le prompt et retourne la réponse """ if self.__debug : self.__logDest.write("SSH DEBUG : __send_cmd(%s)\n" % cmd.strip() ) self.__ssh_out = '' # oubli de ce qu'il y a avant self.log.write('*****\n') self.log.flush() # Envoi de la commande self.__input.write(cmd+'\n') self.__input.flush() # Premier retour count=0 while 1: # On a récupéré un prompt ? try : print "|%s|" % self.__ssh_out[-45:-38] if self.__ssh_out[-45:-38] == ' [y/n]?' : self.__input.write('y') self.__input.flush() sleep(1) continue elif self.__ssh_out[-46:-36].lower() == '(config)# ' : if self.__debug : self.__logDest.write("SSH DEBUG : __send_cmd -> OK\n") break elif self.__ssh_out[-70:-7] == '-- MORE --, next page: Space, next line: Enter, quit: Control-C' : # Faut appuyer sur une touche if self.__debug : self.__logDest.write("SSH DEBUG : __send_cmd -> MORE\n") self.__input.write(' ') self.__input.flush() sleep(1) continue except: pass # Rien de bien, le switch es un peu lent, on attend if self.__debug : self.__logDest.write("SSH DEBUG : __send_cmd -> WAIT\n") sleep(1) count += 1 if count > timeout : # Il y a un problème raise ConnectionTimout return self.__ssh_out class snmp : """ Classe de communication SNMP """ def __init__(self,host,version=None,community=None,authentication_protocol=None, authentication_pass=None, username=None, privacy_pass=None) : """ host doit être la machine sur laquelle se connecter version est la verion du protocole snmp à utiliser : 1, 2c ou 3 le reste des données doit être ou non fourni suivant la version pour v1 et v2c seule la communauté est prise en compte pour v3 authentication_protocol, authentication_pass et username sont requis si accès en authNoPriv si privacy_pass est fourni accès en authPriv """ self.host = host self.version = version if version == '1' or version == '2c' : self.options = "-v %s -c '%s' %s " % ( version, community, host ) elif version =='3' : self.options = "-v 3 -u %s -a %s -A '%s' -l authNoPriv" % ( username, authentication_protocol, authentication_pass ) if privacy_pass : self.options += " -x DES -X '%s' -l authPriv" % privacy_pass self.options += " %s " % host else : raise ValueError('Version incorrecte') def __exec(self,cmd) : s, r = getstatusoutput(cmd) if s : raise ConversationError(r) return r def get(self,oid) : """ Retourne le résultat correspondant à l'oid demandé """ return self.__exec('snmpget -O vq %s %s ' % ( self.options, oid ) ) def set(self,oid,typ,val) : """ Change la valeur le l'oid donné. type est le type de la valeur val est la valeur à écrire """ return self.__exec('snmpset -O vq %s %s %s %s' % (self.options, oid, typ, val ) ) def walk(self,base_oid) : """ Retourne le résultat de snmpwalk le retour est un dictionnaire { oid : valeur } """ lignes = self.__exec('snmpwalk -O q %s %s' % (self.options, base_oid ) ).split('\n') result = {} for l in lignes : oid, valeur = l.split(' ', 1) result[oid] = valeur return result ############################################################################################# ### Gestion des switchs proprement dite class hpswitch : """ Classe pour l'administration des switchs HP. """ # Variables internes __debug=0 __logDest=stderr # Quelques paramètres IP_tftp = '138.231.136.7' # Variables internes switch = None # nom du switch prise = '' __conn_ssh = None def __init__(self,switch) : """ Switch doit être le nom du switch """ if self.__debug : self.__logDest.write("HP DEBUG : __init__(switch=%s)\n" % switch ) self.switch = switch.lower() # Config snmp self.get, self.set, self.walk = config_snmp_secrete(snmp,switch) def __ssh(self,cmd) : if not self.__conn_ssh : self.__conn_ssh = ssh(self.switch) return self.__conn_ssh.send_cmd(cmd) def set_prise(self,prise,action) : """ Effectue l'action action donnée sur la (les) prise(s) donnée(s) 'prise' peut être un ensembre de prises : ex : '1-3,6' effectuera l'action sur les prises 1,2,3 et 6 'action' peut être principalement enable/disable speed-duplex [auto-100/auto-10] voir manuel des swtichs pour plus d'actions possibles """ if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : set_prise(prise=%s,action=%s)\n" %( prise,action)) return self.__ssh( "interface ethernet %s %s" % (prise,action) ) def show_stats(self,prise='') : """ Retourne les stats de(s) prise(s) choisie(s).""" if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : show_prise_status(prise=%s)\n" % prise) if prise : return self.__ssh("show interfaces ethernet %s" % prise) else : return self.__ssh("show interfaces") def show_prise_mac(self,prise='') : """ Retourne le(s) adresse(s) MAC présentes sur la prise. Si la prise n'es pas spécifié retourne toutes les adresses mac connues du switch.""" if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : show_prise_mac(prise=%s)\n" % prise) try: data = self.walk('SNMPv2-SMI::enterprises.11.2.14.11.5.1.9.4.2.1.2.%s' % prise) return map(lambda x:":".join(x[1:-2].lower().split(" ")),data.values()) except ValueError: # Pas de MAC trouvée return [] def set_prise_mac(self,prise='',mac='') : """ Défini les adresses mac autorisées sur une prise. /!\ format de la mac : xxxxxx-xxxxxx (x hexa) On peut aussi en mettre plusieus séparées par des espaces. Si mac est précédé de -, enlève les adresses (si juste - les vire toutes) Si mac n'est pas fourni retourne le(s) adresse(s) MAC autorisées sur la prise. Si prise n'est pas fourni retourne la config de sécurité de tous les ports """ if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : set_prise_mac(prise=%s,mac=%s)\n" % (prise,mac)) if not mac : return self.__sudo("show port-security %s" % prise) if mac[0]=='-' : no='no ' mac=mac[1:] else :no='' return self.__ssh("%sport-security ethernet %s mac-address %s" % (no, prise,mac) ) def show_interfaces(self,prise='') : """ Retourne la liste des interfaces ainsi que leur état Si prise est spécifié n'affiche que la prise demandée """ if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : show_interfaces(prise=%s)\n" % prise) a = self.__ssh("show interfaces brief") if a and prise : a=a.split('\n') try : a='\n'.join(a[:5]+[a[prise+4]],'\n') except : a=0 return a def update(self) : """ Upload de la config courrante Téléchargment de la nouvelle config Reboot.""" if self.__debug : self.__logDest.write("HP DEBUG : update()\n") # Sauvegarde self.__ssh("copy running-config tftp %s %s unix" % (self.IP_tftp, self.switch + '.running') ) try : self.__ssh("copy tftp startup-config %s %s unix" % (self.IP_tftp, self.switch+'.conf') ) except ConnectionClosed : # C'est normal : le switch reboote pass def upgrade(self) : """ Changement de firmware, le firmware est sur la machine ayant le serveur tftfp et a pour nom firmware26xx """ if self.__debug : self.__logDest.write("HP DEBUG : update()\n") self.__ssh("copy tftp flash %s firmware26xx" % self.IP_tftp,70) def reboot(self) : """ Reboote le switch """ if self.__debug : self.__logDest.write("HP DEBUG : reboot()\n") try : self.__ssh("boot") except ConnectionClosed : # C'est normal : le switch reboote pass # Fonction utilisant le SNMP def multicast(self,ip='') : """ Donne la liste des ports du swich inscrits au flux multicast donné Si aucun flux donné teste tous les flux multicast possibles. Retourne un dictionnaire : { adresse du flux : [ ports inscrits ] } """ if self.__debug : self.__logDest.write("HP DEBUG : multicast(ip=%s)\n" % ip) if ip : data = self.walk('SNMPv2-SMI::enterprises.11.2.14.11.5.1.9.10.3.1.1.1%s' % ip) return { ip : data.values() } else : data = self.walk('SNMPv2-SMI::enterprises.11.2.14.11.5.1.9.10.3.1.1.1') result = {} # On veut tout for oid, port in data.items() : try : ip = '.'.join(oid.split('.')[13:17]) except : continue result.setdefault(ip,[]) result[ip].append(port) return result def nb_prises(self) : """ Retourne le nombre de prises du switch """ if self.__debug : self.__logDest.write("HP DEBUG : nb_prises()\n") return int(self.get('IF-MIB::interfaces.ifNumber.0')) - 2 def version(self) : """ Retourne la version du firmware du switch """ if self.__debug : self.__logDest.write("HP DEBUG : version()\n") return self.get('SNMPv2-MIB::sysDescr.0') def enable(self,prise=0) : """ Active une prise """ if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : enable(prise=%s)\n" % prise) return self.set('IF-MIB::ifAdminStatus.%i' % prise, 'i', 1) def disable(self,prise=0) : """ Désactive une prise """ if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : disable(prise=%s)\n" % prise) return self.set('IF-MIB::ifAdminStatus.%i' % prise, 'i', 2) def status(self,prise=0) : """ Retourne un dictionnaire décrivant la prise donnée les clefs sont : activé, etat, vitesse, vitesse_max, nom """ if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : status(prise=%s)\n" % prise) r={} # Activée ? r['activée'] = self.get('IF-MIB::ifAdminStatus.%i' % prise) # Etat actuel ? r['etat'] = self.get('IF-MIB::ifOperStatus.%i' % prise) # Vitesse v = self.get('IF-MIB::ifSpeed.%i' % prise) r['vitesse'] = '%iMbps' % ( int(v)/1000000 ) # Vitesse maximum r['vitesse_max'] = '%iMbps' % int(self.get('IF-MIB::ifHighSpeed.%i' % prise)) # Nom r['nom'] = self.get('IF-MIB::ifAlias.%i' % prise) return r class sw_chbre(hpswitch) : def __init__(self,chbre) : # On retrouve la chbre dans l'annuaire try : bat = chbre[0].lower() prise = chbre_prises[bat][chbre[1:]] self.switch = 'bat%s' % bat num_switch = int(prise[0]) if num_switch != 0 : self.switch += '-%i' % num_switch if prise[-1] == '-' : #Prise en 10 self.prise = int(prise[1:-1]) else : self.prise = int(prise[1:]) except : raise ValueError('Chambre %s inconnue' % chbre) # Config snmp self.get, self.set, self.walk = config_snmp_secrete(snmp,self.switch)