
Cette méthode a l'air de moins planter pour trouver le nom de l'utilisateur, par exemple depuis un initscript qui a nettoyé drastiquement l'environnement. En réalité je me demande s'il ne vaudrait pas mieux utiliser uniquement celle-là… À méditer.
151 lines
5.4 KiB
Python
151 lines
5.4 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
""" Raccourcis pour se connecter facilement à la base LDAP avec le binding lc_ldap. """
|
|
|
|
import sys
|
|
import os
|
|
|
|
import lc_ldap as module_qui_a_le_meme_nom_que_sa_classe_principale
|
|
import variables
|
|
|
|
import functools
|
|
from ldap import SERVER_DOWN
|
|
from time import sleep
|
|
|
|
#: Pour enregistrer dans l'historique, on a besoin de savoir qui exécute le script
|
|
#: Si le script a été exécuté via un sudo, la variable SUDO_USER (l'utilisateur qui a effectué le sudo)
|
|
#: est plus pertinente que USER (qui sera root)
|
|
#: À noter que Cron, par exemple, n'a ni USER ni SUDO_USER mais possède bien LOGNAME
|
|
import getpass
|
|
current_user = os.getenv("SUDO_USER") or os.getenv("USER") or os.getenv("LOGNAME") or getpass.getuser()
|
|
if isinstance(current_user, str):
|
|
current_user = current_user.decode("utf-8")
|
|
|
|
# Quand on a besoin du fichier de secrets
|
|
def import_secrets():
|
|
"""Importe le fichier de secrets."""
|
|
if not "/etc/crans/secrets/" in sys.path:
|
|
sys.path.append("/etc/crans/secrets/")
|
|
import secrets
|
|
return secrets
|
|
|
|
def lc_ldap(*args, **kwargs):
|
|
"""Renvoie une connexion à la base LDAP."""
|
|
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
|
|
|
|
def lc_ldap_test(*args, **kwargs):
|
|
"""Renvoie une connexion LDAP à la base de tests."""
|
|
module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root='/tmp/cimetiere_lc/'
|
|
try:
|
|
os.mkdir(module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root)
|
|
except OSError:
|
|
pass
|
|
# On impose le serveur
|
|
kwargs["uri"] = 'ldap://vo.adm.crans.org'
|
|
kwargs.setdefault("dn", 'cn=admin,dc=crans,dc=org')
|
|
# Le mot de passe de la base de test
|
|
kwargs.setdefault("cred", variables.ldap_test_password)
|
|
# On en a aussi besoin pour le lookup en readonly
|
|
kwargs.setdefault("readonly_dn", variables.readonly_dn)
|
|
kwargs.setdefault("readonly_password", variables.ldap_test_password)
|
|
kwargs.setdefault("user", current_user)
|
|
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
|
|
|
|
def lc_ldap_admin(*args, **kwargs):
|
|
"""Renvoie une connexion LDAP à la vraie base, en admin.
|
|
Possible seulement si on peut lire secrets.py
|
|
|
|
"""
|
|
secrets = import_secrets()
|
|
kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
|
|
kwargs["dn"] = secrets.ldap_auth_dn
|
|
kwargs["cred"] = secrets.ldap_password
|
|
kwargs.setdefault("user", current_user)
|
|
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
|
|
|
|
def lc_ldap_readonly(*args, **kwargs):
|
|
"""Connexion LDAP à la vraie base, en readonly.
|
|
Possible seulement si on peut lire secrets.py
|
|
|
|
"""
|
|
secrets = import_secrets()
|
|
kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
|
|
kwargs["dn"] = secrets.ldap_readonly_auth_dn
|
|
kwargs["cred"] = secrets.ldap_readonly_password
|
|
kwargs.setdefault("user", current_user)
|
|
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
|
|
|
|
def lc_ldap_local(*args, **kwargs):
|
|
"""Connexion LDAP en lecture seule sur la base locale.
|
|
L'idée est que les machines avec un réplica bossent
|
|
avec elles-mêmes pour la lecture, pas avec vert.
|
|
|
|
Attention, les accès internes en lecture seule
|
|
ou avec une socket ldapi semblent moins prioritaires
|
|
qu'avec cn=admin. Ne pas utiliser cette fonction
|
|
si vous souhaitez faire beaucoup de recherches
|
|
indépendantes (c'est le temps d'accès à la socket
|
|
qui est problématique).
|
|
|
|
"""
|
|
secrets = import_secrets()
|
|
auth_dn = secrets.ldap_readonly_auth_dn
|
|
auth_pw = secrets.ldap_readonly_password
|
|
if os.path.exists('/var/run/slapd/ldapi'):
|
|
ro_uri = 'ldapi://%2fvar%2frun%2fslapd%2fldapi/'
|
|
elif os.path.exists('/var/run/ldapi'):
|
|
ro_uri = 'ldapi://%2fvar%2frun%2fldapi/'
|
|
else:
|
|
ro_uri = 'ldap://127.0.0.1'
|
|
kwargs["uri"] = ro_uri
|
|
kwargs["dn"] = auth_dn
|
|
kwargs["cred"] = auth_pw
|
|
kwargs.setdefault("user", current_user)
|
|
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
|
|
|
|
|
|
class with_ldap_conn(object):
|
|
"""Décorateur (instance = decorator) pour les fonctions nécessitant une
|
|
connexion ldap. Rajoute un argument à la fonction (à la fin) pour passer
|
|
la connexion"""
|
|
|
|
#: fonction à appeler (sans argument) pour créer une connexion
|
|
constructor = None
|
|
|
|
#: nombre d'essais de rattrage de serveurs down, avant échec
|
|
retries = 0
|
|
|
|
#: délai entre deux essais (en secondes)
|
|
delay = 1
|
|
|
|
#: la connexion ldap
|
|
conn = None
|
|
|
|
def __init__(self, constructor=lc_ldap_local, retries=2, delay=1):
|
|
self.constructor = constructor
|
|
self.retries = retries
|
|
self.delay = delay
|
|
|
|
def apply(self, f, nargs, kargs):
|
|
attempt = 0
|
|
while True:
|
|
attempt += 1
|
|
try:
|
|
if not self.conn:
|
|
self.conn = self.constructor()
|
|
return f(*(nargs + (self.conn, )), **kargs)
|
|
except SERVER_DOWN:
|
|
if attempt >= self.retries:
|
|
raise
|
|
else:
|
|
print "one caught !"
|
|
self.conn = None
|
|
sleep(self.delay)
|
|
|
|
def __call__(self, f):
|
|
@functools.wraps(f)
|
|
def new_f(*nargs, **kargs):
|
|
return self.apply(f, nargs, kargs)
|
|
|
|
return new_f
|