#!/bin/bash /usr/scripts/python.sh # -*- coding: utf-8 -*- """Outils principaux pour la description d'un switch""" import socket import netaddr from .port import HPSwitchPort from .snmp import SNMPClient from .mac import format_mac, MACFactory from .defaults import OPERSTATUS, ADMINSTATUS, ETHSPEED, REV_ETHSPEED class HPSwitchFactory(object): """Factory stockant les switches""" switches = {} @classmethod def get_switch(cls, switch): """Récupère un switch dans la factory""" return cls.switches.get(switch, None) @classmethod def register_switch(cls, switch, switch_object): """Enregistre un switch dans la factory""" cls.switches[switch] = switch_object @classmethod def get_switches(cls): """Récupère l'ensemble des switches dans la Factory""" return cls.switches class HPSwitch(object): """Classe décrivant un switch HP.""" def __new__(cls, switch): """Vérifie d'abord si un switch n'existe pas déjà. L'idée est d'éviter de manipuler en parallèle des objets dont les données deviendraient incohérentes. Donc, lorsqu'on instancie un switch, celui-ci est référencé par son hostname s'il existe, ou par le nom donné sinon.""" try: __switch = socket.gethostbyname_ex(switch)[0] except socket.gaierror: __switch = switch switch_object = HPSwitchFactory.get_switch(__switch) if switch_object is None: switch_object = super(HPSwitch, cls).__new__(cls) HPSwitchFactory.register_switch(switch, switch_object) return switch_object def __init__(self, switch): """Récupère le nom, et un client snmp""" self.switch = switch self.client = SNMPClient(self.switch) self.ports = {} self.__build_ports_list() def version(self): """Retourne les données relatives à la version du firmware""" return self.client.walk('sysDescr')[0]['val'] def name(self): """Retourne un nom "standardisé" du switch (aka g0, c4, …)""" return self.switch.split(".", 1)[0].replace('bat', '').replace('-', '').lower() def __build_ports_list(self): """Construit via une requête SNMP la liste des ports pour le switch.""" __ret = self.client.walk('hpSwitchPhysicalPortEntry.2') for entry in __ret: self.ports[int(entry['iid'])] = HPSwitchPort(int(entry['iid']), self, int(entry['val'])) def flush_port(self, num): """Vide un port de ses MACs""" if self.ports.get(num, None) is not None: self.ports[num].flush_macs() def flush_ports(self): """Vide un port de ses MACs""" for port in self.ports.itervalues(): port.flush_macs() def fetch_all_ports(self): """Récupère les données depuis les ports du switch, et renvoie ce qui a un intérêt""" self.flush_ports() __ret = self.client.walk('hpSwitchPortFdbAddress') __ret = [ (int(ret['iid'].split('.')[0]), ret['val']) for ret in __ret ] return __ret def __populate_port(self, num): """Peuple le port numéro num""" if self.ports.get(num, None) is not None: _ = self.ports[num].get_macs(update=True) def __populate_all_ports(self): """Peuple tous les ports.""" __ret = self.fetch_all_ports() for (iid, val) in __ret: if self.ports.get(iid, None) is not None: self.ports[iid].append_mac(val) def show_port_macs(self, num, populate=False): """Affiche les macs d'un port donné. Si populate vaut True, fait le populate et affiche le bousin.""" if populate: self.__populate_port(num) return ( [ mac.value for mac in self.ports[num].get_macs() ], self.ports[num].name() ) def show_ports_macs(self, populate=False): """Affiche les ports et macs associées. Si populate vaut True, fait le populate et affiche le bousin.""" if populate: self.__populate_all_ports() return { port.name(): [ mac.value for mac in port.get_macs() ] for port in self.ports.itervalues() } def find_mac(self, mac, populate=False): """Cherche une mac sur le switch""" mac = format_mac(mac) if populate: self.__populate_all_ports() # On boucle sur les macs dans la factory __mac = MACFactory.get_mac(mac) if __mac is not None: # On boucle sur les parents (des ports) à la recherche # de ceux qui appartiennent au switch courant. __parents = [] for parent in __mac.parents.itervalues(): if parent.parent == self: __parents.append(parent) # Si on en a trouvé, on les retourne avec la mac. if __parents: return (__mac, __parents) return None def __flush_multicast(self): """Vide les infos de multicast sur les ports""" for port in self.ports.itervalues(): port.flush_multicast() def __update_multicast(self): """Fait la mise à jour des infos de multicast sur chaque port""" # On commence par vider. self.__flush_multicast() # On fait un walk. data = self.client.walk('hpIgmpStatsPortIndex2') # Le dico est du format standard. L'ip est au milieu du champ iid, et # le port est dans val. for data_dict in data: # En gros, le champ iid ressemble à 1.239.255.255.255.6, où 1 est un truc # que je connais pas, et 6 le numéro du port. data_ip = ".".join(data_dict['iid'].split('.')[1:5]) igmp_ip, igmp_port = netaddr.IPAddress(data_ip), int(data_dict['val']) # Y a plus qu'à stocker if self.ports.get(igmp_port, None) is not None: self.ports[igmp_port].append_multicast(igmp_ip) def get_multicast(self, multi_ip=None, update=True): """Permet de récupérer les informations sur les ports pour lesquels le multicast est actif.""" __output = {} if multi_ip is not None: multi_ip = netaddr.IPAddress(multi_ip) # En cas d'update if update: self.__update_multicast() # On construit le résultat de façon identique dans le cas # update ou non. for port in self.ports.itervalues(): for multicast_ip in port.multicast: __output.setdefault(multicast_ip, []).append(port) # On filtre par l'ip si besoin. if multi_ip is not None: __output = __output[multi_ip] return __output def nb_prises(self): """Retourne le nombre de prises du switch. On pourrait aussi faire un self.client.walk('mib-2.17.1.2') et récupérer la clef "val" du premier élément de la liste retournée.""" return len(self.ports) def __update_oper_status(self): """Récupère le statut des ports du switch.""" __oper = self.client.walk('ifOperStatus') for dico in __oper: port, state = dico['iid'], dico['val'] if self.ports.get(int(port), None) is not None: self.ports[int(port)].oper = state def __update_admin_status(self): """Récupère l'état d'un port du switch.""" __admin = self.client.walk('ifAdminStatus') for dico in __admin: port, state = dico['iid'], dico['val'] if self.ports.get(int(port), None) is not None: self.ports[int(port)].admin = state def is_enabled(self, prise, update=True): """Vérifie si la prise est activée""" if update: self.__update_admin_status() if self.ports.get(prise, None) is not None: return ADMINSTATUS[self.ports[prise].admin] else: return { port : ADMINSTATUS[port.admin] for port in self.ports.itervalues() } def is_up(self, prise, update=True): """Vérifie si la prise est allumée actuellement (en gros, s'il y a une mac dessus)""" if update: self.__update_oper_status() if self.ports.get(prise, None) is not None: return OPERSTATUS[self.ports[prise].oper] else: return { port : OPERSTATUS[port.oper] for port in self.ports.itervalues() } def set_enabled(self, prise, enabled=True): """Met le port à enabled/disabled""" if enabled: val = '1' else: val = '2' command_dict = { 'iid': str(prise), 'tag': 'ifAdminStatus', 'val': val, } return self.client.set([ command_dict ]) def toggle_enabled(self, prise): """Alterne up/down""" if self.is_enabled(prise) == ADMINSTATUS[1]: enabled = False else: enabled = True return self.set_enabled(prise, enabled) def get_port_alias(self, prise): """Retourne le nom du port""" if self.ports.get(prise, None) is None: return "" return self.ports[prise].get_alias() def update_ports_aliases(self): """Récupère les aliases des ports et les affecte""" data = self.client.walk('ifAlias') for data_dict in data: if self.ports.get(int(data_dict['iid']), None) is not None: self.ports[int(data_dict['iid'])].set_alias(data_dict['val']) def set_port_alias(self, prise, alias): """Affecte un nom au port""" if self.ports.get(prise, None) is not None: self.client.set([ { 'iid': str(prise), 'tag': 'ifAlias', 'val': alias, } ]) self.ports[prise].set_alias(alias) def get_eth_speed(self, prise): """Retourne le nom du port""" if self.ports.get(prise, None) is None: return "" return REV_ETHSPEED.get(self.ports[prise].get_eth(), 'unknown') def update_eth_speed(self): """Met à jour la vitesse de tous les ports""" data = self.client.walk('hpSwitchPortFastEtherMode') for data_dict in data: if self.ports.get(int(data_dict['iid']), None) is not None: self.ports[int(data_dict['iid'])].set_eth(data_dict['val']) def set_eth_speed(self, prise, rate=0, dtype='AUTO'): """Affecte un nom au port""" # On affecte une config spécifique en vitesse if self.ports.get(prise, None) is not None: self.client.set([ { 'iid': str(prise), 'tag': 'hpSwitchPortFastEtherMode', 'val': ETHSPEED[dtype][int(rate)], } ]) self.ports[prise].set_eth(ETHSPEED[dtype][int(rate)]) def get_vlans(self, prise=None, update=True): """Récupère les vlans actifs sur une prise""" # Si mise à jour if update: # On fait le ménage for port in self.ports.itervalues(): port.purge_vlans() # Et on recommence data = self.client.walk('enterprises.11.2.14.11.5.1.7.1.15.3.1.1') for data_dict in data: vlan, iid = [int(res) for res in data_dict['iid'].split('.')] if self.ports.get(iid, None) is not None: self.ports[iid].add_vlan(vlan) # Si la prise vaut none, on file tout, sinon juste elle. if prise is not None: if self.ports.get(prise, None) is not None: return self.ports[prise].get_vlans() else: return { iid: port.get_vlans() for (iid, port) in self.ports.iteritems() }