scripts/surveillance/parse_auth_log.py
glondu d63f6384d3 Oubli d'un cas particulier...
darcs-hash:20071028124207-68412-7eb9343833ab96fbfa5af9f3d899999b6981a074.gz
2007-10-28 13:42:07 +01:00

331 lines
11 KiB
Python
Executable file

#! /usr/bin/env python
# -*- coding: iso-8859-15 -*-
###############################################################################
# parse_auth_log.py : Détecte les problèmes d'authentifications
###############################################################################
# The authors of this code are
# Jérémie Dimino <dimino@crans.org>
#
# Copyright (C) 2006 Jérémie Dimino
# All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
###############################################################################
import sys
sys.path.append('/usr/scripts/gestion')
from ldap_crans import crans_ldap
from annuaires import chbre_prises, reverse
from affich_tools import cprint
import sys, datetime
base = None
# Correspondance prise -> chambre
prise_chbre = {}
aujourdhui = datetime.date.today()
def trouve_chambre(bat, prise):
""" Trouve la chambre associée à une prise """
if prise in ('????', 'EXT', 'CRA'):
return prise
if not prise_chbre.has_key(bat):
prise_chbre[bat] = reverse(bat)
chbre = prise_chbre[bat].get(prise) or prise_chbre[bat].get(prise+"-")
if chbre:
chbre = chbre[0]
else:
cprint('La prise %s%s est inconnue' % (bat, prise), 'rouge')
chbre = "????"
return chbre
def trouve_prise(chbre):
""" Trouve la prise associée à une chambre """
if chbre in ('EXT', '????', 'CRA'):
return chbre
else:
bat = chbre[0].lower()
prise = bat + chbre_prises[bat][chbre[1:]]
if prise[-1] == '-':
prise = prise[:-1]
return prise
def __calcul_max(table):
""" Calcule les différents maxima (voir parse_auth_log pour plus de détails) """
nb_m = 0
nb_p = 0
for prise, erreur_prise in table.items():
nb = 0
nb_max = 0
for mac, erreur_mac in erreur_prise['macs'].items():
n = len(erreur_mac['lignes'])
erreur_mac['nombre'] = n
nb += n
nb_max += n
erreur_prise['nombre'] = nb
erreur_prise['nombre_max'] = nb_max
nb_m = max(nb_m, nb_max)
nb_p = max(nb_p, nb)
return nb_m, nb_p
def __ajoute_ligne(errs, ligne, prise, mac, info=None):
""" Ajoute une ligne dans le dico pour la prise et la mac donnée """
erreur_prise = errs.get(prise)
if not erreur_prise:
erreur_prise = { 'macs': {} }
errs[prise] = erreur_prise
erreur_mac = erreur_prise['macs'].get(mac)
if not erreur_mac:
erreur_mac = { 'lignes': [], 'info': info }
erreur_prise['macs'][mac] = erreur_mac
erreur_mac['lignes'].append(ligne)
# Correspondance 'noms de mois' -> n°
__mois_num = { 'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12 }
def parse_auth_log(fichier_log='/var/log/freeradius/radius_auth.log', jour=None):
""" Retourne la liste des problèmes de connexions: retourne deux dicos, la première étant pour les macs qui n'ont pas été trouvées dans la base et la seconde pour les connexions qui ont eu lieu à partir d'une mauvaise prise
les deux dictionnaires ont la structure suivante:
- nombre_mac_max: nombre maximal d'erreurs trouvées pour une mac
- nombre_prise_max: nombre maximal d'erreurs trouvées pour une prise
- prises: dico associant à une prise la listes des erreurs à partir de cette prise:
- nombre: nombre total d'erreurs survenues sur la prise
- nombre_max: nombre maximal d'erreurs trouvéss pour une mac
- macs: dico associant à une mac la liste des erreurs à partir de cette mac
- nombre: le nombre d'erreurs survenues pour cette mac sur cette prise
- lignes: la liste des lignes du fichier de logs où la mac apparaît sur cette prise
- info: informations sur le propriétaire (vaut None pour la première table)
- machine: la machine en question
- prise: la prise de sa chambre
- chambre: sa chambre
- prop: le propriétaire
Si jour est spécifié (de type datetime.date), toutes les connexions avant cette dates seront ignorées.
"""
try:
log = open(fichier_log)
except:
cprint(u"Impossible d'ouvrir le fichier %s" % fichier_log, 'rouge')
sys.exit(1)
# Les macs non trouvées
errs_inconnue = {}
# Les pc connectés sur la mauvaise prise
errs_prise = {}
# Les recherches déjà effectuées sur les macs
mac_machine = {}
annee = None
mois = None
for ligne in log:
champs = ligne.split()
prise = champs[5]
mac = champs[7]
if jour != None:
nouveau_mois = __mois_num[champs[0]]
if annee == None:
if nouveau_mois > aujourdhui.month:
# Là on suppose qu'il s'agit de l'année dernière
annee = aujourdhui.year - 1
else:
annee = aujourdhui.year
elif mois > nouveau_mois:
# Là on suppose qu'on est passé à l'année suivante
annee += 1
mois = nouveau_mois
if datetime.date(annee, mois, int(champs[1])) < jour:
continue
info_mac = mac_machine.get(mac)
if info_mac:
machine = info_mac['machine']
else:
machine = base.search('mac=' + mac)['machine']
info_mac = { 'machine': machine }
mac_machine[mac] = info_mac
if machine == []:
__ajoute_ligne(errs_inconnue, ligne, prise, mac)
else:
info = info_mac.get('info_prop')
if not info:
prop = machine[0].proprietaire()
chbre = prop.chbre()
info = { 'machine': machine[0],
'prise': trouve_prise(chbre),
'chambre': chbre,
'prop': prop }
info_mac['info_prop'] = info
if prise != info['prise']:
__ajoute_ligne(errs_prise, ligne, prise, mac, info)
log.close()
nb_inconnue_m, nb_inconnue_p = __calcul_max(errs_inconnue)
nb_prise_m, nb_prise_p = __calcul_max(errs_prise)
return { 'prises': errs_inconnue,
'nombre_mac_max': nb_inconnue_m,
'nombre_prise_max': nb_inconnue_p }, \
{ 'prises': errs_prise,
'nombre_mac_max': nb_prise_m,
'nombre_prise_max': nb_prise_p }
def affiche_erreurs(errs, message, min_mac=1, min_prise=1):
""" Affiche les infos contenues dans un dico renvoyé par parse_auth_log """
if errs['nombre_mac_max'] >= min_mac and errs['nombre_prise_max'] >= min_prise:
cprint(message, 'bleu')
for prise, erreur_prise in errs['prises'].items():
if erreur_prise['nombre'] >= min_prise and erreur_prise['nombre_max'] >= min_mac:
chambre = prise[0].upper() + trouve_chambre(prise[0], prise[1:])
cprint(u' prise %s (chambre %s)' % (prise, chambre), 'gras')
for mac, erreur_mac in erreur_prise['macs'].items():
if erreur_mac['nombre'] >= min_mac:
print ' %s (x%d)' % (erreur_mac['lignes'][-1].strip(),
erreur_mac['nombre'])
info = erreur_mac['info']
if info != None:
prop = info['prop']
cprint(u' -> propriétaire de la machine: %s <%s>, prise %s (chambre %s)' % (prop.Nom(), prop.email(), info['prise'], info['chambre']))
def __usage(err=''):
""" Message d'erreur """
if err : cprint(err, 'rouge')
cprint(u"Tapez %s -h pour plus d'informations" % sys.argv[0].split('/')[-1].split('.')[0])
sys.exit(2)
def __param_entier(opt, val):
""" Récupère un entier passé en ligne de commande """
try:
return int(val)
except:
__usage(u'La valeur du paramètre %s est incorecte (doit être un entier positif)' % opt)
def __aide():
""" Aide """
cprint(u"""Usage: %s [OPTIONS]
Parse les logs d'authentifications et affiche les erreurs trouvées (macs inconnues ou connexion d'une machine sur une prise autre que celle du propriétaire).
Options:
-h, --help affiche cette aide
-l, --log <fichier> fichier de log à parser (par défaut: /var/log/freeradius/radius_auth.log)
-m, --min-mac <nombre> nombre minimal d'occurrences d'une mac sur une prise pour qu'elle soit reportée
-n, --min-prise <nombre> nombre minimal d'erreurs sur une prise pour qu'elle soit reportée
-i, --inconnue n'affiche que les erreurs de mac inconnues
-p, --prise n'affiche que les connexions sur une prise autre que celle du propriétaire
-d, --date <date> ignorer totalement les lignes avant cette date
<date> peut être:
- une date absolue sous la forme AAAAMMJJ
- une date relative à aujourd'hui sous la forme +<nombre de jour>
Par exemple pour ne considérer que les deux dernier jours: +2
Rapporter toutes anomalies à <dimino@crans.org>.""" % sys.argv[0].split('/')[-1].split('.')[0])
sys.exit(0)
if __name__ == '__main__':
fichier_log = '/var/log/freeradius/radius_auth.log'
min_mac = 1
min_prise = 1
# Info que l'on veut afficher
aff_inconnue = True
aff_prise = True
# Date de départ
jour = None
import getopt, time
try:
options, arg = getopt.getopt(sys.argv[1:], 'hipl:m:n:d:', [ 'help', 'log=', 'min-mac=', 'min-prise=', 'inconnue', 'prise', 'date='])
except getopt.error, msg:
__usage(unicode(msg))
for opt, val in options:
if opt in [ '-h', '--help' ]:
__aide()
elif opt in [ '-l', '--log-file' ]:
fichier_log = val
elif opt in [ '-m', '--min-mac' ]:
min_mac = __param_entier(opt, val)
elif opt in [ '-n', '--min-prise' ]:
min_prise = __param_entier(opt, val)
elif opt in [ '-i', '--inconnue' ]:
aff_prise = 0
elif opt in [ '-p', '--prise' ]:
aff_inconnue = 0
elif opt in [ '-d', '--date' ]:
if val[0] == '+':
jour = aujourdhui - datetime.timedelta(__param_entier(opt, val[1:]))
else:
try:
jour = datetime.date(*(time.strptime(val, "%Y%m%d")[0:3]))
except:
__usage(u'La valeur du paramètre %s est incorecte (doit être de la forme AAAAMMJJ)' % opt)
else:
cprint(u'Option inconnue: %s' % opt, 'rouge')
__usage()
base = crans_ldap()
errs_inconnue, errs_prise = parse_auth_log(fichier_log, jour)
if aff_inconnue:
affiche_erreurs(errs_inconnue, u"Pour les connexions suivantes la mac n'a pas été trouvée dans la base:", min_mac, min_prise)
if aff_prise:
affiche_erreurs(errs_prise, u"Les connexions suivantes ont eu lieu sur une prise autre que celle du proriétaire:", min_mac, min_prise)