#! /usr/bin/env python # -*- coding: iso-8859-15 -*- ############################################################################### # parse_auth_log.py : Détecte les problèmes d'authentifications ############################################################################### # The authors of this code are # Jérémie Dimino # # Copyright (C) 2006 Jérémie Dimino # All rights reserved. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA ############################################################################### import sys sys.path.append('/usr/scripts/gestion') from ldap_crans import crans_ldap from annuaires import chbre_prises, reverse from affich_tools import cprint import sys, datetime base = None # Correspondance prise -> chambre prise_chbre = {} aujourdhui = datetime.date.today() def trouve_chambre(bat, prise): """ Trouve la chambre associée à une prise """ if prise in ('????', 'EXT', 'CRA'): return prise if not prise_chbre.has_key(bat): prise_chbre[bat] = reverse(bat) chbre = prise_chbre[bat].get(prise) or prise_chbre[bat].get(prise+"-") if chbre: chbre = chbre[0] else: cprint('La prise %s%s est inconnue' % (bat, prise), 'rouge') chbre = "????" return chbre def trouve_prise(chbre): """ Trouve la prise associée à une chambre """ if chbre in ('EXT', '????', 'CRA'): return chbre else: bat = chbre[0].lower() prise = bat + chbre_prises[bat][chbre[1:]] if prise[-1] == '-': prise = prise[:-1] return prise def __calcul_max(table): """ Calcule les différents maxima (voir parse_auth_log pour plus de détails) """ nb_m = 0 nb_p = 0 for prise, erreur_prise in table.items(): nb = 0 nb_max = 0 for mac, erreur_mac in erreur_prise['macs'].items(): n = len(erreur_mac['lignes']) erreur_mac['nombre'] = n nb += n nb_max += n erreur_prise['nombre'] = nb erreur_prise['nombre_max'] = nb_max nb_m = max(nb_m, nb_max) nb_p = max(nb_p, nb) return nb_m, nb_p def __ajoute_ligne(errs, ligne, prise, mac, info=None): """ Ajoute une ligne dans le dico pour la prise et la mac donnée """ erreur_prise = errs.get(prise) if not erreur_prise: erreur_prise = { 'macs': {} } errs[prise] = erreur_prise erreur_mac = erreur_prise['macs'].get(mac) if not erreur_mac: erreur_mac = { 'lignes': [], 'info': info } erreur_prise['macs'][mac] = erreur_mac erreur_mac['lignes'].append(ligne) # Correspondance 'noms de mois' -> n° __mois_num = { 'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12 } def parse_auth_log(fichier_log='/var/log/freeradius/radius_auth.log', jour=None): """ Retourne la liste des problèmes de connexions: retourne deux dicos, la première étant pour les macs qui n'ont pas été trouvées dans la base et la seconde pour les connexions qui ont eu lieu à partir d'une mauvaise prise les deux dictionnaires ont la structure suivante: - nombre_mac_max: nombre maximal d'erreurs trouvées pour une mac - nombre_prise_max: nombre maximal d'erreurs trouvées pour une prise - prises: dico associant à une prise la listes des erreurs à partir de cette prise: - nombre: nombre total d'erreurs survenues sur la prise - nombre_max: nombre maximal d'erreurs trouvéss pour une mac - macs: dico associant à une mac la liste des erreurs à partir de cette mac - nombre: le nombre d'erreurs survenues pour cette mac sur cette prise - lignes: la liste des lignes du fichier de logs où la mac apparaît sur cette prise - info: informations sur le propriétaire (vaut None pour la première table) - machine: la machine en question - prise: la prise de sa chambre - chambre: sa chambre - prop: le propriétaire Si jour est spécifié (de type datetime.date), toutes les connexions avant cette dates seront ignorées. """ try: log = open(fichier_log) except: cprint(u"Impossible d'ouvrir le fichier %s" % fichier_log, 'rouge') sys.exit(1) # Les macs non trouvées errs_inconnue = {} # Les pc connectés sur la mauvaise prise errs_prise = {} # Les recherches déjà effectuées sur les macs mac_machine = {} annee = None mois = None for ligne in log: champs = ligne.split() prise = champs[5] mac = champs[7] if jour != None: nouveau_mois = __mois_num[champs[0]] if annee == None: if nouveau_mois > aujourdhui.month: # Là on suppose qu'il s'agit de l'année dernière annee = aujourdhui.year - 1 else: annee = aujourdhui.year elif mois > nouveau_mois: # Là on suppose qu'on est passé à l'année suivante annee += 1 mois = nouveau_mois if datetime.date(annee, mois, int(champs[1])) < jour: continue info_mac = mac_machine.get(mac) if info_mac: machine = info_mac['machine'] else: machine = base.search('mac=' + mac)['machine'] info_mac = { 'machine': machine } mac_machine[mac] = info_mac if machine == []: __ajoute_ligne(errs_inconnue, ligne, prise, mac) else: info = info_mac.get('info_prop') if not info: prop = machine[0].proprietaire() chbre = prop.chbre() info = { 'machine': machine[0], 'prise': trouve_prise(chbre), 'chambre': chbre, 'prop': prop } info_mac['info_prop'] = info if prise != info['prise']: __ajoute_ligne(errs_prise, ligne, prise, mac, info) log.close() nb_inconnue_m, nb_inconnue_p = __calcul_max(errs_inconnue) nb_prise_m, nb_prise_p = __calcul_max(errs_prise) return { 'prises': errs_inconnue, 'nombre_mac_max': nb_inconnue_m, 'nombre_prise_max': nb_inconnue_p }, \ { 'prises': errs_prise, 'nombre_mac_max': nb_prise_m, 'nombre_prise_max': nb_prise_p } def affiche_erreurs(errs, message, min_mac=1, min_prise=1): """ Affiche les infos contenues dans un dico renvoyé par parse_auth_log """ if errs['nombre_mac_max'] >= min_mac and errs['nombre_prise_max'] >= min_prise: cprint(message, 'bleu') for prise, erreur_prise in errs['prises'].items(): if erreur_prise['nombre'] >= min_prise and erreur_prise['nombre_max'] >= min_mac: chambre = prise[0].upper() + trouve_chambre(prise[0], prise[1:]) cprint(u' prise %s (chambre %s)' % (prise, chambre), 'gras') for mac, erreur_mac in erreur_prise['macs'].items(): if erreur_mac['nombre'] >= min_mac: print ' %s (x%d)' % (erreur_mac['lignes'][-1].strip(), erreur_mac['nombre']) info = erreur_mac['info'] if info != None: prop = info['prop'] cprint(u' -> propriétaire de la machine: %s <%s>, prise %s (chambre %s)' % (prop.Nom(), prop.email(), info['prise'], info['chambre'])) def __usage(err=''): """ Message d'erreur """ if err : cprint(err, 'rouge') cprint(u"Tapez %s -h pour plus d'informations" % sys.argv[0].split('/')[-1].split('.')[0]) sys.exit(2) def __param_entier(opt, val): """ Récupère un entier passé en ligne de commande """ try: return int(val) except: __usage(u'La valeur du paramètre %s est incorecte (doit être un entier positif)' % opt) def __aide(): """ Aide """ cprint(u"""Usage: %s [OPTIONS] Parse les logs d'authentifications et affiche les erreurs trouvées (macs inconnues ou connexion d'une machine sur une prise autre que celle du propriétaire). Options: -h, --help affiche cette aide -l, --log fichier de log à parser (par défaut: /var/log/freeradius/radius_auth.log) -m, --min-mac nombre minimal d'occurrences d'une mac sur une prise pour qu'elle soit reportée -n, --min-prise nombre minimal d'erreurs sur une prise pour qu'elle soit reportée -i, --inconnue n'affiche que les erreurs de mac inconnues -p, --prise n'affiche que les connexions sur une prise autre que celle du propriétaire -d, --date ignorer totalement les lignes avant cette date peut être: - une date absolue sous la forme AAAAMMJJ - une date relative à aujourd'hui sous la forme + Par exemple pour ne considérer que les deux dernier jours: +2 Rapporter toutes anomalies à .""" % sys.argv[0].split('/')[-1].split('.')[0]) sys.exit(0) if __name__ == '__main__': fichier_log = '/var/log/freeradius/radius_auth.log' min_mac = 1 min_prise = 1 # Info que l'on veut afficher aff_inconnue = True aff_prise = True # Date de départ jour = None import getopt, time try: options, arg = getopt.getopt(sys.argv[1:], 'hipl:m:n:d:', [ 'help', 'log=', 'min-mac=', 'min-prise=', 'inconnue', 'prise', 'date=']) except getopt.error, msg: __usage(unicode(msg)) for opt, val in options: if opt in [ '-h', '--help' ]: __aide() elif opt in [ '-l', '--log-file' ]: fichier_log = val elif opt in [ '-m', '--min-mac' ]: min_mac = __param_entier(opt, val) elif opt in [ '-n', '--min-prise' ]: min_prise = __param_entier(opt, val) elif opt in [ '-i', '--inconnue' ]: aff_prise = 0 elif opt in [ '-p', '--prise' ]: aff_inconnue = 0 elif opt in [ '-d', '--date' ]: if val[0] == '+': jour = aujourdhui - datetime.timedelta(__param_entier(opt, val[1:])) else: try: jour = datetime.date(*(time.strptime(val, "%Y%m%d")[0:3])) except: __usage(u'La valeur du paramètre %s est incorecte (doit être de la forme AAAAMMJJ)' % opt) else: cprint(u'Option inconnue: %s' % opt, 'rouge') __usage() base = crans_ldap() errs_inconnue, errs_prise = parse_auth_log(fichier_log, jour) if aff_inconnue: affiche_erreurs(errs_inconnue, u"Pour les connexions suivantes la mac n'a pas été trouvée dans la base:", min_mac, min_prise) if aff_prise: affiche_erreurs(errs_prise, u"Les connexions suivantes ont eu lieu sur une prise autre que celle du proriétaire:", min_mac, min_prise)