lc_ldap/crans_utils.py
2013-10-07 01:02:47 +02:00

228 lines
7.7 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# CRANS_UTILS.PY-- Utils for Cr@ns gestion
#
# Copyright (c) 2010-2013, Cr@ns <roots@crans.org>
# Authors: Antoine Durand-Gasselin <adg@crans.org>
# Pierre-Elliott Bécue <becue@crans.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Cr@ns nor the names of its contributors may
# be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import calendar
import netaddr
import re
import time
import smtplib
import sys
import os
import ldap.filter
sys.path.append('/usr/scripts/gestion')
import config
from unicodedata import normalize
import subprocess
DEVNULL = open(os.devnull, 'w')
def find_rid_plage(rid):
"""Trouve la plage du rid fourni"""
for (tp, plages) in config.rid_primaires.iteritems():
if isinstance(plages, list):
for begin, end in plages:
if begin <= rid <= end:
return tp, config.rid_primaires[tp][0]
else:
(begin, end) = plages
if begin <= rid <= end:
return tp, (begin, end)
else:
return 'Inconnu', (0, 0)
def find_ipv4_plage(ipv4):
"""Trouve la plage de l'ipv4 fournie"""
for (tp, plage) in config.NETs_primaires.iteritems():
for sousplage in map(netaddr.IPNetwork, plage):
if ipv4 in sousplage:
return tp, sousplage
def ip4_of_rid(rid):
"""Convertit un rid en son IP associée"""
# Au cas où
rid = int(rid)
if rid == -1:
return u""
net, plage = find_rid_plage(rid)
if net == 'Inconnu':
raise ValueError("Rid dans aucune plage: %d" % rid)
if net == 'special':
try:
return netaddr.IPAddress(config.rid_machines_speciales[rid])
except KeyError:
raise ValueError(u"Machine speciale inconnue: %d" % rid)
try:
return netaddr.IPAddress(netaddr.IPNetwork(config.NETs[net][0]).first + rid - plage[0])
except KeyError:
return u""
def rid_of_ip4(ipv4):
"""Convertit une ipv4 en rid, si possible"""
if ipv4 == "":
return -1
# Est-ce une machine spéciale ?
for (rid, ip) in config.rid_machines_speciales.iteritems():
if str(ipv4) == ip:
return rid
# Le cas non-échéant, on va devoir faire de la deep NETs inspection
realm, sousplage = find_ipv4_plage(ipv4)
return config.rid[realm][0][0] + int(ipv4 - sousplage.first)
def prefixev6_of_rid(rid):
"""
L'ip de sous-réseau privé d'une machine. L'adhérent en fait ce qu'il veut, mais c'est la machine
associée au rid qui est responsable du traffic.
Cette fonction retourne l'ip de début de ce sous-réseau.
"""
# Au cas où
rid = int(rid)
net, plage = find_rid_plage(rid)
if net == 'Inconnu':
raise ValueError("Rid dans aucune plage: %d" % rid)
# adherents-v6 ou wifi-adh-v6, we don't care
return netaddr.IPAddress(netaddr.IPNetwork(config.prefix['adherents-v6'][0]).first + 2**64*rid)
def ip6_of_mac(mac, rid):
"""
Retourne la bonne ipv6 de la machine en fonction de sa mac et de son rid.
"""
# Au cas où
rid = int(rid)
if rid == -1:
return u""
net, plage = find_rid_plage(rid)
if net == 'Inconnu':
raise ValueError("Rid dans aucune plage: %d" % rid)
# En théorie, format_mac est inutile, car on ne devrait avoir
# que des mac formatées.
mac = format_mac(mac).replace(u':', u'')
# hex retourne un str, donc on concatène, suivant la RFC
euid64v6 = hex(int(mac[:2], 16)^0b00000010) + mac[2:6] + u'fffe' + mac[6:12]
# adherents-v6 ou wifi-adh-v6, we don't care
if net != "special":
return netaddr.IPAddress(netaddr.IPNetwork(config.prefix[net][0]).first + int(euid64v6, 16))
else:
return netaddr.IPAddress(config.ipv6_machines_speciales[rid])
def strip_accents(a):
""" Supression des accents de la chaîne fournie"""
res = normalize('NFKD', a).encode('ASCII', 'ignore')
return unicode(res)
def strip_spaces(a):
""" Suppression des espaces et des apostrophes"""
return a.replace(u' ', u'_').replace(u"'", u'')
def mailexist(mail):
"""Vérifie si une adresse mail existe ou non grace à la commande
vrfy du serveur mail """
mail = mail.split('@', 1)[0]
# try:
s = smtplib.SMTP('smtp.adm.crans.org')
s.putcmd("vrfy", mail)
r = s.getreply()[0] in [250, 252]
s.close()
# except:
# raise ValueError(u'Serveur de mail injoignable')
return r
def format_ldap_time(tm):
"""Formatage des dates provenant de la base LDAP
Transforme la date YYYYMMDDHHMMSS.XXXXXXZ (UTC)
en date DD/MM/YY HH:MM (local)"""
tm_st = time.strptime(tm.split('.')[0], "%Y%m%d%H%M%S") # struct_time UTC
timestamp = calendar.timegm(tm_st)
tm_st = time.localtime(timestamp) # struct_time locale
return time.strftime("%d/%m/%Y %H:%M", tm_st)
def format_mac(mac):
""" Formatage des adresses mac
Transforme une adresse pour obtenir la forme xx:xx:xx:xx:xx:xx
Retourne la mac formatée.
"""
mac = netaddr.EUI(mac)
if not mac:
raise ValueError(u"MAC nulle interdite\nIl doit être possible de modifier l'adresse de la carte.")
return unicode(str(mac).replace('-', ':'))
def format_tel(tel):
"""Formatage des numéros de téléphone
Transforme un numéro de téléphone pour ne contenir que des chiffres
(00ii... pour les numéros internationaux)
Retourne le numéro formaté.
"""
tel_f = tel.strip()
if tel_f.startswith(u"+"):
tel_f = u"00" + tel_f[1:]
if u"(0)" in tel_f:
tel_f = tel_f.replace(u"(0)", u"")
# \D = non-digit
tel_f = re.sub(r'\D', '', tel_f)
return unicode(tel_f)
def validate_name(value, more_chars=""):
"""Valide un nom: ie un unicode qui contient lettres, espaces et
apostrophes, et éventuellement des caractères additionnels"""
if re.match("^[A-Za-z0-9]([-' %s]?[A-Za-z0-9]?)*$" % more_chars,
normalize('NFKD', value).encode('ASCII', 'ignore')):
return unicode(value)
else:
raise ValueError("Nom invalide (%r)" % value)
def process_status(pid):
"""
Vérifie l'état du processus pid
"""
cmd = subprocess.call(['ps', '%s' % pid], stdout=DEVNULL, stderr=subprocess.STDOUT)
if cmd != 0:
return False
else:
return True
def escape(chaine):
"""Renvoie une chaîne échapée pour pouvoir la mettre en toute sécurité
dans une requête ldap."""
return ldap.filter.escape_filter_chars(chaine)