lc_ldap/crans_utils.py
Pierre-Elliott Bécue 21c427ff9b Meilleure gestion de sys.path et des imports.
* Ajout de /usr/scripts aux paths
 * Déplacement de templates.py dans un endroit logique pour printing
2014-10-15 00:08:53 +02:00

336 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# CRANS_UTILS.PY-- Utils for Cr@ns gestion
#
# Copyright (c) 2010-2013, Cr@ns <roots@crans.org>
# Authors: Antoine Durand-Gasselin <adg@crans.org>
# Pierre-Elliott Bécue <becue@crans.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Cr@ns nor the names of its contributors may
# be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import calendar
import netaddr
import re
import time
import smtplib
import sys
import os
import base64
import collections
import hashlib
import ldap.filter
if '/usr/scripts' not in sys.path:
sys.path.append('/usr/scripts')
from gestion import config
from unicodedata import normalize
import subprocess
from netifaces import interfaces, ifaddresses, AF_INET
DEVNULL = open(os.devnull, 'w')
def find_rid_plage(rid):
"""Trouve la plage du rid fourni"""
for (tp, plages) in config.rid_primaires.iteritems():
if isinstance(plages, list):
for begin, end in plages:
if begin <= rid <= end:
return tp, config.rid_primaires[tp][0]
else:
(begin, end) = plages
if begin <= rid <= end:
return tp, (begin, end)
else:
return 'Inconnu', (0, 0)
def find_ipv4_plage(ipv4):
"""Trouve la plage de l'ipv4 fournie"""
for (tp, plage) in config.NETs_primaires.iteritems():
for sousplage in map(netaddr.IPNetwork, plage):
if ipv4 in sousplage:
return tp, sousplage
def ip4_of_rid(rid):
"""Convertit un rid en son IP associée"""
# Au cas où
rid = int(rid)
if rid == -1:
return u""
net, plage = find_rid_plage(rid)
if net == 'Inconnu':
raise ValueError("Rid dans aucune plage: %d" % rid)
if net == 'special':
try:
return netaddr.IPAddress(config.rid_machines_speciales[rid])
except KeyError:
raise ValueError(u"Machine speciale inconnue: %d" % rid)
try:
return netaddr.IPAddress(netaddr.IPNetwork(config.NETs[net][0]).first + rid - plage[0])
except KeyError:
return u""
def rid_of_ip4(ipv4):
"""Convertit une ipv4 en rid, si possible"""
if ipv4 == "":
return -1
# Est-ce une machine spéciale ?
for (rid, ip) in config.rid_machines_speciales.iteritems():
if str(ipv4) == ip:
return rid
# Le cas non-échéant, on va devoir faire de la deep NETs inspection
realm, sousplage = find_ipv4_plage(ipv4)
return config.rid[realm][0][0] + int(ipv4 - sousplage.first)
def prefixev6_of_rid(rid):
"""
L'ip de sous-réseau privé d'une machine. L'adhérent en fait ce qu'il veut, mais c'est la machine
associée au rid qui est responsable du traffic.
Cette fonction retourne l'ip de début de ce sous-réseau.
"""
# Au cas où
rid = int(rid)
net, plage = find_rid_plage(rid)
if net == 'Inconnu':
raise ValueError("Rid dans aucune plage: %d" % rid)
# adherents-v6 ou wifi-adh-v6, we don't care
return netaddr.IPAddress(netaddr.IPNetwork(config.prefix['adherents-v6'][0]).first + 2**64*rid)
def ip6_of_mac(mac, rid):
"""
Retourne la bonne ipv6 de la machine en fonction de sa mac et de son rid.
"""
# Au cas où
rid = int(rid)
if rid == -1:
return u""
net, plage = find_rid_plage(rid)
if net == 'Inconnu':
raise ValueError("Rid dans aucune plage: %d" % rid)
# En théorie, format_mac est inutile, car on ne devrait avoir
# que des mac formatées.
mac = format_mac(mac)
if mac == u'<automatique>':
return u''
mac = mac.replace(u':', u'')
# hex retourne un str, donc on concatène, suivant la RFC
euid64v6 = hex(int(mac[:2], 16)^0b00000010) + mac[2:6] + u'fffe' + mac[6:12]
# adherents-v6 ou wifi-adh-v6, we don't care
if net != "special":
return netaddr.IPAddress(netaddr.IPNetwork(config.prefix[net][0]).first + int(euid64v6, 16))
else:
return netaddr.IPAddress(config.ipv6_machines_speciales[rid])
def strip_accents(a):
""" Supression des accents de la chaîne fournie"""
res = normalize('NFKD', a).encode('ASCII', 'ignore')
return unicode(res)
def strip_spaces(a, by=u'_'):
""" Suppression des espaces et des apostrophes"""
return a.replace(u' ', by).replace(u"'", u'')
def mailexist(mail):
"""Vérifie si une adresse mail existe ou non grace à la commande
vrfy du serveur mail """
mail = mail.split('@', 1)[0]
# try:
s = smtplib.SMTP('smtp.adm.crans.org')
s.putcmd("vrfy", mail)
r = s.getreply()[0] in [250, 252]
s.close()
# except:
# raise ValueError(u'Serveur de mail injoignable')
return r
def format_ldap_time(tm):
"""Formatage des dates provenant de la base LDAP
Transforme la date YYYYMMDDHHMMSS.XXXXXXZ (UTC)
en date DD/MM/YY HH:MM (local)"""
tm_st = time.strptime(tm.split('.')[0], "%Y%m%d%H%M%S") # struct_time UTC
timestamp = calendar.timegm(tm_st)
tm_st = time.localtime(timestamp) # struct_time locale
return time.strftime("%d/%m/%Y %H:%M", tm_st)
def format_mac(mac):
""" Formatage des adresses mac
Transforme une adresse pour obtenir la forme xx:xx:xx:xx:xx:xx
Retourne la mac formatée.
"""
mac = unicode(mac).lower()
if mac == u'<automatique>':
return mac
mac = netaddr.EUI(mac)
if not mac:
raise ValueError(u"MAC nulle interdite\nIl doit être possible de modifier l'adresse de la carte.")
return unicode(str(mac).replace('-', ':'))
def format_tel(tel):
"""Formatage des numéros de téléphone
Transforme un numéro de téléphone pour ne contenir que des chiffres
(00ii... pour les numéros internationaux)
Retourne le numéro formaté.
"""
tel_f = tel.strip()
if tel_f.startswith(u"+"):
tel_f = u"00" + tel_f[1:]
if u"(0)" in tel_f:
tel_f = tel_f.replace(u"(0)", u"")
# \D = non-digit
tel_f = re.sub(r'\D', '', tel_f)
return unicode(tel_f)
def validate_name(value, more_chars=""):
"""Valide un nom: ie un unicode qui contient lettres, espaces et
apostrophes, et éventuellement des caractères additionnels"""
if re.match("^[A-Za-z0-9]([-' %s]?[A-Za-z0-9]?)*$" % more_chars,
normalize('NFKD', value).encode('ASCII', 'ignore')):
return unicode(value)
else:
raise ValueError("Nom invalide (%r)" % value)
def process_status(pid):
"""
Vérifie l'état du processus pid
"""
try:
os.getpgid(int(pid))
return True
except OSError:
return False
def escape(chaine):
"""Renvoie une chaîne échapée pour pouvoir la mettre en toute sécurité
dans une requête ldap."""
return ldap.filter.escape_filter_chars(chaine)
def hash_password(password, salt=None, longueur=4):
if longueur < 4:
raise ValueError("salt devrait faire au moins 4 octets")
if salt is None:
salt = os.urandom(longueur)
elif len(salt) < 4:
raise ValueError("salt devrait faire au moins 4 octets")
return '{SSHA}' + base64.b64encode(hashlib.sha1(password + salt).digest() + salt)
def decode_subjectAltName(data):
from pyasn1.codec.der import decoder
from pyasn1_modules.rfc2459 import SubjectAltName
altName = []
sa_names = decoder.decode(data, asn1Spec=SubjectAltName())[0]
for name in sa_names:
name_type = name.getName()
if name_type == 'dNSName':
altName.append(unicode(name.getComponent()))
# Cacert met des othername, du coup, on ignore juste
# else:
# raise ValueError("Seulement les dNSName sont supporté pour l'extension de certificat SubjectAltName (et pas %s)" % name_type)
return altName
def fetch_cert_info(x509):
# Attention, pour les X509req, la vertion de openssl utilisé et celle du fichier /usr/scripts/lib/python2.7/site-packages/pyOpenSSL-0.14-py2.7.egg
# /usr/scripts/python.sh met le path comme il faut, sinon, il faudrait faire quelque chose comme :
# for file in os.listdir('/usr/scripts/lib/python2.7/site-packages/'):
# if file.endswith(".egg"):
# sys.path.insert(2, '/usr/scripts/lib/python2.7/site-packages/%s' % file)
# sys.path.insert(2, '/usr/scripts/lib/python2.7/site-packages/')
import OpenSSL
data = {}
data['subject'] = dict(x509.get_subject().get_components())
if isinstance(x509, OpenSSL.crypto.X509):
data['issuer'] = dict(x509.get_issuer().get_components())
data['start'] = int(time.mktime(time.strptime(x509.get_notBefore(), '%Y%m%d%H%M%SZ')))
data['end'] = int(time.mktime(time.strptime(x509.get_notAfter(), '%Y%m%d%H%M%SZ')))
data['serialNumber'] = unicode(int(x509.get_serial_number()))
data['extensions'] = collections.defaultdict(list)
def do_ext(data, ext):
ext_name = ext.get_short_name()
if ext_name == 'subjectAltName':
data['extensions'][ext_name] = decode_subjectAltName(ext.get_data())
elif ext_name == 'extendedKeyUsage':
data['extensions'][ext_name].append(str(ext))
else:
data['extensions'][ext_name] = str(ext)
if isinstance(x509, OpenSSL.crypto.X509):
for i in range(0, x509.get_extension_count()):
ext = x509.get_extension(i)
do_ext(data, ext)
else:
for ext in x509.get_extensions():
do_ext(data, ext)
return data
def ip4_addresses():
"""Renvois la liste des ipv4 de la machine physique courante"""
ip_list = []
for interface in interfaces():
if interface!='lo' and AF_INET in ifaddresses(interface).keys():
for link in ifaddresses(interface)[AF_INET]:
ip_list.append(link['addr'])
return ip_list
def extract_tz(thetz):
abstz = 100*abs(thetz)
if thetz == 0:
return u"Z"
else:
return u"%s%04d" % ("+"*(thetz < 0) + "-"*(thetz > 0), abstz)
def to_generalized_time_format(stamp):
"""Converts a timestamp (local) in a generalized time format
for LDAP.
* stamp : float value
* output : a string without the dot second
"""
return u"%s%s" % (time.strftime("%Y%m%d%H%M%S", time.localtime(stamp)), extract_tz(time.altzone/3600))
def from_generalized_time_format(gtf):
"""Converts a GTF stamp to unix timestamp
* gtf : a generalized time format resource without dotsecond
* output : a float value
"""
return time.mktime(time.strptime(gtf.split("-", 1)[0].split("+", 1)[0].split('Z', 1)[0], "%Y%m%d%H%M%S"))