#!/bin/bash /usr/scripts/python.sh # -*- coding: utf-8 -*- '''Ce script permet de savoir s'il y a du monde dans un local donné, à des fins de perms, et cie, filtre par membres actifs.''' import sys import collections import os import xml.dom.minidom import subprocess from socket import gethostname from lc_ldap import shortcuts from gestion.hptools import hpswitch, ConversationError from gestion.affich_tools import cprint import gestion.affichage as affichage import gestion.mail as mail_module from gestion.mail import generate from utils.sendmail import actually_sendmail # Constantes pour munin. # L'ordre est important : il détermine comment sont empilées les valeurs # dans le graphe (mode STACK). Les premières valeurs ont donc intérêts # à avoir le moins de variations (empilées les premières) STATE_DESCR = collections.OrderedDict([ ('unknown_macs', ('machines inconnues de la base', 0xff0000)), ('crans', ('machines du crans', 0x0000ff)), ('ma', ('machines de membres actifs', 0x00ff00)), ('adh', ('autres machines appartenant aux autres adhérents', 0xe5ff00)), ]) DN_CLUB_CRANS = 'cid=35,' DN_CLUB_BDE = 'cid=1,' DN_CRANS = 'ou=data,dc=crans,dc=org' WIFIMAP_DIR = os.getenv('DBG_WIFIMAP_DB', '/usr/scripts/var/wifi_xml') # Par défaut, l'affichage est limité et on ne prévient pas des machines manquantes privacy = True warn = False def missing_tpe(): """Envoie un mail si le tpe n'est pas présent avec l'arg --warn-tpe""" dst = 'respbats@crans.org' mail = generate('missing_tpe', {'to': dst}) actually_sendmail('respbats@crans.org', [dst], mail) def pretty_name(item): """Affiche un joli nom pour un objet ldap (adh ou machine)""" v = "" try: nom = unicode(item['nom'][0]) prenom = unicode(item['prenom'][0]) v = prenom + u" " + nom except KeyError: nom = unicode(item['host'][0]) v = nom except TypeError: v = item v = v.replace('.wifi.crans.org', ' (WiFi)') v = v.replace('.adm.crans.org', '') v = v.replace('.crans.org', '') return v def show_liste_by_prop(liste): """Récupère une liste de machines et affiche un tableau deux colonnes avec propriétaire d'un côté et ses machines de l'autre.""" by_owner = dict() for machine in liste: # ldapcrans(déprécié) or lc_ldap owner = (getattr(machine, 'proprietaire', None) or \ getattr(machine, 'proprio', None))() if owner.dn not in by_owner: by_owner[owner.dn] = [pretty_name(owner), []] by_owner[owner.dn][1].append(pretty_name(machine)) for items in by_owner.itervalues(): items[1] = ", ".join(items[1]) print affichage.tableau(by_owner.values(), largeur=[None, '*'], alignement=['g', 'g']).rstrip() def show_liste(liste): """Affiche une liste d'objet ldap""" print ", ".join(pretty_name(m) for m in liste) def _mucode(u): """ Renvoie le bytestr associé à un unicode, avec l'encodage de munin. Sad but true: munin ne fait pas d'utf-8 …""" return u.encode('iso-8859-15', errors='ignore') class WhosThere(object): """: Nom du local, tel qu'il apparaît sur munin, et cie""" name = u"Unamed Local" """: Liste de macs et hostsname qui doivent être ignorées""" expected = [] _ignore_inactive = True _ignore_wifi_only = False def set_ignore_inactive(self, ignore, wifi_only=None): """Définit si l'ajout d'une mac a effectivement lieu pour les mac à ajouter si elles n'appartiennent pas à un membre actif""" self._ignore_inactive = ignore if wifi_only is not None: self._ignore_wifi_only = wifi_only def populate_from_mac(self, mac): """Rempli à partir de la mac""" fm = self.db.search(u"macAddress=%s" % mac) res = self._res if fm: m = fm[0] proprio = m.proprio() if proprio.dn == DN_CRANS or proprio.dn.startswith(DN_CLUB_CRANS): key = 'crans' elif proprio.dn.startswith(DN_CLUB_BDE): key = 'bde' else: try: droits = proprio['droits'] except KeyError: droits = False if droits: key = 'ma' else: if self._ignore_inactive: if unicode(m['objectClass'][0]) == u'machineWifi' or self._ignore_wifi_only: return key = 'adh' res[key].append(m) else: res['unknown_macs'].append(mac) def expected_machine(self): """Remplie la liste des machines qui devraient etre dans le local""" current = self.query() mach_manq = [] detected_mach = [machine['host'][0] for machine in current['crans'] + current['bde']] for mach in self.expected: if mach not in detected_mach: mach_manq.append(mach) if warn: missing_tpe() return mach_manq def populate_from_switch(self, host, port): """Rempli les macs à partir de la prise d'un switch""" sw = hpswitch(host) try: macs = sw.show_prise_mac(port) except ConversationError: cprint("Impossible de communiquer avec le switch !", 'rouge') for mac in macs: self.populate_from_mac(mac) def populate_from_ap(self, host): """Rempli les macs à partir de la prise d'un switch""" path = os.path.join(WIFIMAP_DIR, 'alone', host + '.xml') with open(path, 'r') as f: doc = xml.dom.minidom.parse(f) for mac in doc.getElementsByTagName('mac'): self.populate_from_mac(mac.firstChild.nodeValue) def populate_from_tty(self): res = self._res p = subprocess.check_output(["/usr/bin/w"]) for line in p.split("\n"): if "gdm-session" in line: res['ttyfound'].append(line) def do_scan(self): """Fonction à surcharger pour remplir la liste de personnes présentes. La fonction pourra faire appel à populate_from_*""" pass # à surcharger _res = None def query(self): if self._res: return self._res self._res = { 'ma': [], 'crans': [], 'adh': [], 'bde': [], 'unknown_macs': [], 'ttyfound': [], } self.db = shortcuts.lc_ldap_readonly() self.do_scan() return self._res def summary(self): u"""Réalise un joli aperçu de l'état donné en paramètre.""" current = self.query() if current['ma']: cprint('---=== Machines des membres actifs ===---', 'bleu') show_liste_by_prop(current['ma']) if hasattr(self, "tty_server"): if gethostname()!=self.tty_server: cprint(u'---=== Il faut executer ce script sur %s pour avoir les users logués ! ===---' % self.tty_server, 'rouge') else: if current['ttyfound']: cprint('---=== W(ho) sur %s ===---' % self.tty_server, 'bleu') for user in current["ttyfound"]: cprint(user, 'jaune') if current['ma'] or current['ttyfound']: cprint("---=== Il y a du monde ===---", 'vert') else: cprint("---=== Il semble n'y avoir personne ... ===---", 'rouge') if current['crans']: cprint("---=== Machines Cr@ns ===---", 'bleu') show_liste(current['crans']) if current['bde']: cprint("---=== Machines du BDE ===---", 'bleu') show_liste(current['bde']) if current['adh']: cprint("---=== Machines d'adhérents ===---", 'bleu') show_liste_by_prop(current['adh']) for mac in current['unknown_macs']: cprint("Machine inconnue: %s" % mac, 'rouge') for machine_manquante in self.expected_machine(): cprint("Machine %s manquante !" % pretty_name(machine_manquante), 'rouge') def munin_config(self): """Donne la configuration du graphe munin""" munin_title = u"Fréquentation du local %s" % self.name print """graph_title %s graph_vlabel N graph_category environnement""" % _mucode(munin_title) for (name, (descr, color)) in STATE_DESCR.iteritems(): print """%(name)s.label %(descr)s %(name)s.draw AREASTACK %(name)s.colour %(color)06X""" % { 'name': name, 'descr': _mucode(descr), 'color': color, } # Dans le doute, n'affichons pas les adhérents print "adh.graph no" def munin_values(self): res = self.query() for name in STATE_DESCR.iterkeys(): print """%(name)s.value %(value)d\n""" % \ {'name': name, 'value': len(res[name])} class WhoKfet(WhosThere): name = u"Kfet" expected = [ 'kronos.wifi.crans.org', 'oison.crans.org', 'batk-0.crans.org', 'camera1.crans.org', 'camera2.crans.org', 'camera3.crans.org', 'camera4.crans.org', 'kfet.crans.org', 'impression-bureau.crans.org', ] def do_scan(self): self.set_ignore_inactive(privacy, wifi_only=privacy) self.populate_from_switch('backbone.adm.crans.org', 21) class Who2B(WhosThere): name = u"2B" expected = [ 'terminal.crans.org', 'minigiga.adm.crans.org', 'mao.wifi.crans.org', 'tinybrother.adm.crans.org', 'vo.crans.org', 'freebox.crans.org', ] def do_scan(self): # Tous les gens au 2B sont supposés actifs (local technique quoi) # mais on cache quand-même les personnes connectées en WiFi self.set_ignore_inactive(privacy, wifi_only=privacy) self.populate_from_switch('backbone.adm.crans.org', 33) self.tty_server=u"vo" self.populate_from_tty() class WhoDAlembert(WhosThere): name = u"D'Alembert (PR)" expected = ['danae.wifi.crans.org'] def do_scan(self): self.populate_from_ap('danae.wifi.crans.org') class Who4J(WhosThere): name = u"4J" def do_scan(self): self.set_ignore_inactive(privacy, wifi_only=privacy) self.populate_from_switch('batj-3.adm.crans.org', 7) self.tty_server=u"cochon" self.populate_from_tty() if __name__ == '__main__': where = { 'dalembert': WhoDAlembert, '2b': Who2B, 'kfet': WhoKfet, '4j': Who4J, } if '--warn-tpe' in sys.argv: warn=['tpe'] if '--all' in sys.argv: cur_user = os.getenv("SUDO_USER") or os.getenv("USER") if cur_user: ldap = shortcuts.lc_ldap_readonly() user = ldap.search(u'uid=%s' % cur_user) if u'Nounou' in user[0]['droits']: privacy = False else: cprint("Vous n'avez pas les droits requis, --all ignoré.", 'jaune') for what in sys.argv[1:]: try: name = where[what.lower()] except KeyError: if what!='--all' and what!='--warn-tpe': print "Usage: whosthere.py \n Locaux : %s" % (", ".join(where.keys())) sys.exit(1) else: continue name().summary()