#!/bin/bash /usr/scripts/python.sh # -*- coding: utf-8 -*- """ Génération de la configuration d'un switch. Attention, cette version n'a pas encore été totalement testée. procédure de configuration initiale : * mot de passe admin (password manager user-name ) * activation du ssh (crypto key generate ssh) * copie fichier de conf pour les reconfiguration copier le fichier de conf dans /cfg/startup-config Dans tous les cas FAIRE LE SNMP A LA MAIN (script hptools) """ from __future__ import print_function import sys, os import datetime import jinja2 import itertools from socket import gethostbyname import netaddr import argparse if '/usr/scripts' not in sys.path: sys.path.append('/usr/scripts') import gestion.secrets_new as secrets from lc_ldap.shortcuts import lc_ldap_readonly as make_ldap_conn import gestion.annuaires_pg as annuaire import gestion.config as config import lc_ldap.objets as ldap_classes from gestion.hptools import snmp GIGABIT_MODELS = ['J9021A', 'J9145A'] MIB_PRISE_VLAN = 'SNMPv2-SMI::enterprises.11.2.14.11.5.1.7.1.15.3.1.1' MIB_PRISE_MAC = 'SNMPv2-SMI::enterprises.11.2.14.11.5.1.9.4.2' ldap = make_ldap_conn() # états possibles V_TAGGED = 1 V_UNTAGGED = 2 V_NO = 3 # Vlans disponibles ENABLED_VLANS = ['adherent', 'adm', 'wifi', 'v6only', 'accueil', 'isolement', 'appts', 'event'] def vlan_id(name): """Vlan id of a name (filtre jinja)""" return config.vlans[name] def net_of_vlan_name(name): """Renvoie le nom du réseau (ip) d'un vlan donné. C'est utile pour lister les IPs de serveurs dhcp à autoriser (entre autres)""" net_name = { 'adherent': 'fil', 'appts': 'personnel-ens', 'v6only': 'gratuit', 'wifi': 'bornes', }.get(name, name) return config.NETs[net_name] class Port(object): """Un port de switch""" num = None # : uplink: None ou str uplink = None # : Liste de serveurs servers = None # : Liste de bornes bornes = None # : Liste de noms de chambres chambres = None # : Liste des macs vues seen_macs = None # : Liste des vlans vus seen_vlans = None def __init__(self, num): self.num = num self.servers = list() self.bornes = list() self.chambres = list() self.seen_macs = list() self.seen_vlans = list() def __str__(self): if self.uplink: return self.uplink else: labels = [] if self.servers: labels.append('Srv_' + ','.join(s['host'][0].value.split('.', 1)[0] for s in self.servers)) if self.chambres: labels.append('Ch_' + ','.join(self.chambres)) if self.bornes: labels.append('Wifi_' + ','.join(b['host'][0].value.split('.', 1)[0] for b in self.bornes)) return ",".join(labels) or "Inconnu" def __int__(self): return self.num def speed(self): """Full speed or 100Mb ?""" if any("cl" in nom for nom in self.chambres) or self.uplink or \ self.servers or self.bornes: return '' if any( adh.get('droits', None) for adh in self.adherents()): return '' return 'speed-duplex auto-10-100' def flowcontrol(self): """Est-ce que le flowcontrol est activé sur ce port ?""" if self.uplink or self.servers: return 'no flow-control' else: return 'flow-control' def is_trusted(self): """Est-ce une prise que l'on maîtrise ?""" return self.uplink or self.servers def vlan_member(self, vlan): """Renvoie V_TAGGED, V_UNTAGGED ou V_NO suivant le ``vlan`` (str) demandé""" if self.servers: if vlan == 'adm': return V_UNTAGGED else: return V_NO if self.uplink: # TODO retirer ce hack dégueux: tous les switchs devraient tout # tagguer, même le vlan adhérent if vlan == 'adherent': return V_UNTAGGED else: return V_TAGGED # Précisons tout de suite qu'adm ne va pas plus loin elif vlan == 'adm': return V_NO elif self.bornes: if vlan in ['wifi', 'accueil', 'isolement', 'v6only', 'appts', 'event']: return V_TAGGED # Cas d'une borne dans une chambre: l'adherent doit pouvoir # se connecter elif vlan == 'adherent' and self.chambres: return V_UNTAGGED else: return V_NO # C'est donc une chambre d'adherent sans borne, il y aura # l'auth radius else: return V_NO def radius_auth(self): """Doit-on faire de l'auth radius ?""" return not self.uplink and not self.servers and not self.bornes def adherents(self): """Adhérents sur la prise""" filtre = u'(|%s)' % (''.join('(chbre=%s)' % c for c in self.chambres)) return ldap.search(filtre) def num_mac(self): """Renvoie le nombre de macs autorisées. Ne devrait pas être appelée si c'est une prise d'uplink ou de bornes """ assert(not self.bornes and not self.uplink) num = 2 # Si c'est un club, on peut supposer qu'il a besoin de beaucoup # de machines (krobot etc) if any("cl" in nom for nom in self.chambres): num += 6 else: # Si c'est une chambre d'adhérent, on rajoute le nombre de machines # filaires # TODO cette config serait à faire régulièrement # TODO serait-ce plus rapide de tout chercher d'abord ? for adh in self.adherents(): num += len(adh.machines()) return num class PortList(list): """Liste de ports""" def __str__(self): """ transforme une liste de prises en une chaine pour le switch exemple : 1, 3, 4, 5, 6, 7, 9, 10, 11, 12 => 1,3-7,9-12 """ liste = list(int(x) for x in self) liste.sort() sortie = [] groupe = [-99999, -99999] for x in itertools.chain(liste, [99999]): if x > groupe[1]+1: # Nouveau groupe ! if groupe[0] == groupe[1]: sortie.append('%d' % groupe[1]) else: sortie.append('%d-%d' % tuple(groupe)) groupe[0] = x groupe[1] = x return ','.join(sortie[1:]) def filter_vlan(self, vlan): """Prend un ``vlan`` et renvoie deux PortList, la première avec les ports taggués, la seconde pour les ports untaggués""" tagged = PortList() untagged = PortList() for port in self: assign = port.vlan_member(vlan) if assign == V_TAGGED: tagged.append(port) elif assign == V_UNTAGGED: untagged.append(port) return (tagged, untagged) def get_port_dict(switch): """Renvoie le dictionnaire prise->objet Port""" # Build ports ! ports = {} for num in xrange(1, 1+int(switch['nombrePrises'][0].value)): ports[num] = Port(num) bat, sw_num = get_bat_num(unicode(switch['host'][0])) # Remplit les machines ayant une prise spécifiée # (normalement uniquement les serveurs et les bornes) for machine in ldap.search(u'prise=%s%i*' % (bat, sw_num)): port = ports[int(machine['prise'][0].value[2:])] classe = machine['objectClass'][0].value if classe == 'machineCrans': port.servers.append(machine) elif classe == 'borneWifi': port.bornes.append(machine) # On remplit les chambres for prise, chbres in annuaire.reverse(bat).iteritems(): # TODO rajouter un arg à reverse if int(prise[0]) != int(sw_num): continue port = ports[int(prise[1:])] # (below) beware: type(num) == str (ex: 302g) port.chambres += [bat.upper() + num for num in chbres] # Remplit les uplinks for num_prise, label in annuaire.uplink_prises[bat].iteritems(): if num_prise/100 != int(sw_num): continue port = ports[num_prise % 100] port.uplink = label return ports def fill_port_infos(hostname, port_dict): """Rajoute des infos sur les ports d'un switch""" conn = snmp(hostname, version='1', community='public') prise_vlan = conn.walk(MIB_PRISE_VLAN, bin_comp=True) for res in prise_vlan: res = res.split('.') port = int(res[-2]) vlan = int(res[-3]) port_dict[port].seen_vlans.append(vlan) prise_mac = conn.walk(MIB_PRISE_MAC, bin_comp=True) for mib in prise_mac: port = int(mib.split('.')[-8]) mib = mib.split('.') mac = ':'.join('%02x' % int(mib[i]) for i in xrange(-7, -1)) port_dict[port].seen_macs.append(mac) def check_conf_ldap(hostname): """Vérifie la conf du switch, la base ldap et les macs/prises associées""" bat, sw_num = get_bat_num(hostname) switch = ldap.search(u'host=bat%s-%d.adm.crans.org' % (bat, sw_num))[0] port_dict = get_port_dict(switch) fill_port_infos(hostname, port_dict) for port in port_dict.itervalues(): print("* Checking port %d (%s)" % (port.num, port)) # Nombres de vlans pr_nb_vlans = len(port.seen_vlans) th_nb_vlans = sum( port.vlan_member(vname) != V_NO for vname in ENABLED_VLANS ) if not th_nb_vlans and port.radius_auth(): th_nb_vlans = 1 if port.bornes and port.vlan_member('adherent') == V_NO: th_nb_vlans += 1 if th_nb_vlans != pr_nb_vlans: print(" Wrong vlan number (%d,%d)" % (th_nb_vlans, pr_nb_vlans)) print(port.seen_vlans) print(list( vname for vname in ENABLED_VLANS if port.vlan_member(vname) != V_NO )) # Les macs if port.uplink: if len(port.seen_macs) < 20: print(" Uplink but few macs (%d)" % len(port.seen_macs)) elif port.radius_auth(): # On vérifie que c'est la bonne chambre th_prises_set = set() for mac in set(port.seen_macs): res = ldap.search(u'macAddress=%s' % mac) if not res: continue chbre = unicode(res[0].proprio().get('chbre', ['EXT'])[0]) th_prise = chbre[0].lower() try: th_prise += annuaire.chbre_prises(chbre[0], chbre[1:]) except annuaire.ChbreNotFound: # La chambre est inconnue -> drop continue th_prises_set.add(th_prise) pr_prise = bat.lower() + '%d%02d' % (sw_num, port.num) if th_prises_set and pr_prise not in th_prises_set: print(" Aucune machine de chbre. Candidats: %r" % th_prises_set) else: for mac in set(port.seen_macs): res = ldap.search(u'macAddress=%s' % mac) if not res: print(" Unknown mac %s" % mac) continue machine = res[0] owner = machine.proprio() if isinstance(owner, ldap_classes.AssociationCrans): the = unicode(machine.get('prise', ['N/A'])[0]) the = the.lower() pra = bat.lower() + '%d%02d' % (sw_num, port.num) if the != pra: print(" Machine %s sur mauvaise prise (%s,%s)" % (machine, the, pra)) fix_prise(machine, pra) elif isinstance(machine, ldap_classes.machineWifi): if not port.bornes: print(" Machine %s sur prise sans borne ?" % machine) def fix_prise(machine, prise): """Répare la base en remplaçant la prise de la machine par ce qui est conseillé en paramètre""" opt = "yN" old_prise = unicode(machine.get('prise', ['N/A'])[0]) print("Remplacer prise de %s par %s (ancienne valeur: %s) ?" % (machine, prise, old_prise), "[%s]" % opt) while True: char = raw_input() if char in opt.lower(): break print("[%s]" % opt) if char == 'y': if 'w' not in machine.mode: machine = ldap.search(u'%s' % machine.dn.split(',')[0], mode='rw')[0] with machine: machine['prise'] = unicode(prise) machine.history_gen() machine.save() print("Done !") def get_bat_num(hostname): """Renvoie un tuple (bat, num) où bat est la lettre du bâtiment et num l'entier numéro du switch""" return (hostname[3].lower(), int(hostname[5])) def conf_switch(hostname): """Affiche la configuration d'un switch""" bat, sw_num = get_bat_num(hostname) switch = ldap.search(u'host=bat%s-%d.adm.crans.org' % (bat, sw_num))[0] tpl_env = jinja2.Environment(loader=jinja2.FileSystemLoader(os.path.dirname(__file__))) ##for info: tpl_env.filters['vlan_id'] = vlan_id data = { 'switch': switch, 'hostname': 'bat%s-%d' % (bat, sw_num), 'bat': bat.upper(), 'date_gen': datetime.datetime.now(), # TODO fill that depuis bcfg2 ou whatever 'radius_servers': ['10.231.136.72', '10.231.136.9' ], 'radius_key': secrets.get('radius_key'), 'ntp_servers': ['10.231.136.98'], 'log_servers': ['10.231.136.38'], # dhcp et isc (secondaire) sont les deux seuls serveurs 'dhcp_rid_servers': [34, 160], # vlans activés (cf data.vlans) 'vlan_names': ENABLED_VLANS, # réseaux où on fait du dhcp snooping (cf data.NETs) 'dhcp_snooping_vlan_names': ['adherent', 'wifi', 'accueil', 'isolement', 'v6only', 'appts'], } for com in switch['info']: if com.value.startswith(';'): data['config_header'] = com.value.encode('utf-8') break else: print(((u'Impossible de déterminer le header à utiliser pour %s;' + u"Utilisation d'une valeur par défaut (remplir ldap)") % switch).encode('utf-8'), file=sys.stderr) data['config_header']= '; J4899A Configuration Editor; Created on release #H.10.50' imodel = data['config_header'].split(' ', 2)[1] if imodel == "J9145A": data['module_type'] = 'module 1 type J9145A' # Pas de snooping pour les 2810 if "2810" in switch['info'][0].value: data['dhcp_snooping_vlan_names'] = [] else: data['dhcp_servers'] = [] for vname in data['dhcp_snooping_vlan_names']: for rid in data['dhcp_rid_servers']: first = netaddr.IPNetwork(net_of_vlan_name(vname)[0]).first data['dhcp_servers'].append(str(netaddr.IPAddress(first + rid))) # Switch avec des ports gigabit uniquement if imodel in GIGABIT_MODELS: data['gigabit'] = True # Build ports ! ports_list = PortList(get_port_dict(switch).itervalues()) data['ports'] = ports_list data['vlans'] = [] # On remplit les objets vlans (nom, id, tagged, untagged etc) vlans = {} for name in data['vlan_names']: vlan = { 'name': name, 'id': config.vlans[name], } for assign, p in itertools.groupby(ports_list, lambda p: p.vlan_member(name)): attr = { V_TAGGED: 'tagged', V_UNTAGGED: 'untagged', V_NO: 'no'}[assign] vlan.setdefault(attr, PortList()) vlan[attr].extend(p) if name == 'adm': vlan['ip_cfg'] = (gethostbyname(hostname), '255.255.255.0') if name == 'adherent': # igmp snooping (multicast) mais nous ne sommes pas querier vlan['extra'] = 'ip igmp\nno ip igmp querier' vlans[name] = vlan data['vlans'].append(vlan) # Quelles sont les prises de confiance ? data['trusted'] = str(PortList(p for p in ports_list if p.is_trusted())) data['non_trusted'] = str(PortList(p for p in ports_list if not p.is_trusted())) # On désactive le multicast mdns if "2910" in switch['info'][0].value: data['drop_mdns'] = True else: data['drop_mdns'] = False # On render : return tpl_env.get_template('switch_conf.tpl').render(**data).encode('utf-8') if __name__ == "__main__": parser = argparse.ArgumentParser(description="Génération de la conf d'un "+ "switch.") parser.add_argument('hostname', help="Nom du switch à regénérer " + "(ex: batg-4)") parser.add_argument('-c', '--check', action='store_true', default=False, help="Vérifie la conf par rapport aux macs et vlans effectivement" +\ "présents sur le switch") options = parser.parse_args(sys.argv[1:]) if options.check: check_conf_ldap(options.hostname) else: print(conf_switch(options.hostname))