lc_ldap/shortcuts.py
2013-10-11 23:32:32 +02:00

145 lines
5.1 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" Raccourcis pour se connecter facilement à la base LDAP avec le binding lc_ldap. """
import sys
import os.path
import lc_ldap as module_qui_a_le_meme_nom_que_sa_classe_principale
import variables
import functools
from ldap import SERVER_DOWN
from time import sleep
#: Pour enregistrer dans l'historique, on a besoin de savoir qui exécute le script
#: Si le script a été exécuté via un sudo, la variable SUDO_USER (l'utilisateur qui a effectué le sudo)
#: est plus pertinente que USER (qui sera root)
#: À noter que Cron, par exemple, n'a ni USER ni SUDO_USER mais possède bien LOGNAME
current_user = os.getenv("SUDO_USER") or os.getenv("USER") or os.getenv("LOGNAME")
if isinstance(current_user, str):
current_user = current_user.decode("utf-8")
# Quand on a besoin du fichier de secrets
def import_secrets():
"""Importe le fichier de secrets."""
if not "/etc/crans/secrets/" in sys.path:
sys.path.append("/etc/crans/secrets/")
import secrets
return secrets
def lc_ldap(*args, **kwargs):
"""Renvoie une connexion à la base LDAP."""
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_test(*args, **kwargs):
"""Renvoie une connexion LDAP à la base de tests."""
# On impose le serveur
kwargs["uri"] = 'ldap://vo.adm.crans.org'
kwargs.setdefault("dn", 'cn=admin,dc=crans,dc=org')
# Le mot de passe de la base de test
kwargs.setdefault("cred", variables.ldap_test_password)
# On en a aussi besoin pour le lookup en readonly
kwargs.setdefault("readonly_dn", variables.readonly_dn)
kwargs.setdefault("readonly_password", variables.ldap_test_password)
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_admin(*args, **kwargs):
"""Renvoie une connexion LDAP à la vraie base, en admin.
Possible seulement si on peut lire secrets.py
"""
secrets = import_secrets()
kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
kwargs["dn"] = secrets.ldap_auth_dn
kwargs["cred"] = secrets.ldap_password
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_readonly(*args, **kwargs):
"""Connexion LDAP à la vraie base, en readonly.
Possible seulement si on peut lire secrets.py
"""
secrets = import_secrets()
kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
kwargs["dn"] = secrets.ldap_readonly_auth_dn
kwargs["cred"] = secrets.ldap_readonly_password
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_local(*args, **kwargs):
"""Connexion LDAP en lecture seule sur la base locale.
L'idée est que les machines avec un réplica bossent
avec elles-mêmes pour la lecture, pas avec vert.
Attention, les accès internes en lecture seule
ou avec une socket ldapi semblent moins prioritaires
qu'avec cn=admin. Ne pas utiliser cette fonction
si vous souhaitez faire beaucoup de recherches
indépendantes (c'est le temps d'accès à la socket
qui est problématique).
"""
secrets = import_secrets()
auth_dn = secrets.ldap_readonly_auth_dn
auth_pw = secrets.ldap_readonly_password
if os.path.exists('/var/run/slapd/ldapi'):
ro_uri = 'ldapi://%2fvar%2frun%2fslapd%2fldapi/'
elif os.path.exists('/var/run/ldapi'):
ro_uri = 'ldapi://%2fvar%2frun%2fldapi/'
else:
ro_uri = 'ldap://127.0.0.1'
kwargs["uri"] = ro_uri
kwargs["dn"] = auth_dn
kwargs["cred"] = auth_pw
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
class with_ldap_conn(object):
"""Décorateur (instance = decorator) pour les fonctions nécessitant une
connexion ldap. Rajoute un argument à la fonction (à la fin) pour passer
la connexion"""
#: fonction à appeler (sans argument) pour créer une connexion
constructor = None
#: nombre d'essais de rattrage de serveurs down, avant échec
retries = 0
#: délai entre deux essais (en secondes)
delay = 1
#: la connexion ldap
conn = None
def __init__(self, constructor=lc_ldap_local, retries=2, delay=1):
self.constructor = constructor
self.retries = retries
self.delay = delay
def apply(self, f, nargs, kargs):
attempt = 0
while True:
attempt += 1
try:
if not self.conn:
self.conn = self.constructor()
return f(*(nargs + (self.conn, )), **kargs)
except SERVER_DOWN:
if attempt >= self.retries:
raise
else:
print "one caught !"
self.conn = None
sleep(self.delay)
def __call__(self, f):
@functools.wraps(f)
def new_f(*nargs, **kargs):
return self.apply(f, nargs, kargs)
return new_f