487 lines
16 KiB
Python
487 lines
16 KiB
Python
# ⁻*- coding: utf-8 -*-
|
|
#
|
|
# Ce fichier contient la définition de plusieurs fonctions d'interface à freeradius
|
|
# qui peuvent être appelées (suivant les configurations) à certains moment de
|
|
# l'éxécution.
|
|
#
|
|
|
|
import logging
|
|
import netaddr
|
|
import radiusd # Module magique freeradius (radiusd.py is dummy)
|
|
import ldap
|
|
import os
|
|
|
|
import lc_ldap.shortcuts
|
|
from lc_ldap.crans_utils import escape as escape_ldap
|
|
import lc_ldap.crans_utils
|
|
import lc_ldap.objets
|
|
import gestion.config.config as config
|
|
from gestion.gen_confs.trigger import trigger_generate_cochon as trigger_generate
|
|
import annuaires_pg
|
|
|
|
TEST_SERVER = bool(os.getenv('DBG_FREERADIUS', False))
|
|
WIFI_DYN_VLAN = TEST_SERVER
|
|
|
|
USERNAME_SUFFIX_WIFI = '.wifi.crans.org'
|
|
USERNAME_SUFFIX_FIL = '.crans.org'
|
|
|
|
## -*- Logging -*-
|
|
# Initialisation d'un logger pour faire des stats etc
|
|
# pour l'instant, on centralise tout sur thot en mode debug
|
|
logger = logging.getLogger('auth.py')
|
|
logger.setLevel(logging.DEBUG)
|
|
formatter = logging.Formatter('%(name)s: [%(levelname)s] %(message)s')
|
|
handler = logging.handlers.SysLogHandler(address = '/dev/log')
|
|
try:
|
|
handler.addFormatter(formatter)
|
|
except AttributeError:
|
|
handler.formatter = formatter
|
|
logger.addHandler(handler)
|
|
|
|
## -*- Types de blacklists -*-
|
|
#: reject tout de suite
|
|
BL_REJECT = [u'bloq']
|
|
|
|
#: place sur le vlan isolement
|
|
BL_ISOLEMENT = [u'virus', u'autodisc_virus', u'autodisc_p2p', u'ipv6_ra']
|
|
|
|
# TODO carte_etudiant: dépend si sursis ou non (regarder lc_ldap)
|
|
# TODO LOGSSSSS
|
|
|
|
#: place sur accueil
|
|
BL_ACCUEIL = []
|
|
|
|
# Ces blacklists ont des effets soft (portail captif port 80)
|
|
#BL_ACCUEIL = [u'carte_etudiant', u'chambre_invalide', u'paiement']
|
|
|
|
## -*- Decorateurs -*-
|
|
# À appliquer sur les fonctions qui ont besoin d'une conn ldap
|
|
use_ldap_admin = lc_ldap.shortcuts.with_ldap_conn(retries=2, delay=5,
|
|
constructor=lc_ldap.shortcuts.lc_ldap_admin)
|
|
use_ldap = lc_ldap.shortcuts.with_ldap_conn(retries=2, delay=5,
|
|
constructor=lc_ldap.shortcuts.lc_ldap_anonymous)
|
|
|
|
def radius_event(f):
|
|
"""Décorateur pour les fonctions d'interfaces avec radius.
|
|
Une telle fonction prend un uniquement argument, qui est une liste de tuples
|
|
(clé, valeur) et renvoie un triplet dont les composantes sont :
|
|
* le code de retour (voir radiusd.RLM_MODULE_* )
|
|
* un tuple de couples (clé, valeur) pour les valeurs de réponse (accès ok
|
|
et autres trucs du genre)
|
|
* un tuple de couples (clé, valeur) pour les valeurs internes à mettre à
|
|
jour (mot de passe par exemple)
|
|
Voir des exemples plus complets ici:
|
|
https://github.com/FreeRADIUS/freeradius-server/blob/master/src/modules/rlm_python/
|
|
|
|
On se contente avec ce décorateur (pour l'instant) de convertir la liste de
|
|
tuples en entrée en un dictionnaire."""
|
|
|
|
def new_f(auth_data):
|
|
data = dict()
|
|
for (key, value) in auth_data or []:
|
|
# Beware: les valeurs scalaires sont entre guillemets
|
|
# Ex: Calling-Station-Id: "une_adresse_mac"
|
|
data[key] = value.replace('"', '')
|
|
try:
|
|
return f(data)
|
|
except Exception as e:
|
|
logger.error(repr(e) + ' on data ' + repr(auth_data))
|
|
raise
|
|
|
|
return new_f
|
|
|
|
@use_ldap
|
|
def get_machines(data, conn, is_wifi=True, proprio=None):
|
|
"""Obtient la liste de machine essayant actuellement de se connecter"""
|
|
if is_wifi:
|
|
suffix = USERNAME_SUFFIX_WIFI
|
|
base = u'(objectclass=machineWifi)'
|
|
else:
|
|
suffix = USERNAME_SUFFIX_FIL
|
|
base = u'(objectclass=machineFixe)'
|
|
|
|
mac = data.get('Calling-Station-Id', None)
|
|
if mac:
|
|
try:
|
|
mac = lc_ldap.crans_utils.format_mac(mac.decode('ascii', 'ignore'))
|
|
except:
|
|
radiusd.radlog(radiusd.L_ERR, 'Cannot format MAC !')
|
|
mac = None
|
|
username = data.get('User-Name', None)
|
|
if username:
|
|
username = escape_ldap(username.decode('ascii', 'ignore'))
|
|
if username.endswith(suffix):
|
|
username = username[:-len(suffix)]
|
|
|
|
if mac is None:
|
|
radiusd.radlog(radiusd.L_ERR, 'Cannot read client MAC from AP !')
|
|
if username is None:
|
|
radiusd.radlog(radiusd.L_ERR, 'Cannot read client User-Name !')
|
|
|
|
# Liste de recherches ldap à essayer, dans l'ordre
|
|
# ** Case 1: Search by mac
|
|
res = conn.search(u'(&%s(macAddress=%s))' % (base, mac))
|
|
if res:
|
|
return res
|
|
|
|
# Si proprio fourni, on ne cherche désormais que parmi ses machines
|
|
# (opt est le dico des params optionnels de search)
|
|
opt = {}
|
|
if proprio is not None:
|
|
opt['dn'] = proprio.dn
|
|
# Filaire: pas de username.
|
|
if not is_wifi:
|
|
username = '*'
|
|
|
|
# ** Case 2: unregistered mac : il nous faut au moins un username ou être sûr
|
|
# du propriétaire
|
|
if username != '*' or proprio is not None:
|
|
res = conn.search(u'(&%s(macAddress=<automatique>)(host=%s%s))' %
|
|
(base, username, suffix), **opt)
|
|
|
|
return res
|
|
|
|
def get_prise_chbre(data):
|
|
"""Extrait la prise (filaire) et la chambre correspondante.
|
|
Par convention, le nom d'une chambre commence (lettre du bâtiment) par une
|
|
majuscule, tandis que la prise correspondante commence par une miniscule.
|
|
"""
|
|
## Filaire: NAS-Identifier => contient le nom du switch (batm-3.adm.crans.org)
|
|
## NAS-Port => port du switch (ex 42)
|
|
|
|
# Lettre du bâtiment (C, B, A, etc, en majuscule)
|
|
bat_name = None
|
|
|
|
# Numéro du switch
|
|
bat_num = None
|
|
|
|
# Port sur le switch
|
|
port = None
|
|
|
|
nas = data.get('NAS-Identifier', None)
|
|
if nas:
|
|
if nas.startswith('bat'):
|
|
nas = nas.split('.', 1)[0]
|
|
try:
|
|
bat_name = nas[3].upper()
|
|
bat_num = int(nas.split('-', 1)[1])
|
|
except IndexError, ValueError:
|
|
pass
|
|
port = data.get('NAS-Port', None)
|
|
if port:
|
|
port = int(port)
|
|
|
|
if bat_num is not None and bat_name and port:
|
|
prise = bat_name.lower() + "%01d%02d" % (bat_num, port)
|
|
try:
|
|
chbre = bat_name + annuaires_pg.reverse(bat_name, prise[1:])[0]
|
|
except IndexError:
|
|
chbre = None
|
|
return prise, chbre
|
|
return None
|
|
|
|
def realm_of_machine(machine):
|
|
"""Renvoie le `realm` d'une machine. Don't ask"""
|
|
if isinstance(machine, lc_ldap.objets.machineFixe):
|
|
return 'fil'
|
|
elif isinstance(machine, lc_ldap.objets.machineWifi):
|
|
return 'wifi-adh'
|
|
else:
|
|
raise NotImplementedError('Only fil and wifi realm supported')
|
|
|
|
def get_fresh_rid(machine):
|
|
"""Génère un rid tout frais pour la machine. Fonction kludge"""
|
|
lockId = machine.conn.lockholder.newid()
|
|
realm = realm_of_machine(machine)
|
|
try:
|
|
return machine.conn._find_id('rid', realm, lockId)
|
|
finally:
|
|
machine.conn.lockholder.purge(lockId)
|
|
|
|
@use_ldap_admin
|
|
def register_mac(data, machine, conn):
|
|
"""Enregistre la mac actuelle sur une machine donnée."""
|
|
# TODO lc_ldap devrait posséder une fonction pour passer en rw depuis un ro
|
|
if 'w' not in machine.mode:
|
|
machine = conn.search(dn=machine.dn, scope=ldap.SCOPE_BASE, mode='rw')[0]
|
|
|
|
mac = data.get('Calling-Station-Id', None)
|
|
if mac is None:
|
|
radiusd.radlog(radiusd.L_ERR, 'Cannot find MAC')
|
|
return
|
|
mac = mac.decode('ascii', 'ignore').replace('"','')
|
|
try:
|
|
mac = lc_ldap.crans_utils.format_mac(mac).lower()
|
|
except:
|
|
radiusd.radlog(radiusd.L_ERR, 'Cannot format MAC !')
|
|
return
|
|
|
|
with machine:
|
|
logger.info('Registering mac %s' % mac)
|
|
machine['macAddress'] = mac
|
|
if not machine.get('rid', None):
|
|
logger.info('Registering rid')
|
|
machine['rid'] = get_fresh_rid(machine)
|
|
machine['ipHostNumber'] = u'<automatique>'
|
|
machine.validate_changes()
|
|
machine.history_gen()
|
|
machine.save()
|
|
# This part is pure kludge
|
|
logger.info('...Success. Now triggering firewall')
|
|
trigger_generate('odlyd')
|
|
logger.info('done ! (triggered firewall)')
|
|
|
|
@radius_event
|
|
@use_ldap_admin
|
|
@use_ldap
|
|
def instantiate(p, *conns):
|
|
"""Utile pour initialiser les connexions ldap une première fois (otherwise,
|
|
do nothing)"""
|
|
logger.info('Instantiation')
|
|
if TEST_SERVER:
|
|
logger.info('DBG_FREERADIUS is enabled')
|
|
|
|
@radius_event
|
|
def authorize_wifi(data):
|
|
"""Section authorize pour le wifi
|
|
(NB: le filaire est en accept pour tout le monde)
|
|
Éxécuté avant l'authentification proprement dite. On peut ainsi remplir les
|
|
champs login et mot de passe qui serviront ensuite à l'authentification
|
|
(MschapV2/PEAP ou MschapV2/TTLS)"""
|
|
|
|
items = get_machines(data)
|
|
|
|
if not items:
|
|
radiusd.radlog(radiusd.L_ERR, 'lc_ldap: Nobody found')
|
|
return radiusd.RLM_MODULE_NOTFOUND
|
|
|
|
if len(items) > 1:
|
|
radiusd.radlog(radiusd.L_ERR, 'lc_ldap: Too many results (took first)')
|
|
|
|
machine = items[0]
|
|
|
|
proprio = machine.proprio()
|
|
if isinstance(proprio, lc_ldap.objets.AssociationCrans):
|
|
radiusd.radlog(radiusd.L_ERR, 'Crans machine trying to authenticate !')
|
|
return radiusd.RLM_MODULE_INVALID
|
|
|
|
for bl in machine.blacklist_actif():
|
|
if bl.value['type'] in BL_REJECT:
|
|
return radiusd.RLM_MODULE_REJECT
|
|
# Kludge : vlan isolement pas possible, donc reject quand-même
|
|
if not WIFI_DYN_VLAN and bl.value['type'] in BL_ISOLEMENT:
|
|
return radiusd.RLM_MODULE_REJECT
|
|
|
|
|
|
if not machine.get('ipsec', False):
|
|
radiusd.radlog(radiusd.L_ERR, 'WiFi authentication but machine has no' +
|
|
'password')
|
|
return radiusd.RLM_MODULE_REJECT
|
|
|
|
password = machine['ipsec'][0].value.encode('ascii', 'ignore')
|
|
|
|
# TODO: feed cert here
|
|
return (radiusd.RLM_MODULE_UPDATED,
|
|
(),
|
|
(
|
|
("Cleartext-Password", password),
|
|
),
|
|
)
|
|
|
|
@radius_event
|
|
def authorize_fil(data):
|
|
"""For now, do nothing.
|
|
TODO: check BL_REJECT.
|
|
TODO: check chap auth
|
|
"""
|
|
return (radiusd.RLM_MODULE_UPDATED,
|
|
(),
|
|
(
|
|
("Auth-Type", "crans_fil"),
|
|
),
|
|
)
|
|
|
|
@radius_event
|
|
def post_auth_wifi(data):
|
|
"""Appelé une fois que l'authentification est ok.
|
|
On peut rajouter quelques éléments dans la réponse radius ici.
|
|
Comme par exemple le vlan sur lequel placer le client"""
|
|
|
|
port, vlan_name, reason = decide_vlan(data, True)
|
|
mac = data.get('Calling-Station-Id', None)
|
|
|
|
log_message = '(wifi) %s -> %s [%s%s]' % \
|
|
(port, mac, vlan_name, (reason and u': ' + reason).encode('utf-8'))
|
|
logger.info(log_message)
|
|
radiusd.radlog(radiusd.L_AUTH, log_message)
|
|
|
|
# Si NAS ayant des mapping particuliers, à signaler ici
|
|
vlan_id = config.vlans[vlan_name]
|
|
|
|
# WiFi : Pour l'instant, on ne met pas d'infos de vlans dans la réponse
|
|
# les bornes wifi ont du mal avec cela
|
|
if WIFI_DYN_VLAN:
|
|
return (radiusd.RLM_MODULE_UPDATED,
|
|
(
|
|
("Tunnel-Type", "VLAN"),
|
|
("Tunnel-Medium-Type", "IEEE-802"),
|
|
("Tunnel-Private-Group-Id", '%d' % vlan_id),
|
|
),
|
|
()
|
|
)
|
|
|
|
return radiusd.RLM_MODULE_OK
|
|
|
|
@radius_event
|
|
def post_auth_fil(data):
|
|
"""Idem, mais en filaire.
|
|
"""
|
|
|
|
port, vlan_name, reason = decide_vlan(data, False)
|
|
mac = data.get('Calling-Station-Id', None)
|
|
|
|
log_message = '(fil) %s -> %s [%s%s]' % \
|
|
(port, mac, vlan_name, (reason and u': ' + reason).encode('utf-8'))
|
|
logger.info(log_message)
|
|
radiusd.radlog(radiusd.L_AUTH, log_message)
|
|
|
|
# Si NAS ayant des mapping particuliers, à signaler ici
|
|
vlan_id = config.vlans[vlan_name]
|
|
|
|
# Filaire
|
|
return (radiusd.RLM_MODULE_UPDATED,
|
|
(
|
|
("Tunnel-Type", "VLAN"),
|
|
("Tunnel-Medium-Type", "IEEE-802"),
|
|
("Tunnel-Private-Group-Id", '%d' % vlan_id),
|
|
),
|
|
()
|
|
)
|
|
|
|
@use_ldap
|
|
def decide_vlan(data, is_wifi, conn):
|
|
"""Décide du vlan non-taggué à assigner, et donne une raison
|
|
à ce choix.
|
|
Retourne un (port, vlan_name, reason)
|
|
où * port = est une prise réseau / chambre (si filaire)
|
|
"wifi" si wifi
|
|
* vlan_name est un nom de vlan (cf config.py)
|
|
* reason est un unicode explicant le choix
|
|
"""
|
|
|
|
# Switch de remplissage decision par défaut, port, hebergeurs
|
|
if is_wifi:
|
|
decision = 'wifi', u''
|
|
port = data.get('Called-Station-Id', '?')
|
|
hebergeurs = []
|
|
else:
|
|
decision = 'adherent', u''
|
|
prise, chbre = get_prise_chbre(data)
|
|
port = "%s/%s" % (prise, chbre or 'Inconnue')
|
|
|
|
if chbre:
|
|
chbre = escape_ldap(chbre)
|
|
hebergeurs = conn.search(u'(&(chbre=%s)(|(cid=*)(aid=*)))' % chbre)
|
|
else:
|
|
hebergeurs = []
|
|
|
|
# Prend la première machine candidat dans la base, ou exit
|
|
items = get_machines(data, is_wifi=is_wifi, proprio=(hebergeurs+[None])[0])
|
|
if not items:
|
|
return (port, 'accueil', u'Machine inconnue')
|
|
machine = items[0]
|
|
|
|
proprio = machine.proprio()
|
|
|
|
# Avant de continuer, on assigne la mac à la machine candidat
|
|
if '<automatique>' in machine['macAddress']:
|
|
register_mac(data, machine)
|
|
|
|
if not machine['ipHostNumber']:
|
|
decision = 'v6only', u'No IPv4'
|
|
# TODO: condition plus générique:
|
|
# netaddr.all_matching_cidr(machine['ipHostNumber'][0].value,
|
|
# config.NETs_primaires['personnel-ens'])
|
|
elif machine['ipHostNumber'][0].value in netaddr.IPNetwork('10.2.9.0/24'):
|
|
# Cas des personnels logés dans les appartements de l'ENS
|
|
decision = 'appts', u'Personnel ENS'
|
|
|
|
# Application des blacklists
|
|
for bl in machine.blacklist_actif():
|
|
if bl.value['type'] in BL_ISOLEMENT:
|
|
decision = 'isolement', unicode(bl)
|
|
if bl.value['type'] in BL_ACCUEIL:
|
|
decision = 'accueil', unicode(bl)
|
|
|
|
# Filaire : protection anti-"squattage"
|
|
if not is_wifi:
|
|
# Si l'adhérent n'est pas membre actif, il doit se brancher depuis la
|
|
# prise d'un autre adhérent à jour de cotisation
|
|
force_ma = False
|
|
is_ma = bool(proprio.get('droits', False))
|
|
if chbre is None:
|
|
if is_ma:
|
|
force_ma = True
|
|
else:
|
|
decision = "accueil", u"Chambre inconnue"
|
|
elif 'cl' in chbre:
|
|
# Pour les locaux clubs, il n'y a pas forcément un club sédentaire
|
|
# (typiquement, les locaux sous digicode)
|
|
decision = decision[0], decision[1] + u' (local club)'
|
|
else:
|
|
for hebergeur in hebergeurs:
|
|
# Si on est hébergé par un adhérent ok, ou que c'est notre
|
|
# chambre, pas de problème
|
|
if hebergeur.dn == proprio.dn or not hebergeur.blacklist_actif():
|
|
break
|
|
else:
|
|
# Si tous les hebergeurs sont blacklistés, autoriser
|
|
# uniquement si MA
|
|
if is_ma:
|
|
force_ma = True
|
|
else:
|
|
decision = "accueil", u"Hébergeur blacklisté"
|
|
|
|
if force_ma:
|
|
decision = decision[0], decision[1] + u' (force MA)'
|
|
|
|
# Debug: si la machine a un commentaire de la forme "force_vlan: ..."
|
|
if TEST_SERVER:
|
|
for info in machine.get('info', []):
|
|
txt = info.value
|
|
if txt.startswith(u'force_vlan:'):
|
|
txt = txt[len(u'force_vlan:'):]
|
|
vlan, reason = txt.split(',')
|
|
vlan = vlan.encode('ascii', errors='ignore').strip()
|
|
reason = u"%s (was %r)" % (reason.strip(), decision)
|
|
|
|
decision = (vlan, reason)
|
|
|
|
return (port,) + decision
|
|
|
|
@radius_event
|
|
def dummy_fun(p):
|
|
return radiusd.RLM_MODULE_OK
|
|
|
|
def detach(p=None):
|
|
"""Appelé lors du déchargement du module (enfin, normalement)"""
|
|
print "*** goodbye from auth.py ***"
|
|
return radiusd.RLM_MODULE_OK
|
|
|
|
# à réimplémenter dans le authorize
|
|
# chap_ok(os.getenv('CHAP_PASSWORD'), os.getenv('CHAP_CHALLENGE'), mac)
|
|
def chap_ok(password, challenge, clear_pass) :
|
|
""" Test l'authentification chap fournie
|
|
password et chalenge doivent être données
|
|
en hexa (avec ou sans le 0x devant)
|
|
|
|
retourne True si l'authentification est OK
|
|
retourne False sinon
|
|
"""
|
|
try :
|
|
challenge = binascii.a2b_hex(challenge.replace('0x',''))
|
|
password = binascii.a2b_hex(password.replace('0x',''))
|
|
if hashlib.md5(password[0] + clear_pass + challenge).digest() == password[1:] :
|
|
return True
|
|
except :
|
|
return False
|