scripts/freeradius/auth.py

622 lines
21 KiB
Python

# ⁻*- coding: utf-8 -*-
"""
Backend python pour freeradius.
Ce fichier contient la définition de plusieurs fonctions d'interface à
freeradius qui peuvent être appelées (suivant les configurations) à certains
moment de l'authentification, en WiFi, filaire, ou par les NAS eux-mêmes.
Inspirés d'autres exemples trouvés ici :
https://github.com/FreeRADIUS/freeradius-server/blob/master/src/modules/rlm_python/
"""
import logging
import netaddr
import radiusd # Module magique freeradius (radiusd.py is dummy)
import ldap
import os
import binascii
import hashlib
import lc_ldap.shortcuts
from lc_ldap.crans_utils import escape as escape_ldap
import lc_ldap.crans_utils
import lc_ldap.objets
import gestion.config.config as config
from gestion.gen_confs.trigger import trigger_generate_cochon as trigger_generate
import annuaires_pg
from gestion import secrets_new as secrets
#: Serveur radius de test (pas la prod)
TEST_SERVER = bool(os.getenv('DBG_FREERADIUS', False))
#: Le taggage dynamique de vlan (dans la réponse) est désactivé sur WiFi
WIFI_DYN_VLAN = TEST_SERVER
#: Suffixe à retirer du username si présent (en wifi)
USERNAME_SUFFIX_WIFI = '.wifi.crans.org'
#: Suffixe à retirer du username si présent (filaire)
USERNAME_SUFFIX_FIL = '.crans.org'
## -*- Logging -*-
class RadiusdHandler(logging.Handler):
"""Handler de logs pour freeradius"""
def emit(self, record):
"""Process un message de log, en convertissant les niveaux"""
if record.levelno >= logging.WARN:
rad_sig = radiusd.L_ERR
elif record.levelno >= logging.INFO:
rad_sig = radiusd.L_INFO
else:
rad_sig = radiusd.L_DBG
radiusd.radlog(rad_sig, record.msg)
# Initialisation d'un logger (pour logguer unifié)
logger = logging.getLogger('auth.py')
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(name)s: [%(levelname)s] %(message)s')
handler = RadiusdHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
## -*- Types de blacklists -*-
#: reject tout de suite
BL_REJECT = [u'bloq']
#: place sur le vlan isolement
BL_ISOLEMENT = [u'virus', u'autodisc_virus', u'autodisc_p2p', u'ipv6_ra']
#: place sur accueil
BL_ACCUEIL = []
# Ces blacklists ont des effets soft (portail captif port 80)
#BL_ACCUEIL = [u'carte_etudiant', u'chambre_invalide', u'paiement']
#: chambre qui n'en sont pas vraiment. Il s'agit de prises en libre accès,
# pour lequelles il est donc idiot d'activer la protection antisquattage:
# personne n'y habite ! ( G091 -> G097: salle d'étude du rdc du G)
PUBLIC_CHBRE = ['G091', 'G092', 'G093', 'G094', 'G095', 'G096', 'G097']
## -*- Decorateurs -*-
# À appliquer sur les fonctions qui ont besoin d'une conn ldap
use_ldap_admin = lc_ldap.shortcuts.with_ldap_conn(retries=2, delay=5,
constructor=lc_ldap.shortcuts.lc_ldap_admin)
use_ldap = lc_ldap.shortcuts.with_ldap_conn(retries=2, delay=5,
constructor=lc_ldap.shortcuts.lc_ldap_anonymous)
def radius_event(fun):
"""Décorateur pour les fonctions d'interfaces avec radius.
Une telle fonction prend un uniquement argument, qui est une liste de tuples
(clé, valeur) et renvoie un triplet dont les composantes sont :
* le code de retour (voir radiusd.RLM_MODULE_* )
* un tuple de couples (clé, valeur) pour les valeurs de réponse (accès ok
et autres trucs du genre)
* un tuple de couples (clé, valeur) pour les valeurs internes à mettre à
jour (mot de passe par exemple)
On se contente avec ce décorateur (pour l'instant) de convertir la liste de
tuples en entrée en un dictionnaire."""
def new_f(auth_data):
if type(auth_data) == dict:
data = auth_data
else:
data = dict()
for (key, value) in auth_data or []:
# Beware: les valeurs scalaires sont entre guillemets
# Ex: Calling-Station-Id: "une_adresse_mac"
data[key] = value.replace('"', '')
try:
return fun(data)
except Exception as err:
logger.error('Failed %r on data %r' % (err, auth_data))
raise
return new_f
@use_ldap
def get_machines(data, conn, is_wifi=True, proprio=None):
"""Obtient la liste de machine essayant actuellement de se connecter"""
if is_wifi:
suffix = USERNAME_SUFFIX_WIFI
base = u'(objectclass=machineWifi)'
else:
suffix = USERNAME_SUFFIX_FIL
base = u'(objectclass=machineFixe)'
mac = data.get('Calling-Station-Id', None)
if mac:
try:
mac = lc_ldap.crans_utils.format_mac(mac.decode('ascii', 'ignore'))
except:
logger.error('Cannot format MAC !')
mac = None
username = data.get('User-Name', None)
if username:
# Pour les requètes venant de federezwifi
username = username.split('@', 1)[0]
username = escape_ldap(username.decode('ascii', 'ignore'))
if username.endswith(suffix):
username = username[:-len(suffix)]
if mac is None:
logger.error('Cannot read mac from AP')
if username is None:
logger.error('Cannot read client User-Name !')
# Liste de recherches ldap à essayer, dans l'ordre
# ** Case 1: Search by mac
res = conn.search(u'(&%s(macAddress=%s))' % (base, mac))
if res:
return res
# Si proprio fourni, on ne cherche désormais que parmi ses machines
# (opt est le dico des params optionnels de search)
opt = {}
if proprio is not None:
opt['dn'] = proprio.dn
# Filaire: pas de username.
if not is_wifi:
username = '*'
# ** Case 2: unregistered mac : il nous faut au moins un username ou être sûr
# du propriétaire
if username != '*' or proprio is not None:
res = conn.search(u'(&%s(macAddress=<automatique>)(host=%s%s))' %
(base, username, suffix), **opt)
if TEST_SERVER:
res += conn.search(u'(&%s(host=%s%s))' %
(base, username, suffix), **opt)
return res
def get_prise_chbre(data):
"""Extrait la prise (filaire) et la chambre correspondante.
Par convention, le nom d'une chambre commence (lettre du bâtiment) par une
majuscule, tandis que la prise correspondante commence par une miniscule.
"""
## Filaire: NAS-Identifier => contient le nom du switch (batm-3.adm.crans.org)
## NAS-Port => port du switch (ex 42)
# Lettre du bâtiment (C, B, A, etc, en majuscule)
bat_name = None
# Numéro du switch
bat_num = None
# Port sur le switch
port = None
nas = data.get('NAS-Identifier', None)
if nas:
if nas.startswith('bat'):
nas = nas.split('.', 1)[0]
try:
bat_name = nas[3].upper()
bat_num = int(nas.split('-', 1)[1])
except (IndexError, ValueError):
pass
port = data.get('NAS-Port', None)
if port:
port = int(port)
if bat_num is not None and bat_name and port:
prise = bat_name.lower() + "%01d%02d" % (bat_num, port)
try:
chbre = bat_name + annuaires_pg.reverse(bat_name, prise[1:])[0]
except IndexError:
chbre = None
return prise, chbre
return None
def realm_of_machine(machine):
"""Renvoie le `realm` d'une machine. Don't ask"""
if isinstance(machine, lc_ldap.objets.machineFixe):
return 'adherents'
elif isinstance(machine, lc_ldap.objets.machineWifi):
return 'wifi-adh'
else:
raise NotImplementedError('Only fil and wifi realm supported')
def get_fresh_rid(machine):
"""Génère un rid tout frais pour la machine. Fonction kludge"""
lock_id = machine.conn.lockholder.newid()
realm = realm_of_machine(machine)
try:
return machine.conn._find_id('rid', realm, lock_id)
finally:
machine.conn.lockholder.purge(lock_id)
@use_ldap_admin
def register_machine(data, machine, conn):
"""Enregistre la mac actuelle et/ou assigne le rid sur une machine donnée."""
# TODO lc_ldap devrait posséder une fonction pour passer en rw depuis un ro
if 'w' not in machine.mode:
machine = conn.search(dn=machine.dn, scope=ldap.SCOPE_BASE, mode='rw')[0]
mac = data.get('Calling-Station-Id', None)
if mac is None:
logger.warn('Cannot find MAC for registration (aborting)')
return
mac = mac.decode('ascii', 'ignore').replace('"','')
try:
mac = lc_ldap.crans_utils.format_mac(mac).lower()
except Exception:
logger.warn('Cannot format MAC for registration (aborting)')
return
with machine:
logger.info('Registering mac %s' % mac)
machine['macAddress'] = mac
if not machine.get('rid', None):
logger.info('Registering rid')
machine['rid'] = get_fresh_rid(machine)
machine['ipHostNumber'] = u'<automatique>'
machine.validate_changes()
machine.history_gen()
machine.save()
# This part is pure kludge
logger.info('...Success. Now triggering firewall')
trigger_generate('odlyd')
logger.info('done ! (triggered firewall)')
@radius_event
@use_ldap_admin
@use_ldap
def instantiate(*_):
"""Utile pour initialiser les connexions ldap une première fois (otherwise,
do nothing)"""
logger.info('Instantiation')
if TEST_SERVER:
logger.info('DBG_FREERADIUS is enabled')
@radius_event
def authorize(data):
"""Fonction qui aiguille entre nas, wifi et filaire pour authorize
On se contecte de faire une verification basique de ce que contien la requète
pour déterminer la fonction à utiliser"""
if data.get('NAS-Port-Type', '')==u'Ethernet':
return authorize_fil(data)
elif u"Wireless" in data.get('NAS-Port-Type', ''):
return authorize_wifi(data)
else:
return authorize_nas(data)
@radius_event
def authorize_wifi(data):
"""Section authorize pour le wifi
(NB: le filaire est en accept pour tout le monde)
Éxécuté avant l'authentification proprement dite. On peut ainsi remplir les
champs login et mot de passe qui serviront ensuite à l'authentification
(MschapV2/PEAP ou MschapV2/TTLS)"""
items = get_machines(data)
if not items:
logger.error('No machine found in lc_ldap')
return radiusd.RLM_MODULE_NOTFOUND
if len(items) > 1:
logger.warn('lc_ldap: Too many results (taking first)')
machine = items[0]
proprio = machine.proprio()
if isinstance(proprio, lc_ldap.objets.AssociationCrans):
logger.error('Crans machine trying to authenticate !')
return radiusd.RLM_MODULE_INVALID
for bl in machine.blacklist_actif():
if bl.value['type'] in BL_REJECT:
return radiusd.RLM_MODULE_REJECT
# Kludge : vlan isolement pas possible, donc reject quand-même
if not WIFI_DYN_VLAN and bl.value['type'] in BL_ISOLEMENT:
return radiusd.RLM_MODULE_REJECT
if not machine.get('ipsec', False):
logger.error('WiFi auth but machine has no password')
return radiusd.RLM_MODULE_REJECT
password = machine['ipsec'][0].value.encode('ascii', 'ignore')
# TODO: feed cert here
return (radiusd.RLM_MODULE_UPDATED,
(),
(
("Cleartext-Password", password),
),
)
@radius_event
def authorize_fil(data):
"""
Check le challenge chap, et accepte.
TODO: check BL_REJECT.
"""
chap_ok = False
# Teste l'authentification chap fournie
# password et challenge doivent être données
# en hexa (avec ou sans le 0x devant)
# le User-Name est en réalité la mac ( xx:xx:xx:xx:xx )
password = data.get('CHAP-Password', '')
challenge = data.get('CHAP-Challenge', '')
mac = data.get('User-Name', '')
logger.debug('(fil) authorize(%r)' % ((password, challenge, mac),))
try:
challenge = binascii.a2b_hex(challenge.replace('0x',''))
password = binascii.a2b_hex(password.replace('0x',''))
if hashlib.md5(password[0] + mac + challenge).digest() == password[1:]:
logger.info("(fil) Chap ok")
chap_ok = True
else:
logger.info("(fil) Chap wrong")
except Exception as err:
logger.info("(fil) Chap challenge check failed with %r" % err)
if not chap_ok:
if TEST_SERVER:
logger.debug('(fil) Continue auth (debug)')
else:
return radiusd.RLM_MODULE_REJECT
return (radiusd.RLM_MODULE_UPDATED,
(),
(
("Auth-Type", "Accept"),
),
)
def radius_password(secret_name, machine=None):
"""Cherche le mdp radius pour la machine donnée, et fallback sur le
secret canonique nommé"""
if machine and machine.has_key('TODO'):
pass
return secrets.get(secret_name)
@radius_event
@use_ldap
def authorize_nas(data, ldap):
"""Remplis le mdp d'une borne, ou d'un switch"""
logger.info('nas_auth with %r' % data)
ip = data.get('NAS-Identifier', '')
is_v6 = ':' in ip
ip_stm = ("FreeRADIUS-Client-IP%s-Address" % ('v6'*is_v6, ), ip)
# Find machine
# On rajoute les Machines du club federez au base_filter (federez-wifi):
fed = ldap.search(u'(nom=Federez)')[0]
mach_fed = fed.machines()
base_filter = u'(|(objectClass=machineCrans)(objectClass=borneWifi)'
for mach in mach_fed:
base_filter = base_filter + "(mid=%s)" % mach['mid'][0]
base_filter = base_filter + u')'
if is_v6:
addr = netaddr.IPAddress(ip).value
# EUI64, hein ?
assert ((addr >> 24) & 0xffff) == 0xfffe
# Extrait la mac de l'EUI64 (« trust me, it works »)
mac = (addr >> 16) & (0xffffff << 24) ^ (addr & 0xffffff) ^ (1 << 41)
mac = lc_ldap.crans_utils.format_mac("%012x" % mac)
m_filter = u'(macAddress=%s)' % mac
else:
m_filter = u'(ipHostNumber=%s)' % escape_ldap(ip)
machines = ldap.search(u'(&%s%s)' % (base_filter, m_filter))
if not machines:
if TEST_SERVER or ip == '127.0.0.1':
password = radius_password('radius_eap_key')
shortname = "wifi"
vserver = 'inner-tunnel'
else:
logger.info('not found %r' % m_filter)
return radiusd.RLM_MODULE_NOTFOUND
elif unicode(machines[0]['host'][0]).startswith(u'bat'):
password = radius_password('radius_key', machines[0])
shortname = 'switchs'
vserver = 'filaire'
else:
password = radius_password('radius_eap_key', machines[0])
shortname = "wifi"
vserver = 'wifi'
return (radiusd.RLM_MODULE_OK,
(),
(
ip_stm,
("FreeRADIUS-Client-Require-MA", "no"),
("FreeRADIUS-Client-Secret", password),
("FreeRADIUS-Client-Shortname", shortname),
("FreeRADIUS-Client-NAS-Type", "other"),
# On teste avec une équipe qui marche
("FreeRADIUS-Client-Virtual-Server", vserver),
),
)
@radius_event
def post_auth(data):
# On cherche quel est le type de machine, et quel sites lui appliquer
if data.get('NAS-Port-Type', '')==u'Ethernet':
return post_auth_fil(data)
elif u"Wireless" in data.get('NAS-Port-Type', ''):
return post_auth_wifi(data)
@radius_event
def post_auth_wifi(data):
"""Appelé une fois que l'authentification est ok.
On peut rajouter quelques éléments dans la réponse radius ici.
Comme par exemple le vlan sur lequel placer le client"""
port, vlan_name, reason = decide_vlan(data, True)
mac = data.get('Calling-Station-Id', None)
log_message = '(wifi) %s -> %s [%s%s]' % \
(port, mac, vlan_name, (reason and u': ' + reason).encode('utf-8'))
logger.info(log_message)
# Si NAS ayant des mapping particuliers, à signaler ici
vlan_id = config.vlans[vlan_name]
# WiFi : Pour l'instant, on ne met pas d'infos de vlans dans la réponse
# les bornes wifi ont du mal avec cela
if WIFI_DYN_VLAN:
return (radiusd.RLM_MODULE_UPDATED,
(
("Tunnel-Type", "VLAN"),
("Tunnel-Medium-Type", "IEEE-802"),
("Tunnel-Private-Group-Id", '%d' % vlan_id),
),
()
)
return radiusd.RLM_MODULE_OK
@radius_event
def post_auth_fil(data):
"""Idem, mais en filaire.
"""
port, vlan_name, reason = decide_vlan(data, False)
mac = data.get('Calling-Station-Id', None)
log_message = '(fil) %s -> %s [%s%s]' % \
(port, mac, vlan_name, (reason and u': ' + reason).encode('utf-8'))
logger.info(log_message)
# Si NAS ayant des mapping particuliers, à signaler ici
vlan_id = config.vlans[vlan_name]
# Filaire
return (radiusd.RLM_MODULE_UPDATED,
(
("Tunnel-Type", "VLAN"),
("Tunnel-Medium-Type", "IEEE-802"),
("Tunnel-Private-Group-Id", '%d' % vlan_id),
),
()
)
@use_ldap
def decide_vlan(data, is_wifi, conn):
"""Décide du vlan non-taggué à assigner, et donne une raison
à ce choix.
Retourne un (port, vlan_name, reason)
où * port = est une prise réseau / chambre (si filaire)
"wifi" si wifi
* vlan_name est un nom de vlan (cf config.py)
* reason est un unicode explicant le choix
"""
# Switch de remplissage decision par défaut, port, hebergeurs
if is_wifi:
decision = 'wifi', u''
port = data.get('Called-Station-Id', '?')
hebergeurs = []
else:
decision = 'adherent', u''
prise, chbre = get_prise_chbre(data)
port = "%s/%s" % (prise, chbre or 'Inconnue')
if chbre:
chbre = escape_ldap(chbre)
hebergeurs = conn.search(u'(&(chbre=%s)(|(cid=*)(aid=*)))' % chbre)
else:
hebergeurs = []
# Prend la première machine candidat dans la base, ou exit
items = get_machines(data, is_wifi=is_wifi, proprio=(hebergeurs+[None])[0])
if not items:
return (port, 'accueil', u'Machine inconnue')
machine = items[0]
proprio = machine.proprio()
# Avant de continuer, on assigne la mac à la machine candidat
if '<automatique>' in machine['macAddress'] or not machine['rid']:
register_machine(data, machine)
if not machine['ipHostNumber']:
decision = 'v6only', u'No IPv4'
# TODO: condition plus générique:
# netaddr.all_matching_cidr(machine['ipHostNumber'][0].value,
# config.NETs_primaires['personnel-ens'])
elif machine['ipHostNumber'][0].value in netaddr.IPNetwork('10.2.9.0/24'):
# Cas des personnels logés dans les appartements de l'ENS
decision = 'appts', u'Personnel ENS'
# Application des blacklists
for bl in machine.blacklist_actif():
if bl.value['type'] in BL_ISOLEMENT:
decision = 'isolement', unicode(bl)
if bl.value['type'] in BL_ACCUEIL:
decision = 'accueil', unicode(bl)
# Filaire : protection anti-"squattage"
if not is_wifi:
# Si l'adhérent n'est pas membre actif, il doit se brancher depuis la
# prise d'un autre adhérent à jour de cotisation
force_ma = False
is_ma = bool(proprio.get('droits', False))
if chbre is None:
if is_ma:
force_ma = True
else:
decision = "accueil", u"Chambre inconnue"
elif 'cl' in chbre:
# Pour les locaux clubs, il n'y a pas forcément un club sédentaire
# (typiquement, les locaux sous digicode)
decision = decision[0], decision[1] + u' (local club)'
elif chbre in PUBLIC_CHBRE:
decision = decision[0], decision[1] + u' (lieu de vie)'
else:
for hebergeur in hebergeurs:
# Si on est hébergé par un adhérent ok, ou que c'est notre
# chambre, pas de problème
if hebergeur.dn == proprio.dn or not hebergeur.blacklist_actif():
break
else:
# Si tous les hebergeurs sont blacklistés, autoriser
# uniquement si MA
if is_ma:
force_ma = True
else:
decision = "accueil", u"Hébergeur blacklisté"
if force_ma:
decision = decision[0], decision[1] + u' (force MA)'
# Debug: si la machine a un commentaire de la forme "force_vlan: ..."
if TEST_SERVER:
for info in machine.get('info', []):
txt = info.value
if txt.startswith(u'force_vlan:'):
txt = txt[len(u'force_vlan:'):]
vlan, reason = txt.split(',')
vlan = vlan.encode('ascii', errors='ignore').strip()
reason = u"%s (was %r)" % (reason.strip(), decision)
decision = (vlan, reason)
return (port,) + decision
@radius_event
def dummy_fun(_):
"""Do nothing, successfully. (C'est pour avoir un truc à mettre)"""
return radiusd.RLM_MODULE_OK
def detach(_=None):
"""Appelé lors du déchargement du module (enfin, normalement)"""
print "*** goodbye from auth.py ***"
return radiusd.RLM_MODULE_OK