scripts/gestion/hptools2/switch.py
2015-10-11 14:46:21 +02:00

413 lines
13 KiB
Python

#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
#
# This file is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This file is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Street #330, Boston, MA 02111-1307, USA.
"""Outils principaux pour la description d'un switch"""
import socket
import netaddr
from .port import HPSwitchPort
from .snmp import SNMPClient
from .mac import format_mac, MACFactory
from .defaults import OPERSTATUS, ADMINSTATUS, ETHSPEED, REV_ETHSPEED
class SwitchNotFound(Exception):
"""Erreur basique quand le switch n'est pas trouvé"""
pass
class HPSwitchFactory(object):
"""Factory stockant les switches"""
switches = {}
@classmethod
def get_switch(cls, switch):
"""Récupère un switch dans la factory"""
return cls.switches.get(switch, None)
@classmethod
def register_switch(cls, switch, switch_object):
"""Enregistre un switch dans la factory"""
cls.switches[switch] = switch_object
@classmethod
def get_switches(cls):
"""Récupère l'ensemble des switches dans la Factory"""
return cls.switches
class HPSwitch(object):
"""Classe décrivant un switch HP."""
def __new__(cls, switch):
"""Vérifie d'abord si un switch n'existe pas déjà.
L'idée est d'éviter de manipuler en parallèle des objets
dont les données deviendraient incohérentes. Donc,
lorsqu'on instancie un switch, celui-ci est référencé
par son hostname s'il existe, ou par le nom donné sinon."""
try:
__switch = socket.gethostbyname_ex(switch)[0]
except socket.gaierror:
try:
netaddr.IPAddress(switch)
except netaddr.AddrFormatError:
raise SwitchNotFound("Switch %r non trouvé." % (switch,))
switch_object = HPSwitchFactory.get_switch(__switch)
if switch_object is None:
switch_object = super(HPSwitch, cls).__new__(cls)
HPSwitchFactory.register_switch(switch, switch_object)
return switch_object
def __init__(self, switch):
"""Récupère le nom, et un client snmp"""
self.switch = switch
self.client = SNMPClient(self.switch)
self.ports = {}
self.__build_ports_list()
def version(self):
"""Retourne les données relatives à la version du firmware"""
return self.client.walk('sysDescr')[0]['val']
def name(self):
"""Retourne un nom "standardisé" du switch
(aka g0, c4, …)"""
return self.switch.split(".", 1)[0].replace('bat', '').replace('-', '').lower()
def __build_ports_list(self):
"""Construit via une requête SNMP la liste des ports pour le switch."""
__ret = self.client.walk('hpSwitchPhysicalPortEntry.2')
for entry in __ret:
self.ports[int(entry['iid'])] = HPSwitchPort(int(entry['iid']), self, int(entry['val']))
def flush_port(self, num):
"""Vide un port de ses MACs"""
if self.ports.get(num, None) is not None:
self.ports[num].flush_macs()
def flush_ports(self):
"""Vide un port de ses MACs"""
for port in self.ports.itervalues():
port.flush_macs()
def fetch_all_ports(self):
"""Récupère les données depuis les ports du switch, et
renvoie ce qui a un intérêt"""
self.flush_ports()
__ret = self.client.walk('hpSwitchPortFdbAddress')
__ret = [
(int(ret['iid'].split('.')[0]), ret['val'])
for ret in __ret
]
return __ret
def __populate_port(self, num):
"""Peuple le port numéro num"""
if self.ports.get(num, None) is not None:
_ = self.ports[num].get_macs(update=True)
def __populate_all_ports(self):
"""Peuple tous les ports."""
__ret = self.fetch_all_ports()
for (iid, val) in __ret:
if self.ports.get(iid, None) is not None:
self.ports[iid].append_mac(val)
def show_port_macs(self, num, populate=False):
"""Affiche les macs d'un port donné.
Si populate vaut True, fait le populate et affiche le
bousin."""
if populate:
self.__populate_port(num)
return (
[
mac.value
for mac in self.ports[num].get_macs()
],
self.ports[num].name()
)
def show_ports_macs(self, populate=False):
"""Affiche les ports et macs associées.
Si populate vaut True, fait le populate et affiche le
bousin."""
if populate:
self.__populate_all_ports()
return {
port.name(): [
mac.value
for mac in port.get_macs()
]
for port in self.ports.itervalues()
}
def find_mac(self, mac, populate=False):
"""Cherche une mac sur le switch"""
mac = format_mac(mac)
if populate:
self.__populate_all_ports()
# On boucle sur les macs dans la factory
__mac = MACFactory.get_mac(mac)
if __mac is not None:
# On boucle sur les parents (des ports) à la recherche
# de ceux qui appartiennent au switch courant.
__parents = []
for parent in __mac.parents.itervalues():
if parent.parent == self:
__parents.append(parent)
# Si on en a trouvé, on les retourne avec la mac.
if __parents:
return (__mac, __parents)
return None
def __flush_multicast(self):
"""Vide les infos de multicast sur les ports"""
for port in self.ports.itervalues():
port.flush_multicast()
def __update_multicast(self):
"""Fait la mise à jour des infos de multicast sur chaque port"""
# On commence par vider.
self.__flush_multicast()
# On fait un walk.
data = self.client.walk('hpIgmpStatsPortIndex2')
# Le dico est du format standard. L'ip est au milieu du champ iid, et
# le port est dans val.
for data_dict in data:
# En gros, le champ iid ressemble à 1.239.255.255.255.6, où 1 est un truc
# que je connais pas, et 6 le numéro du port.
data_ip = ".".join(data_dict['iid'].split('.')[1:5])
igmp_ip, igmp_port = netaddr.IPAddress(data_ip), int(data_dict['val'])
# Y a plus qu'à stocker
if self.ports.get(igmp_port, None) is not None:
self.ports[igmp_port].append_multicast(igmp_ip)
def get_multicast(self, multi_ip=None, update=True):
"""Permet de récupérer les informations sur les ports
pour lesquels le multicast est actif."""
__output = {}
if multi_ip is not None:
multi_ip = netaddr.IPAddress(multi_ip)
# En cas d'update
if update:
self.__update_multicast()
# On construit le résultat de façon identique dans le cas
# update ou non.
for port in self.ports.itervalues():
for multicast_ip in port.multicast:
__output.setdefault(multicast_ip, []).append(port)
# On filtre par l'ip si besoin.
if multi_ip is not None:
__output = __output[multi_ip]
return __output
def nb_prises(self):
"""Retourne le nombre de prises du switch.
On pourrait aussi faire un self.client.walk('mib-2.17.1.2')
et récupérer la clef "val" du premier élément de la liste
retournée."""
return len(self.ports)
def __update_oper_status(self):
"""Récupère le statut des ports du switch."""
__oper = self.client.walk('ifOperStatus')
for dico in __oper:
port, state = dico['iid'], dico['val']
if self.ports.get(int(port), None) is not None:
self.ports[int(port)].oper = state
def __update_admin_status(self):
"""Récupère l'état d'un port du switch."""
__admin = self.client.walk('ifAdminStatus')
for dico in __admin:
port, state = dico['iid'], dico['val']
if self.ports.get(int(port), None) is not None:
self.ports[int(port)].admin = state
def is_enabled(self, prise, update=True):
"""Vérifie si la prise est activée"""
if update:
self.__update_admin_status()
if self.ports.get(prise, None) is not None:
return ADMINSTATUS[self.ports[prise].admin]
else:
return {
port : ADMINSTATUS[port.admin]
for port in self.ports.itervalues()
}
def is_up(self, prise, update=True):
"""Vérifie si la prise est allumée actuellement
(en gros, s'il y a une mac dessus)"""
if update:
self.__update_oper_status()
if self.ports.get(prise, None) is not None:
return OPERSTATUS[self.ports[prise].oper]
else:
return {
port : OPERSTATUS[port.oper]
for port in self.ports.itervalues()
}
def set_enabled(self, prise, enabled=True):
"""Met le port à enabled/disabled"""
if enabled:
val = '1'
else:
val = '2'
command_dict = {
'iid': str(prise),
'tag': 'ifAdminStatus',
'val': val,
}
return self.client.set([
command_dict
])
def toggle_enabled(self, prise):
"""Alterne up/down"""
if self.is_enabled(prise) == ADMINSTATUS[1]:
enabled = False
else:
enabled = True
return self.set_enabled(prise, enabled)
def get_port_alias(self, prise):
"""Retourne le nom du port"""
if self.ports.get(prise, None) is None:
return ""
return self.ports[prise].get_alias()
def update_ports_aliases(self):
"""Récupère les aliases des ports et les affecte"""
data = self.client.walk('ifAlias')
for data_dict in data:
if self.ports.get(int(data_dict['iid']), None) is not None:
self.ports[int(data_dict['iid'])].set_alias(data_dict['val'])
def set_port_alias(self, prise, alias):
"""Affecte un nom au port"""
if self.ports.get(prise, None) is not None:
self.client.set([
{
'iid': str(prise),
'tag': 'ifAlias',
'val': alias,
}
])
self.ports[prise].set_alias(alias)
def get_eth_speed(self, prise):
"""Retourne le nom du port"""
if self.ports.get(prise, None) is None:
return ""
return REV_ETHSPEED.get(self.ports[prise].get_eth(), 'unknown')
def update_eth_speed(self):
"""Met à jour la vitesse de tous les ports"""
data = self.client.walk('hpSwitchPortFastEtherMode')
for data_dict in data:
if self.ports.get(int(data_dict['iid']), None) is not None:
self.ports[int(data_dict['iid'])].set_eth(data_dict['val'])
def set_eth_speed(self, prise, rate=0, dtype='AUTO'):
"""Affecte un nom au port"""
# On affecte une config spécifique en vitesse
if self.ports.get(prise, None) is not None:
self.client.set([
{
'iid': str(prise),
'tag': 'hpSwitchPortFastEtherMode',
'val': ETHSPEED[dtype][int(rate)],
}
])
self.ports[prise].set_eth(ETHSPEED[dtype][int(rate)])
def get_vlans(self, prise=None, update=True):
"""Récupère les vlans actifs sur une prise"""
# Si mise à jour
if update:
# On fait le ménage
for port in self.ports.itervalues():
port.purge_vlans()
# Et on recommence
data = self.client.walk('enterprises.11.2.14.11.5.1.7.1.15.3.1.1')
for data_dict in data:
vlan, iid = [int(res) for res in data_dict['iid'].split('.')]
if self.ports.get(iid, None) is not None:
self.ports[iid].add_vlan(vlan)
# Si la prise vaut none, on file tout, sinon juste elle.
if prise is not None:
if self.ports.get(prise, None) is not None:
return self.ports[prise].get_vlans()
else:
return {
iid: port.get_vlans()
for (iid, port) in self.ports.iteritems()
}