# ⁻*- coding: utf-8 -*- # # Ce fichier contient la définition de plusieurs fonctions d'interface à freeradius # qui peuvent être appelées (suivant les configurations) à certains moment de # l'éxécution. # import logging import netaddr import radiusd # Module magique freeradius (radiusd.py is dummy) import ldap import os import lc_ldap.shortcuts from lc_ldap.crans_utils import escape as escape_ldap import lc_ldap.crans_utils import lc_ldap.objets import gestion.config.config as config from gestion.gen_confs.trigger import trigger_generate_cochon as trigger_generate import annuaires_pg TEST_SERVER = bool(os.getenv('DBG_FREERADIUS', False)) USERNAME_SUFFIX_WIFI = '.wifi.crans.org' USERNAME_SUFFIX_FIL = '.crans.org' ## -*- Logging -*- # Initialisation d'un logger pour faire des stats etc # pour l'instant, on centralise tout sur thot en mode debug logger = logging.getLogger('auth.py') logger.setLevel(logging.DEBUG) formatter = logging.Formatter('%(name)s: [%(levelname)s] %(message)s') handler = logging.handlers.SysLogHandler(address = '/dev/log') try: handler.addFormatter(formatter) except AttributeError: handler.formatter = formatter logger.addHandler(handler) ## -*- Types de blacklists -*- #: reject tout de suite bl_reject = [u'bloq'] #: place sur le vlan isolement bl_isolement = [u'virus', u'autodisc_virus', u'autodisc_p2p', u'ipv6_ra'] # TODO carte_etudiant: dépend si sursis ou non (regarder lc_ldap) # TODO LOGSSSSS #: place sur accueil bl_accueil = [] # Ces blacklists ont des effets soft (portail captif port 80) #bl_accueil = [u'carte_etudiant', u'chambre_invalide', u'paiement'] ## -*- Decorateurs -*- # À appliquer sur les fonctions qui ont besoin d'une conn ldap use_ldap_admin = lc_ldap.shortcuts.with_ldap_conn(retries=2, delay=5, constructor=lc_ldap.shortcuts.lc_ldap_admin) use_ldap = lc_ldap.shortcuts.with_ldap_conn(retries=2, delay=5, constructor=lc_ldap.shortcuts.lc_ldap_anonymous) def radius_event(f): """Décorateur pour les fonctions d'interfaces avec radius. Une telle fonction prend un uniquement argument, qui est une liste de tuples (clé, valeur) et renvoie un triplet dont les composantes sont : * le code de retour (voir radiusd.RLM_MODULE_* ) * un tuple de couples (clé, valeur) pour les valeurs de réponse (accès ok et autres trucs du genre) * un tuple de couples (clé, valeur) pour les valeurs internes à mettre à jour (mot de passe par exemple) Voir des exemples plus complets ici: https://github.com/FreeRADIUS/freeradius-server/blob/master/src/modules/rlm_python/ On se contente avec ce décorateur (pour l'instant) de convertir la liste de tuples en entrée en un dictionnaire.""" def new_f(auth_data): data = dict() for (key, value) in auth_data or []: # Beware: les valeurs scalaires sont entre guillemets # Ex: Calling-Station-Id: "une_adresse_mac" data[key] = value.replace('"', '') try: return f(data) except Exception as e: logger.error(repr(e) + ' on data ' + repr(auth_data)) raise return new_f @use_ldap def get_machines(data, conn, is_wifi=True, proprio=None): """Obtient la liste de machine essayant actuellement de se connecter""" if is_wifi: suffix = USERNAME_SUFFIX_WIFI base = u'(objectclass=machineWifi)' else: suffix = USERNAME_SUFFIX_FIL base = u'(objectclass=machineFixe)' mac = data.get('Calling-Station-Id', None) if mac: try: mac = lc_ldap.crans_utils.format_mac(mac.decode('ascii', 'ignore')) except: radiusd.radlog(radiusd.L_ERR, 'Cannot format MAC !') mac = None username = data.get('User-Name', None) if username: username = escape_ldap(username.decode('ascii', 'ignore')) if username.endswith(suffix): username = username[:-len(suffix)] if mac is None: radiusd.radlog(radiusd.L_ERR, 'Cannot read client MAC from AP !') if username is None: radiusd.radlog(radiusd.L_ERR, 'Cannot read client User-Name !') # Liste de recherches ldap à essayer, dans l'ordre # ** Case 1: Search by mac res = conn.search(u'(&%s(macAddress=%s))' % (base, mac)) if res: return res # Si proprio fourni, on ne cherche désormais que parmi ses machines # (opt est le dico des params optionnels de search) opt = {} if proprio is not None: opt['dn'] = proprio.dn # Filaire: pas de username. if not is_wifi: username = '*' # ** Case 2: unregistered mac : il nous faut au moins un username ou être sûr # du propriétaire if username != '*' or proprio is not None: res = conn.search(u'(&%s(macAddress=)(host=%s%s))' % (base, username, suffix), **opt) return res def get_prise_chbre(data): """Extrait la prise (filaire) et la chambre correspondante. Par convention, le nom d'une chambre commence (lettre du bâtiment) par une majuscule, tandis que la prise correspondante commence par une miniscule. """ ## Filaire: NAS-Identifier => contient le nom du switch (batm-3.adm.crans.org) ## NAS-Port => port du switch (ex 42) # Lettre du bâtiment (C, B, A, etc, en majuscule) bat_name = None # Numéro du switch bat_num = None # Port sur le switch port = None nas = data.get('NAS-Identifier', None) if nas: if nas.startswith('bat'): nas = nas.split('.', 1)[0] try: bat_name = nas[3].upper() bat_num = int(nas.split('-', 1)[1]) except IndexError, ValueError: pass port = data.get('NAS-Port', None) if port: port = int(port) if bat_num is not None and bat_name and port: prise = bat_name.lower() + "%01d%02d" % (bat_num, port) try: chbre = bat_name + annuaires_pg.reverse(bat_name, prise[1:])[0] except IndexError: chbre = None return prise, chbre return None @use_ldap_admin def register_mac(data, machine, conn): """Enregistre la mac actuelle sur une machine donnée.""" mac = data.get('Calling-Station-Id', None) if mac is None: radiusd.radlog(radiusd.L_ERR, 'Cannot find MAC') return mac = mac.decode('ascii', 'ignore').replace('"','') try: mac = lc_ldap.crans_utils.format_mac(mac).lower() except: radiusd.radlog(radiusd.L_ERR, 'Cannot format MAC !') return # TODO lc_ldap devrait posséder une fonction pour passer en rw depuis un ro machine = conn.search(dn=machine.dn, scope=ldap.SCOPE_BASE, mode='rw')[0] with machine: radiusd.radlog(radiusd.L_INFO, 'Registering mac %s' % mac) machine['macAddress'] = mac machine.history_add(u'auth.py', u'macAddress ( -> %s)' % mac) machine.validate_changes() machine.save() radiusd.radlog(radiusd.L_INFO, 'Mac set') radiusd.radlog(radiusd.L_INFO, 'Triggering komaz') trigger_generate('odlyd') radiusd.radlog(radiusd.L_INFO, 'done ! (triggered komaz)') @radius_event @use_ldap_admin @use_ldap def instantiate(p, *conns): """Utile pour initialiser les connexions ldap une première fois (otherwise, do nothing)""" logger.info('Instantiation') if TEST_SERVER: logger.info('DBG_FREERADIUS is enabled') @radius_event def authorize_wifi(data): """Section authorize pour le wifi (NB: le filaire est en accept pour tout le monde) Éxécuté avant l'authentification proprement dite. On peut ainsi remplir les champs login et mot de passe qui serviront ensuite à l'authentification (MschapV2/PEAP ou MschapV2/TTLS)""" items = get_machines(data) if not items: radiusd.radlog(radiusd.L_ERR, 'lc_ldap: Nobody found') return radiusd.RLM_MODULE_NOTFOUND if len(items) > 1: radiusd.radlog(radiusd.L_ERR, 'lc_ldap: Too many results (took first)') machine = items[0] proprio = machine.proprio() if isinstance(proprio, lc_ldap.objets.AssociationCrans): radiusd.radlog(radiusd.L_ERR, 'Crans machine trying to authenticate !') return radiusd.RLM_MODULE_INVALID for bl in machine.blacklist_actif(): if bl.value['type'] in bl_reject: return radiusd.RLM_MODULE_REJECT if not machine.get('ipsec', False): radiusd.radlog(radiusd.L_ERR, 'WiFi authentication but machine has no' + 'password') return radiusd.RLM_MODULE_REJECT password = machine['ipsec'][0].value.encode('ascii', 'ignore') # TODO: feed cert here return (radiusd.RLM_MODULE_UPDATED, (), ( ("Cleartext-Password", password), ), ) @radius_event def authorize_fil(data): """For now, do nothing. TODO: check bl_reject. TODO: check chap auth """ return (radiusd.RLM_MODULE_UPDATED, (), ( ("Auth-Type", "crans_fil"), ), ) @radius_event def post_auth_wifi(data): """Appelé une fois que l'authentification est ok. On peut rajouter quelques éléments dans la réponse radius ici. Comme par exemple le vlan sur lequel placer le client""" port, vlan_name, reason = decide_vlan(data, True) mac = data.get('Calling-Station-Id', None) log_message = '(wifi) %s -> %s [%s%s]' % \ (port, mac, vlan_name, (reason and u': ' + reason).encode('utf-8')) logger.info(log_message) radiusd.radlog(radiusd.L_AUTH, log_message) # Si NAS ayant des mapping particuliers, à signaler ici vlan_id = config.vlans[vlan_name] # WiFi : Pour l'instant, on ne met pas d'infos de vlans dans la réponse # les bornes wifi ont du mal avec cela if TEST_SERVER: return (radiusd.RLM_MODULE_UPDATED, ( ("Tunnel-Type", "VLAN"), ("Tunnel-Medium-Type", "IEEE-802"), ("Tunnel-Private-Group-Id", '%d' % vlan_id), ), () ) return radiusd.RLM_MODULE_OK @radius_event def post_auth_fil(data): """Idem, mais en filaire. """ port, vlan_name, reason = decide_vlan(data, False) mac = data.get('Calling-Station-Id', None) log_message = '(fil) %s -> %s [%s%s]' % \ (port, mac, vlan_name, (reason and u': ' + reason).encode('utf-8')) logger.info(log_message) radiusd.radlog(radiusd.L_AUTH, log_message) # Si NAS ayant des mapping particuliers, à signaler ici vlan_id = config.vlans[vlan_name] # Filaire return (radiusd.RLM_MODULE_UPDATED, ( ("Tunnel-Type", "VLAN"), ("Tunnel-Medium-Type", "IEEE-802"), ("Tunnel-Private-Group-Id", '%d' % vlan_id), ), () ) @use_ldap def decide_vlan(data, is_wifi, conn): """Décide du vlan non-taggué à assigner, et donne une raison à ce choix. Retourne un (port, vlan_name, reason) où * port = est une prise réseau / chambre (si filaire) "wifi" si wifi * vlan_name est un nom de vlan (cf config.py) * reason est un unicode explicant le choix """ # Switch de remplissage decision par défaut, port, hebergeurs if is_wifi: decision = 'wifi', u'' port = data.get('Called-Station-Id', '?') hebergeurs = [] else: decision = 'adherent', u'' prise, chbre = get_prise_chbre(data) port = "%s/%s" % (prise, chbre or 'Inconnue') if chbre: chbre = escape_ldap(chbre) hebergeurs = conn.search(u'(&(chbre=%s)(|(cid=*)(aid=*)))' % chbre) else: hebergeurs = [] # Prend la première machine candidat dans la base, ou exit items = get_machines(data, is_wifi=is_wifi, proprio=(hebergeurs+[None])[0]) if not items: return (port, 'accueil', u'Machine inconnue') machine = items[0] proprio = machine.proprio() # Avant de continuer, on assigne la mac à la machine candidat if '' in machine['macAddress']: register_mac(data, machine) if not machine['ipHostNumber']: decision = 'v6only', u'No IPv4' # TODO: condition plus générique: # netaddr.all_matching_cidr(machine['ipHostNumber'][0].value, # config.NETs_primaires['personnel-ens']) elif machine['ipHostNumber'][0].value in netaddr.IPNetwork('10.2.9.0/24'): # Cas des personnels logés dans les appartements de l'ENS decision = 'appts', u'Personnel ENS' # Application des blacklists for bl in machine.blacklist_actif(): if bl.value['type'] in bl_isolement: decision = 'isolement', unicode(bl) if bl.value['type'] in bl_accueil: decision = 'accueil', unicode(bl) # Filaire : protection anti-"squattage" if not is_wifi: # Si l'adhérent n'est pas membre actif, il doit se brancher depuis la # prise d'un autre adhérent à jour de cotisation force_ma = False is_ma = bool(proprio.get('droits', False)) if chbre is None: if is_ma: force_ma = True else: decision = "accueil", u"Chambre inconnue" elif 'cl' in chbre: # Pour les locaux clubs, il n'y a pas forcément un club sédentaire # (typiquement, les locaux sous digicode) decision = decision[0], decision[1] + u' (local club)' else: for hebergeur in hebergeurs: # Si on est hébergé par un adhérent ok, ou que c'est notre # chambre, pas de problème if hebergeur.dn == proprio.dn or not hebergeur.blacklist_actif(): break else: # Si tous les hebergeurs sont blacklistés, autoriser # uniquement si MA if is_ma: force_ma = True else: decision = "accueil", u"Hébergeur blacklisté" if force_ma: decision = decision[0], decision[1] + u' (force MA)' # Debug: si la machine a un commentaire de la forme "force_vlan: ..." if TEST_SERVER: for info in machine.get('info', []): txt = info.value if txt.startswith(u'force_vlan:'): txt = txt[len(u'force_vlan:'):] vlan, reason = txt.split(',') vlan = vlan.encode('ascii', errors='ignore').strip() reason = u"%s (was %r)" % (reason.strip(), decision) decision = (vlan, reason) return (port,) + decision @radius_event def dummy_fun(p): return radiusd.RLM_MODULE_OK def detach(p=None): """Appelé lors du déchargement du module (enfin, normalement)""" print "*** goodbye from auth.py ***" return radiusd.RLM_MODULE_OK # à réimplémenter dans le authorize # chap_ok(os.getenv('CHAP_PASSWORD'), os.getenv('CHAP_CHALLENGE'), mac) def chap_ok(password, challenge, clear_pass) : """ Test l'authentification chap fournie password et chalenge doivent être données en hexa (avec ou sans le 0x devant) retourne True si l'authentification est OK retourne False sinon """ try : challenge = binascii.a2b_hex(challenge.replace('0x','')) password = binascii.a2b_hex(password.replace('0x','')) if hashlib.md5(password[0] + clear_pass + challenge).digest() == password[1:] : return True except : return False