# -*- python -*- # -*- coding: iso-8859-15 -*- """ Fichier de base d'interface avec les switchs manageables. Donne la classe switch qui permet d'effectuer les opérations élémentaires sur les switchs manageable HP 26xx. Frédéric PAUGET """ from time import sleep from popen2 import popen3 from sys import stderr, path from commands import getstatusoutput from annuaires import chbre_prises, all_switchs, uplink_prises try : from secrets import config_snmp_secrete except : # Si a pas le droit de lire config_snmp_secrete # on va tenter de tout faire en snmpv1 et communauté public def config_snmp_secrete(snmp,switch) : snmp = snmp(switch,version='1',community='public') return snmp.get, snmp.set, snmp.walk ############################################################################################# ### Définitions de classes utiles # Quelques exceptions class ConnectionTimout(Exception) : pass class ConnectionClosed(Exception) : pass class ConversationError(Exception) : pass # Classes de base pour comminiquer (ssh et snmp) import pexpect class ssh : """ Ouverture d'une connexion ssh, envoi de commandes et récupération du résultat """ def __init__(self,host) : """ Ouverture d'une connexion ssh vers le switch choisi """ self.switch = host self.__sshout = '' self.ssh = pexpect.spawn("ssh %s" % host) self.ssh.sendline("") self.ssh.sendline("configure") try: self.ssh.expect("%s\(config\)# " % self.switch, timeout=20) except pexpect.TIMEOUT: print "Timeout !" import traceback traceback.print_exc() print "Contenu du buffer :" print self.ssh.before def __del__(self) : """Ferme la connexion : envoi logout et attend """ self.ssh.sendline("logout") self.ssh.send("y") self.ssh.send("y") self.ssh.close() def send_cmd(self,cmd,timeout=15): """ Envoi une commande, attend le prompt et retourne la réponse """ # Envoi de la commande self.ssh.sendline(cmd) self.__sshout = '' try: # Attente de la réponse while 1: index = self.ssh.expect([' \[y/n\]\? ', '%s\(config\)# ' % self.switch, 'quit: Control-C'], timeout=timeout) self.__sshout = self.__sshout + self.ssh.before if index == 0: # On répond oui self.ssh.send("y") elif index == 1: # On est revenu au prompt break elif index == 2: # On doit continuer, on envoie espace self.ssh.send(" ") return self.__sshout except pexpect.TIMEOUT: print "Timeout !" import traceback traceback.print_exc() print "Contenu du buffer :" print self.ssh.before class snmp : """ Classe de communication SNMP """ def __init__(self,host,version=None,community=None,authentication_protocol=None, authentication_pass=None, username=None, privacy_pass=None) : """ host doit être la machine sur laquelle se connecter version est la verion du protocole snmp à utiliser : 1, 2c ou 3 le reste des données doit être ou non fourni suivant la version pour v1 et v2c seule la communauté est prise en compte pour v3 authentication_protocol, authentication_pass et username sont requis si accès en authNoPriv si privacy_pass est fourni accès en authPriv """ self.host = host self.version = version if version == '1' or version == '2c' : self.options = "-v %s -c '%s' %s " % ( version, community, host ) elif version =='3' : self.options = "-v 3 -u %s -a %s -A '%s' -l authNoPriv" % ( username, authentication_protocol, authentication_pass ) if privacy_pass : self.options += " -x DES -X '%s' -l authPriv" % privacy_pass self.options += " %s " % host else : raise ValueError('Version incorrecte') def __exec(self,cmd) : s, r = getstatusoutput(cmd) if s : raise ConversationError(r) return r def get(self,oid) : """ Retourne le résultat correspondant à l'oid demandé """ return self.__exec('snmpget -O vq %s %s ' % ( self.options, oid ) ) def set(self,oid,typ,val) : """ Change la valeur le l'oid donné. type est le type de la valeur val est la valeur à écrire """ return self.__exec('snmpset -O vq %s %s %s %s' % (self.options, oid, typ, val ) ) def walk(self,base_oid) : """ Retourne le résultat de snmpwalk le retour est un dictionnaire { oid : valeur } """ lignes = self.__exec('snmpwalk -O q %s %s' % (self.options, base_oid ) ).split('\n') result = {} for l in lignes : if l !="" : oid, valeur = l.split(' ', 1) result[oid] = valeur return result ############################################################################################# ### Gestion des switchs proprement dite class hpswitch : """ Classe pour l'administration des switchs HP. """ # Variables internes __debug=0 __logDest=stderr # Quelques paramètres IP_tftp = '138.231.136.7' firmware_file = 'firmware26xx' # Variables internes switch = None # nom du switch prise = '' __conn_ssh = None def __init__(self,switch) : """ Switch doit être le nom du switch """ if self.__debug : self.__logDest.write("HP DEBUG : __init__(switch=%s)\n" % switch ) self.switch = switch.lower() # Config snmp self.get, self.set, self.walk = config_snmp_secrete(snmp,switch) def __ssh(self,cmd,timout=15) : if not self.__conn_ssh : self.__conn_ssh = ssh(self.switch) return self.__conn_ssh.send_cmd(cmd,timout) def set_prise(self,prise,action) : """ Effectue l'action action donnée sur la (les) prise(s) donnée(s) 'prise' peut être un ensembre de prises : ex : '1-3,6' effectuera l'action sur les prises 1,2,3 et 6 'action' peut être principalement enable/disable speed-duplex [auto-100/auto-10] voir manuel des swtichs pour plus d'actions possibles """ if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : set_prise(prise=%s,action=%s)\n" %( prise,action)) return self.__ssh( "interface ethernet %s %s" % (prise,action) ) def show_stats(self,prise='') : """ Retourne les stats de(s) prise(s) choisie(s).""" if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : show_prise_status(prise=%s)\n" % prise) if prise : return self.__ssh("show interfaces ethernet %s" % prise) else : return self.__ssh("show interfaces") def show_prise_mac(self,prise='') : """ Retourne le(s) adresse(s) MAC présentes sur la prise.""" if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : show_prise_mac(prise=%s)\n" % prise) try: data = self.walk('STATISTICS-MIB::hpSwitchPortFdbAddress.%d' % int(prise)) return map(lambda x:":".join(x[1:-2].lower().split(" ")),data.values()) except ValueError: # Pas de MAC trouvée return [] def where_is_mac(self, mac) : """Retrouve la prise correspondant à une adresse MAC donnée""" if self.__debug : self.__logDest.write("HP DEBUG : where_is_mac(mac=%s)\n" % mac) # On va transformer l'adresse MAC cherchée pour la mettre au format 0A 0A 0A 0A 0A 0A mac = mac.upper() mac = filter(lambda x: x in "0123456789ABCDEF", mac) # 0A0A0A0A0A0A mac = "%s %s %s %s %s %s" % (mac[0:2], mac[2:4], mac[4:6], mac[6:8], mac[8:10], mac[10:12]) # On interroge le switch data = self.walk('STATISTICS-MIB::hpSwitchPortFdbAddress'); # On cherche dans data la bonne adresse MAC for (onesnmp, onemac) in data.iteritems(): if onemac[1:-2] == mac: return int(onesnmp.split(".")[1]) # On a rien trouvé return None def set_prise_mac(self,prise='',mac='') : """ Défini les adresses mac autorisées sur une prise. /!\ format de la mac : xxxxxx-xxxxxx (x hexa) On peut aussi en mettre plusieus séparées par des espaces. Si mac est précédé de -, enlève les adresses (si juste - les vire toutes) Si mac n'est pas fourni retourne le(s) adresse(s) MAC autorisées sur la prise. Si prise n'est pas fourni retourne la config de sécurité de tous les ports """ if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : set_prise_mac(prise=%s,mac=%s)\n" % (prise,mac)) if not mac : return self.__sudo("show port-security %s" % prise) if mac[0]=='-' : no='no ' mac=mac[1:] else :no='' return self.__ssh("%sport-security ethernet %s mac-address %s" % (no, prise,mac) ) def show_interfaces(self,prise='') : """ Retourne la liste des interfaces ainsi que leur état Si prise est spécifié n'affiche que la prise demandée """ if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : show_interfaces(prise=%s)\n" % prise) a = self.__ssh("show interfaces brief") if a and prise : a=a.split('\n') try : a='\n'.join(a[:5]+[a[prise+4]],'\n') except : a=0 return a def update(self) : """ Upload de la config courrante Téléchargment de la nouvelle config Reboot.""" if self.__debug : self.__logDest.write("HP DEBUG : update()\n") # Sauvegarde self.__ssh("copy running-config tftp %s %s unix" % (self.IP_tftp, self.switch + '.running') ) try : print self.__ssh("copy tftp startup-config %s %s unix" % (self.IP_tftp, self.switch+'.conf') ) except ConnectionClosed : # C'est normal : le switch reboote pass def upgrade(self) : """ Changement de firmware, le firmware est sur la machine ayant le serveur tftfp et a pour nom firmware26xx """ if self.__debug : self.__logDest.write("HP DEBUG : upgrate()\n") print self.__ssh("copy tftp flash %s %s" % (self.IP_tftp, self.firmware_file) , 120) # Long timout def reboot(self) : """ Reboote le switch """ if self.__debug : self.__logDest.write("HP DEBUG : reboot()\n") try : self.__ssh("boot") except ConnectionClosed : # C'est normal : le switch reboote pass # Fonction utilisant le SNMP def multicast(self,ip='') : """ Donne la liste des ports du swich inscrits au flux multicast donné Si aucun flux donné teste tous les flux multicast possibles. Retourne un dictionnaire : { adresse du flux : [ ports inscrits ] } """ if self.__debug : self.__logDest.write("HP DEBUG : multicast(ip=%s)\n" % ip) if ip : data = self.walk('STATISTICS-MIB::hpIgmpStatsPortIndex2.%s' % ip) return { ip : data.values() } else : data = self.walk('STATISTICS-MIB::hpIgmpStatsPortIndex2') result = {} # On veut tout for oid, port in data.items() : try : ip = '.'.join(oid.split('.')[2:6]) except : continue result.setdefault(ip,[]) result[ip].append(port) return result def nb_prises(self) : """ Retourne le nombre de prises du switch """ if self.__debug : self.__logDest.write("HP DEBUG : nb_prises()\n") return int(self.version().split()[4][2:4]) def version(self) : """ Retourne la version du firmware du switch """ if self.__debug : self.__logDest.write("HP DEBUG : version()\n") return self.get('SNMPv2-MIB::sysDescr.0') def enable(self,prise=0) : """ Active une prise """ if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : enable(prise=%s)\n" % prise) return self.set('IF-MIB::ifAdminStatus.%d' % int(prise), 'i', 1) def disable(self,prise=0) : """ Désactive une prise """ if not prise : prise = self.prise if self.__debug : self.__logDest.write("HP DEBUG : disable(prise=%s)\n" % prise) return self.set('IF-MIB::ifAdminStatus.%d' % int(prise), 'i', 2) def __is(self,oid,prise) : if not prise : prise = self.prise prise = str(prise) if prise=='all' : nb = 0 for oid,etat in self.walk(oid).items() : if etat == 'up' and int(oid.split('.')[1])<51 : # Le <51 est ici pour éviter de compter les ports fictifs nb += 1 return nb prise = prise.replace('-','') return self.get(oid + '.' + prise) == 'up' def is_enable(self,prise=0) : """ Retoune True ou False suivant si la prise est activée ou non Si prise=all retourne le nombre de prises activées sur le switch """ if prise != 'all': prise = int(prise) return self.__is('IF-MIB::ifAdminStatus',prise) def is_up(self,prise=0) : """ Retoune True ou False suivant si la prise est up Si prise=all retourne le nombre de prises up sur le switch """ if prise != 'all': prise = int(prise) return self.__is('IF-MIB::ifOperStatus',prise) def nom(self,nom=None,prise=0) : """ Retourne ou attribue le nom à la prise fournie """ if not prise : prise = self.prise oid = 'IF-MIB::ifAlias.%d' % int(prise) if nom==None : return self.get(oid) else : self.set(oid, 's' , nom) def eth_mode(self,mode=None,prise=0) : """ Fixe ou retourne le mode d'une prise mode est un tuple : (vitesse, duplex) ou simplement "auto" vitesse est : 10 100 ou 1000 duplex est FD, HD ou auto """ if not prise : prise = self.prise oid = 'CONFIG-MIB::hpSwitchPortFastEtherMode.%d' % int(prise) if mode == None : return self.get(oid) # Conversion du mode if mode == 'auto' : code = 5 else : code = { 'HD' : 2 , 'FD' : 4 , 'AUTO' : 8 }[mode[1].upper()] if mode[0] == 10 : code -= 1 elif mode[0] == 1000 : if code == 8 : code += 1 elif code == 2 : raise ValueError('Mode invelide %s' % mode) else: code += 1 self.set(oid,'i',code) class sw_chbre(hpswitch) : def __init__(self,chbre) : # On retrouve la chbre dans l'annuaire self.chbre = chbre try : bat = chbre[0].lower() prise = chbre_prises[bat][chbre[1:]] self.prise_brute = prise self.switch = 'bat%s' % bat num_switch = int(prise[0]) self.switch += '-%i' % num_switch if prise[-1] == '-' : #Prise en 10 self.prise = int(prise[1:-1]) self.prise10Mb = True else : self.prise = int(prise[1:]) self.prise10Mb = False except : raise ValueError('Chambre %s inconnue' % chbre) # Config snmp self.get, self.set, self.walk = config_snmp_secrete(snmp,self.switch) def reconfigure(self) : """ Reconfigure la prise (nom et vitesse) """ in10 = self.eth_mode().find('10Mbits') != 1 if self.prise10Mb and not in10 : self.eth_mode(('10','auto')) elif not self.prise10Mb and in10 : self.eth_mode('auto') nom = 'Chambre_%s' % self.chbre.capitalize() if nom != self.nom() : self.nom(nom)