171 lines
6.2 KiB
Python
171 lines
6.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
""" Raccourcis pour se connecter facilement à la base LDAP avec le binding lc_ldap. """
|
|
|
|
import sys
|
|
import os
|
|
|
|
import functools
|
|
from ldap import SERVER_DOWN
|
|
from time import sleep
|
|
|
|
# Import du fichier de secrets. Le kludge de path ci-dessous ne devrait plus
|
|
# avoir lieu dans le futur
|
|
try:
|
|
from gestion import secrets_new as secrets
|
|
except ImportError:
|
|
sys.stderr.write("lc_ldap shortcuts: shaa, cannot import secrets_new. " +
|
|
"try again with /usr/scripts in PYTHONPATH " +
|
|
"(argv: %s)\n" % " ".join(getattr(sys, 'argv', [])))
|
|
sys.path.append("/usr/scripts")
|
|
from gestion import secrets_new as secrets
|
|
sys.path.pop()
|
|
|
|
import lc_ldap as module_qui_a_le_meme_nom_que_sa_classe_principale
|
|
import variables
|
|
|
|
#: Pour enregistrer dans l'historique, on a besoin de savoir qui exécute le script
|
|
#: Si le script a été exécuté via un sudo, la variable SUDO_USER (l'utilisateur qui a effectué le sudo)
|
|
#: est plus pertinente que USER (qui sera root)
|
|
#: À noter que Cron, par exemple, n'a ni USER ni SUDO_USER mais possède bien LOGNAME
|
|
import getpass
|
|
current_user = os.getenv("SUDO_USER") or os.getenv("USER") or os.getenv("LOGNAME") or getpass.getuser()
|
|
if isinstance(current_user, str):
|
|
current_user = current_user.decode("utf-8")
|
|
|
|
#: Les racourcis définis plus bas pointent tous vers lc_ldap_test en mode
|
|
# debug
|
|
DB_TEST_OVERRIDE = os.getenv('DBG_LDAP', None)
|
|
if DB_TEST_OVERRIDE == '1':
|
|
DB_TEST_OVERRIDE = 'localhost'
|
|
|
|
def lc_ldap(*args, **kwargs):
|
|
"""Renvoie une connexion à la base LDAP."""
|
|
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
|
|
|
|
def lc_ldap_test(*args, **kwargs):
|
|
"""Renvoie une connexion LDAP à la base de tests."""
|
|
module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root='/tmp/cimetiere_lc_%s/' % (os.getenv("USER") or os.getenv("LOGNAME"))
|
|
try:
|
|
os.mkdir(module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root)
|
|
except OSError:
|
|
pass
|
|
# On impose le serveur
|
|
kwargs["uri"] = 'ldap://%s' % (DB_TEST_OVERRIDE or 'localhost')
|
|
kwargs.setdefault("dn", 'cn=admin,dc=crans,dc=org')
|
|
# Le mot de passe de la base de test
|
|
kwargs.setdefault("cred", variables.ldap_test_password)
|
|
# On en a aussi besoin pour le lookup en readonly
|
|
kwargs.setdefault("readonly_dn", variables.readonly_dn)
|
|
kwargs.setdefault("readonly_password", variables.ldap_test_password)
|
|
kwargs.setdefault("user", current_user)
|
|
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
|
|
|
|
def lc_ldap_admin(*args, **kwargs):
|
|
"""Renvoie une connexion LDAP à la vraie base, en admin.
|
|
Possible seulement si on peut lire secrets.py
|
|
|
|
"""
|
|
kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
|
|
kwargs["dn"] = secrets.get('ldap_auth_dn')
|
|
kwargs["cred"] = secrets.get('ldap_password')
|
|
kwargs.setdefault("user", current_user)
|
|
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
|
|
|
|
def lc_ldap_readonly(*args, **kwargs):
|
|
"""Connexion LDAP à la vraie base, en readonly.
|
|
Possible seulement si on peut lire secrets.py
|
|
|
|
"""
|
|
kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
|
|
kwargs["dn"] = secrets.get('ldap_readonly_auth_dn')
|
|
kwargs["cred"] = secrets.get('ldap_readonly_password')
|
|
kwargs.setdefault("user", current_user)
|
|
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
|
|
|
|
def lc_ldap_local(*args, **kwargs):
|
|
"""Connexion LDAP en lecture seule sur la base locale.
|
|
L'idée est que les machines avec un réplica bossent
|
|
avec elles-mêmes pour la lecture, pas avec vert.
|
|
|
|
Attention, les accès internes en lecture seule
|
|
ou avec une socket ldapi semblent moins prioritaires
|
|
qu'avec cn=admin. Ne pas utiliser cette fonction
|
|
si vous souhaitez faire beaucoup de recherches
|
|
indépendantes (c'est le temps d'accès à la socket
|
|
qui est problématique).
|
|
|
|
"""
|
|
if os.path.exists('/var/run/slapd/ldapi'):
|
|
ro_uri = 'ldapi://%2fvar%2frun%2fslapd%2fldapi/'
|
|
elif os.path.exists('/var/run/ldapi'):
|
|
ro_uri = 'ldapi://%2fvar%2frun%2fldapi/'
|
|
else:
|
|
ro_uri = 'ldap://127.0.0.1'
|
|
kwargs["uri"] = ro_uri
|
|
if not kwargs.has_key('dn'):
|
|
kwargs['dn'] = secrets.get('ldap_readonly_auth_dn')
|
|
if not kwargs.has_key('cred'):
|
|
kwargs['cred'] = secrets.get('ldap_readonly_password')
|
|
kwargs.setdefault("user", current_user)
|
|
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
|
|
|
|
def lc_ldap_anonymous(*args, **kwargs):
|
|
kwargs.update({
|
|
'user': None,
|
|
'dn': None,
|
|
'cred': None,
|
|
})
|
|
return lc_ldap_local(*args, **kwargs)
|
|
|
|
if DB_TEST_OVERRIDE:
|
|
lc_ldap = lc_ldap_test
|
|
lc_ldap_admin = lc_ldap_test
|
|
lc_ldap_readonly = lc_ldap_test
|
|
lc_ldap_local = lc_ldap_test
|
|
lc_ldap_anonymous = lc_ldap_test
|
|
|
|
class with_ldap_conn(object):
|
|
"""Décorateur (instance = decorator) pour les fonctions nécessitant une
|
|
connexion ldap. Rajoute un argument à la fonction (à la fin) pour passer
|
|
la connexion"""
|
|
|
|
#: fonction à appeler (sans argument) pour créer une connexion
|
|
constructor = None
|
|
|
|
#: nombre d'essais de rattrage de serveurs down, avant échec
|
|
retries = 0
|
|
|
|
#: délai entre deux essais (en secondes)
|
|
delay = 1
|
|
|
|
#: la connexion ldap
|
|
conn = None
|
|
|
|
def __init__(self, constructor=lc_ldap_local, retries=2, delay=1):
|
|
self.constructor = constructor
|
|
self.retries = retries
|
|
self.delay = delay
|
|
|
|
def apply(self, f, nargs, kargs):
|
|
attempt = 0
|
|
while True:
|
|
attempt += 1
|
|
try:
|
|
if not self.conn:
|
|
self.conn = self.constructor()
|
|
return f(*(nargs + (self.conn, )), **kargs)
|
|
except SERVER_DOWN:
|
|
if attempt >= self.retries:
|
|
raise
|
|
else:
|
|
print "one caught !"
|
|
self.conn = None
|
|
sleep(self.delay)
|
|
|
|
def __call__(self, f):
|
|
@functools.wraps(f)
|
|
def new_f(*nargs, **kargs):
|
|
return self.apply(f, nargs, kargs)
|
|
|
|
return new_f
|