lc_ldap/shortcuts.py
Daniel STAN a9601b3660 shortcuts: utilisation de secrets_new
On retire la fonction import_secrets. Elle ne semble pas être utilisée
ailleurs.
Normalement, "/usr/scripts" est dans le PYTHONPATH, si ce  n'est pas le cas,
l'import va planter. En cas d'erreur, on essaie (temporairement) un second
import en rajoutant d'abord ce path, tout en crachant un truc sur stderr
(histoire de trouver les méchants scripts qui n'ont pas le path de base).
2014-01-14 20:37:37 +01:00

153 lines
5.6 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" Raccourcis pour se connecter facilement à la base LDAP avec le binding lc_ldap. """
import sys
import os
import functools
from ldap import SERVER_DOWN
from time import sleep
# Import du fichier de secrets. Le kludge de path ci-dessous ne devrait plus
# avoir lieu dans le futur
try:
from gestion import secrets_new as secrets
except ImportError:
sys.stderr.write("lc_ldap shortcuts: shaa, cannot import secrets_new. " +
"try again with /usr/scripts/ in PYTHONPATH " +
"(argv: %s)\n" % " ".join(sys.argv))
sys.path.append("/usr/scripts")
from gestion import secrets_new as secrets
sys.path.pop()
import lc_ldap as module_qui_a_le_meme_nom_que_sa_classe_principale
import variables
#: Pour enregistrer dans l'historique, on a besoin de savoir qui exécute le script
#: Si le script a été exécuté via un sudo, la variable SUDO_USER (l'utilisateur qui a effectué le sudo)
#: est plus pertinente que USER (qui sera root)
#: À noter que Cron, par exemple, n'a ni USER ni SUDO_USER mais possède bien LOGNAME
import getpass
current_user = os.getenv("SUDO_USER") or os.getenv("USER") or os.getenv("LOGNAME") or getpass.getuser()
if isinstance(current_user, str):
current_user = current_user.decode("utf-8")
def lc_ldap(*args, **kwargs):
"""Renvoie une connexion à la base LDAP."""
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_test(*args, **kwargs):
"""Renvoie une connexion LDAP à la base de tests."""
module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root='/tmp/cimetiere_lc/'
try:
os.mkdir(module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root)
except OSError:
pass
# On impose le serveur
kwargs["uri"] = 'ldap://vo.adm.crans.org'
kwargs.setdefault("dn", 'cn=admin,dc=crans,dc=org')
# Le mot de passe de la base de test
kwargs.setdefault("cred", variables.ldap_test_password)
# On en a aussi besoin pour le lookup en readonly
kwargs.setdefault("readonly_dn", variables.readonly_dn)
kwargs.setdefault("readonly_password", variables.ldap_test_password)
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_admin(*args, **kwargs):
"""Renvoie une connexion LDAP à la vraie base, en admin.
Possible seulement si on peut lire secrets.py
"""
kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
kwargs["dn"] = secrets.get('ldap_auth_dn')
kwargs["cred"] = secrets.get('ldap_password')
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_readonly(*args, **kwargs):
"""Connexion LDAP à la vraie base, en readonly.
Possible seulement si on peut lire secrets.py
"""
kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
kwargs["dn"] = secrets.get('ldap_readonly_auth_dn')
kwargs["cred"] = secrets.get('ldap_readonly_password')
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
def lc_ldap_local(*args, **kwargs):
"""Connexion LDAP en lecture seule sur la base locale.
L'idée est que les machines avec un réplica bossent
avec elles-mêmes pour la lecture, pas avec vert.
Attention, les accès internes en lecture seule
ou avec une socket ldapi semblent moins prioritaires
qu'avec cn=admin. Ne pas utiliser cette fonction
si vous souhaitez faire beaucoup de recherches
indépendantes (c'est le temps d'accès à la socket
qui est problématique).
"""
auth_dn = secrets.get('ldap_readonly_auth_dn')
auth_pw = secrets.get('ldap_readonly_password')
if os.path.exists('/var/run/slapd/ldapi'):
ro_uri = 'ldapi://%2fvar%2frun%2fslapd%2fldapi/'
elif os.path.exists('/var/run/ldapi'):
ro_uri = 'ldapi://%2fvar%2frun%2fldapi/'
else:
ro_uri = 'ldap://127.0.0.1'
kwargs["uri"] = ro_uri
kwargs["dn"] = auth_dn
kwargs["cred"] = auth_pw
kwargs.setdefault("user", current_user)
return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
class with_ldap_conn(object):
"""Décorateur (instance = decorator) pour les fonctions nécessitant une
connexion ldap. Rajoute un argument à la fonction (à la fin) pour passer
la connexion"""
#: fonction à appeler (sans argument) pour créer une connexion
constructor = None
#: nombre d'essais de rattrage de serveurs down, avant échec
retries = 0
#: délai entre deux essais (en secondes)
delay = 1
#: la connexion ldap
conn = None
def __init__(self, constructor=lc_ldap_local, retries=2, delay=1):
self.constructor = constructor
self.retries = retries
self.delay = delay
def apply(self, f, nargs, kargs):
attempt = 0
while True:
attempt += 1
try:
if not self.conn:
self.conn = self.constructor()
return f(*(nargs + (self.conn, )), **kargs)
except SERVER_DOWN:
if attempt >= self.retries:
raise
else:
print "one caught !"
self.conn = None
sleep(self.delay)
def __call__(self, f):
@functools.wraps(f)
def new_f(*nargs, **kargs):
return self.apply(f, nargs, kargs)
return new_f