lc_ldap/shortcuts.py
2014-10-29 13:15:44 +01:00

169 lines
6 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" Raccourcis pour se connecter facilement à la base LDAP avec le binding lc_ldap. """
import sys
import os
import functools
from ldap import SERVER_DOWN
from time import sleep
# Import du fichier de secrets. Le kludge de path ci-dessous ne devrait plus
# avoir lieu dans le futur
try:
from gestion import secrets_new as secrets
except ImportError:
sys.stderr.write("lc_ldap shortcuts: shaa, cannot import secrets_new. " +
"try again with /usr/scripts/ in PYTHONPATH " +
"(argv: %s)\n" % " ".join(getattr(sys, 'argv', [])))
sys.path.append("/usr/scripts")
from gestion import secrets_new as secrets
sys.path.pop()
import lc_ldap as module_qui_a_le_meme_nom_que_sa_classe_principale
import variables
#: Pour enregistrer dans l'historique, on a besoin de savoir qui exécute le script
#: Si le script a été exécuté via un sudo, la variable SUDO_USER (l'utilisateur qui a effectué le sudo)
#: est plus pertinente que USER (qui sera root)
#: À noter que Cron, par exemple, n'a ni USER ni SUDO_USER mais possède bien LOGNAME
import getpass
current_user = os.getenv("SUDO_USER") or os.getenv("USER") or os.getenv("LOGNAME") or getpass.getuser()
if isinstance(current_user, str):
current_user = current_user.decode("utf-8")
#: Les racourcis définis plus bas pointent tous vers lc_ldap_test en mode
# debug
DB_TEST_OVERRIDE = bool(os.getenv('DBG_LDAP', False))
def lc_ldap(*args, **kwargs):
"""Renvoie une connexion à la base LDAP."""
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_test(*args, **kwargs):
"""Renvoie une connexion LDAP à la base de tests."""
module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root='/tmp/cimetiere_lc/'
try:
os.mkdir(module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root)
except OSError:
pass
# On impose le serveur
kwargs["uri"] = 'ldap://vo.adm.crans.org'
kwargs.setdefault("dn", 'cn=admin,dc=crans,dc=org')
# Le mot de passe de la base de test
kwargs.setdefault("cred", variables.ldap_test_password)
# On en a aussi besoin pour le lookup en readonly
kwargs.setdefault("readonly_dn", variables.readonly_dn)
kwargs.setdefault("readonly_password", variables.ldap_test_password)
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_admin(*args, **kwargs):
"""Renvoie une connexion LDAP à la vraie base, en admin.
Possible seulement si on peut lire secrets.py
"""
kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
kwargs["dn"] = secrets.get('ldap_auth_dn')
kwargs["cred"] = secrets.get('ldap_password')
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_readonly(*args, **kwargs):
"""Connexion LDAP à la vraie base, en readonly.
Possible seulement si on peut lire secrets.py
"""
kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
kwargs["dn"] = secrets.get('ldap_readonly_auth_dn')
kwargs["cred"] = secrets.get('ldap_readonly_password')
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_local(*args, **kwargs):
"""Connexion LDAP en lecture seule sur la base locale.
L'idée est que les machines avec un réplica bossent
avec elles-mêmes pour la lecture, pas avec vert.
Attention, les accès internes en lecture seule
ou avec une socket ldapi semblent moins prioritaires
qu'avec cn=admin. Ne pas utiliser cette fonction
si vous souhaitez faire beaucoup de recherches
indépendantes (c'est le temps d'accès à la socket
qui est problématique).
"""
if os.path.exists('/var/run/slapd/ldapi'):
ro_uri = 'ldapi://%2fvar%2frun%2fslapd%2fldapi/'
elif os.path.exists('/var/run/ldapi'):
ro_uri = 'ldapi://%2fvar%2frun%2fldapi/'
else:
ro_uri = 'ldap://127.0.0.1'
kwargs["uri"] = ro_uri
if not kwargs.has_key('dn'):
kwargs['dn'] = secrets.get('ldap_readonly_auth_dn')
if not kwargs.has_key('cred'):
kwargs['cred'] = secrets.get('ldap_readonly_password')
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_anonymous(*args, **kwargs):
kwargs.update({
'user': None,
'dn': None,
'cred': None,
})
return lc_ldap_local(*args, **kwargs)
if DB_TEST_OVERRIDE:
lc_ldap_admin = lc_ldap_test
lc_ldap_readonly = lc_ldap_test
lc_ldap_local = lc_ldap_test
lc_ldap_anonymous = lc_ldap_test
class with_ldap_conn(object):
"""Décorateur (instance = decorator) pour les fonctions nécessitant une
connexion ldap. Rajoute un argument à la fonction (à la fin) pour passer
la connexion"""
#: fonction à appeler (sans argument) pour créer une connexion
constructor = None
#: nombre d'essais de rattrage de serveurs down, avant échec
retries = 0
#: délai entre deux essais (en secondes)
delay = 1
#: la connexion ldap
conn = None
def __init__(self, constructor=lc_ldap_local, retries=2, delay=1):
self.constructor = constructor
self.retries = retries
self.delay = delay
def apply(self, f, nargs, kargs):
attempt = 0
while True:
attempt += 1
try:
if not self.conn:
self.conn = self.constructor()
return f(*(nargs + (self.conn, )), **kargs)
except SERVER_DOWN:
if attempt >= self.retries:
raise
else:
print "one caught !"
self.conn = None
sleep(self.delay)
def __call__(self, f):
@functools.wraps(f)
def new_f(*nargs, **kargs):
return self.apply(f, nargs, kargs)
return new_f