scripts/gestion/tools/whosthere.py

343 lines
11 KiB
Python
Executable file

#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
'''Ce script permet de savoir s'il y a du monde dans un local donné,
à des fins de perms, et cie, filtre par membres actifs.'''
import sys
import collections
import os
import xml.dom.minidom
import subprocess
from socket import gethostname
from lc_ldap import shortcuts
from gestion.hptools import hpswitch, ConversationError
from gestion.affich_tools import cprint
import gestion.affichage as affichage
import gestion.mail as mail_module
from gestion.mail import generate
from utils.sendmail import actually_sendmail
# Constantes pour munin.
# L'ordre est important : il détermine comment sont empilées les valeurs
# dans le graphe (mode STACK). Les premières valeurs ont donc intérêts
# à avoir le moins de variations (empilées les premières)
STATE_DESCR = collections.OrderedDict([
('unknown_macs', ('machines inconnues de la base', 0xff0000)),
('crans', ('machines du crans', 0x0000ff)),
('ma', ('machines de membres actifs', 0x00ff00)),
('adh', ('autres machines appartenant aux autres adhérents', 0xe5ff00)),
])
DN_CLUB_CRANS = 'cid=35,'
DN_CLUB_BDE = 'cid=1,'
DN_CRANS = 'ou=data,dc=crans,dc=org'
WIFIMAP_DIR = os.getenv('DBG_WIFIMAP_DB', '/usr/scripts/var/wifi_xml')
# Par défaut, l'affichage est limité et on ne prévient pas des machines manquantes
privacy = True
warn = False
def missing_tpe():
"""Envoie un mail si le tpe n'est pas présent avec l'arg --warn-tpe"""
dst = 'respbats@crans.org'
mail = generate('missing_tpe', {'to': dst})
actually_sendmail('respbats@crans.org', [dst], mail)
def pretty_name(item):
"""Affiche un joli nom pour un objet ldap (adh ou machine)"""
v = ""
try:
nom = unicode(item['nom'][0])
try:
prenom = unicode(item['prenom'][0])
v = prenom + u" " + nom
#Si c'est un club
except KeyError:
v = nom
except KeyError:
nom = unicode(item['host'][0])
v = nom
except TypeError:
v = item
v = v.replace('.wifi.crans.org', ' (WiFi)')
v = v.replace('.adm.crans.org', '')
v = v.replace('.crans.org', '')
return v
def show_liste_by_prop(liste):
"""Récupère une liste de machines et affiche un tableau deux colonnes
avec propriétaire d'un côté et ses machines de l'autre."""
by_owner = dict()
for machine in liste:
# ldapcrans(déprécié) or lc_ldap
owner = (getattr(machine, 'proprietaire', None) or \
getattr(machine, 'proprio', None))()
if owner.dn not in by_owner:
by_owner[owner.dn] = [pretty_name(owner), []]
by_owner[owner.dn][1].append(pretty_name(machine))
for items in by_owner.itervalues():
items[1] = ", ".join(items[1])
print affichage.tableau(by_owner.values(), largeur=[None, '*'],
alignement=['g', 'g']).rstrip()
def show_liste(liste):
"""Affiche une liste d'objet ldap"""
print ", ".join(pretty_name(m) for m in liste)
def _mucode(u):
"""
Renvoie le bytestr associé à un unicode, avec l'encodage de munin.
Sad but true: munin ne fait pas d'utf-8 …"""
return u.encode('iso-8859-15', errors='ignore')
class WhosThere(object):
""": Nom du local, tel qu'il apparaît sur munin, et cie"""
name = u"Unamed Local"
""": Liste de macs et hostsname qui doivent être ignorées"""
expected = []
_ignore_inactive = True
_ignore_wifi_only = False
def set_ignore_inactive(self, ignore, wifi_only=None):
"""Définit si l'ajout d'une mac a effectivement lieu pour les
mac à ajouter si elles n'appartiennent pas à un membre actif"""
self._ignore_inactive = ignore
if wifi_only is not None:
self._ignore_wifi_only = wifi_only
def populate_from_mac(self, mac):
"""Rempli à partir de la mac"""
fm = self.db.search(u"macAddress=%s" % mac)
res = self._res
if fm:
m = fm[0]
proprio = m.proprio()
if proprio.dn == DN_CRANS or proprio.dn.startswith(DN_CLUB_CRANS):
key = 'crans'
elif proprio.dn.startswith(DN_CLUB_BDE):
key = 'bde'
else:
try:
droits = proprio['droits']
except KeyError:
droits = False
if droits:
key = 'ma'
else:
if self._ignore_inactive:
if unicode(m['objectClass'][0]) == u'machineWifi' or self._ignore_wifi_only:
return
key = 'adh'
res[key].append(m)
else:
res['unknown_macs'].append(mac)
def expected_machine(self):
"""Remplie la liste des machines qui devraient etre dans le local"""
current = self.query()
mach_manq = []
detected_mach = [machine['host'][0] for machine in current['crans'] + current['bde']]
for mach in self.expected:
if mach not in detected_mach:
mach_manq.append(mach)
if warn:
missing_tpe()
return mach_manq
def populate_from_switch(self, host, port):
"""Rempli les macs à partir de la prise d'un switch"""
sw = hpswitch(host)
try:
macs = sw.show_prise_mac(port)
except ConversationError:
cprint("Impossible de communiquer avec le switch !", 'rouge')
for mac in macs:
self.populate_from_mac(mac)
def populate_from_ap(self, host):
"""Rempli les macs à partir de la prise d'un switch"""
path = os.path.join(WIFIMAP_DIR, 'alone', host + '.xml')
with open(path, 'r') as f:
doc = xml.dom.minidom.parse(f)
for mac in doc.getElementsByTagName('mac'):
self.populate_from_mac(mac.firstChild.nodeValue)
def populate_from_tty(self):
res = self._res
p = subprocess.check_output(["/usr/bin/w"])
for line in p.split("\n"):
if "gdm-session" in line:
res['ttyfound'].append(line)
def do_scan(self):
"""Fonction à surcharger pour remplir la liste de personnes présentes.
La fonction pourra faire appel à populate_from_*"""
pass # à surcharger
_res = None
def query(self):
if self._res:
return self._res
self._res = {
'ma': [],
'crans': [],
'adh': [],
'bde': [],
'unknown_macs': [],
'ttyfound': [],
}
self.db = shortcuts.lc_ldap_readonly()
self.do_scan()
return self._res
def summary(self):
u"""Réalise un joli aperçu de l'état donné en paramètre."""
current = self.query()
if current['ma']:
cprint('---=== Machines des membres actifs ===---', 'bleu')
show_liste_by_prop(current['ma'])
if hasattr(self, "tty_server"):
if gethostname()!=self.tty_server:
cprint(u'---=== Il faut executer ce script sur %s pour avoir les users logués ! ===---' % self.tty_server, 'rouge')
else:
if current['ttyfound']:
cprint('---=== W(ho) sur %s ===---' % self.tty_server, 'bleu')
for user in current["ttyfound"]:
cprint(user, 'jaune')
if current['ma'] or current['ttyfound']:
cprint("---=== Il y a du monde ===---", 'vert')
else:
cprint("---=== Il semble n'y avoir personne ... ===---", 'rouge')
if current['crans']:
cprint("---=== Machines Cr@ns ===---", 'bleu')
show_liste(current['crans'])
if current['bde']:
cprint("---=== Machines du BDE ===---", 'bleu')
show_liste(current['bde'])
if current['adh']:
cprint("---=== Machines d'adhérents ===---", 'bleu')
show_liste_by_prop(current['adh'])
for mac in current['unknown_macs']:
cprint("Machine inconnue: %s" % mac, 'rouge')
for machine_manquante in self.expected_machine():
cprint("Machine %s manquante !" % pretty_name(machine_manquante), 'rouge')
def munin_config(self):
"""Donne la configuration du graphe munin"""
munin_title = u"Fréquentation du local %s" % self.name
print """graph_title %s
graph_vlabel N
graph_category environnement""" % _mucode(munin_title)
for (name, (descr, color)) in STATE_DESCR.iteritems():
print """%(name)s.label %(descr)s
%(name)s.draw AREASTACK
%(name)s.colour %(color)06X""" % {
'name': name,
'descr': _mucode(descr),
'color': color,
}
# Dans le doute, n'affichons pas les adhérents
print "adh.graph no"
def munin_values(self):
res = self.query()
for name in STATE_DESCR.iterkeys():
print """%(name)s.value %(value)d\n""" % \
{'name': name, 'value': len(res[name])}
class WhoKfet(WhosThere):
name = u"Kfet"
expected = [
'kronos.wifi.crans.org',
'oison.crans.org',
'batk-0.crans.org',
'camera1.crans.org',
'camera2.crans.org',
'camera3.crans.org',
'camera4.crans.org',
'kfet.crans.org',
'impression-bureau.crans.org',
]
def do_scan(self):
self.set_ignore_inactive(privacy, wifi_only=privacy)
self.populate_from_switch('backbone.adm.crans.org', 21)
class Who2B(WhosThere):
name = u"2B"
expected = [
'terminal.crans.org',
'minigiga.adm.crans.org',
'mao.wifi.crans.org',
'tinybrother.adm.crans.org',
'vo.crans.org',
'freebox.crans.org',
]
def do_scan(self):
# Tous les gens au 2B sont supposés actifs (local technique quoi)
# mais on cache quand-même les personnes connectées en WiFi
self.set_ignore_inactive(privacy, wifi_only=privacy)
self.populate_from_switch('backbone.adm.crans.org', 33)
self.tty_server=u"vo"
self.populate_from_tty()
class WhoDAlembert(WhosThere):
name = u"D'Alembert (PR)"
expected = ['danae.wifi.crans.org']
def do_scan(self):
self.populate_from_ap('danae.wifi.crans.org')
class Who4J(WhosThere):
name = u"4J"
def do_scan(self):
self.set_ignore_inactive(privacy, wifi_only=privacy)
self.populate_from_switch('batj-3.adm.crans.org', 7)
self.tty_server=u"cochon"
self.populate_from_tty()
if __name__ == '__main__':
where = {
'dalembert': WhoDAlembert,
'2b': Who2B,
'kfet': WhoKfet,
'4j': Who4J,
}
if '--warn-tpe' in sys.argv:
warn=['tpe']
if '--all' in sys.argv:
cur_user = os.getenv("SUDO_USER") or os.getenv("USER")
if cur_user:
ldap = shortcuts.lc_ldap_readonly()
user = ldap.search(u'uid=%s' % cur_user)
if u'Nounou' in user[0]['droits']:
privacy = False
else:
cprint("Vous n'avez pas les droits requis, --all ignoré.", 'jaune')
for what in sys.argv[1:]:
try:
name = where[what.lower()]
except KeyError:
if what!='--all' and what!='--warn-tpe':
print "Usage: whosthere.py <local>\n Locaux : %s" % (", ".join(where.keys()))
sys.exit(1)
else:
continue
name().summary()