[mac_prises] En place, avec script d'analyse.

Je commence à en avoir marre des gens qui spoofent. J'ai donc pris
en charge l'écriture de mac_prises cette nuit.
Je le mets en test vers la mailing list test@lists.crans.org, avis
aux amateurs.
This commit is contained in:
Pierre-Elliott Bécue 2013-02-01 06:15:34 +01:00
parent baf4aa3645
commit df49d0b6dc
6 changed files with 305 additions and 90 deletions

View file

@ -0,0 +1,156 @@
#!/usr/bin/env python
# -*- coding: utf8 -*-
import psycopg2
import psycopg2.extras
import sys
import smtplib
sys.path.append('/usr/scripts/gestion')
from config import mac_prise
from affich_tools import tableau
sys.path.append('/usr/scripts/lc_ldap')
import lc_ldap
ldap = lc_ldap.lc_ldap_admin()
membres_actifs = ldap.search('(|(droits=Cableur)(droits=Nounou)(droits=Apprenti)(droits=Bureau))')
chambres_ma = []
for membre_actif in membres_actifs:
try:
chambres_ma.append(str(membre_actif['chbre'][0]).lower())
except:
pass
clubs = ldap.search('cid=*')
chambres_clubs = []
for club in clubs:
try:
chambres_clubs.append(str(club['chbre'][0]).lower())
except:
pass
conn = psycopg2.connect(user='crans', database='mac_prises')
cur = conn.cursor(cursor_factory = psycopg2.extras.DictCursor)
longueur = { 'mac': (24, 3),
'chambre': (10, 7),
}
titres = { 'mac':(u'mac', u'chambres'),
'chambre': (u'chambre', u'macs'),
}
alignements = ('c', 'c')
def lin(x, y, z):
"""
Calcul linéaire d'un rapport
"""
return (float(x)/float(y)-1.0)/(float(z)-1.0)*100
def genere_comptage(duree, groupe, associes):
"""
Grosse fonction de hack pour ne pas écrire six fois le même formatage de sortie bidon.
Prend en argument :
* duree, qui peut valoir 'instant', 'heuristique', ou 'journalier', permet d'importer
les variables de config qui vont bien
* groupe, qui précise selon quoi on groupe (mac ou chambre)
* associes, qui contient l'autre champ
"""
pb_comptage_suspect = {}
pb_comptage_tres_suspect = {}
output = ""
requete = "SELECT array_to_string(array_agg(DISTINCT date), ',') AS dates , %(groupe)s, array_to_string(array_agg(DISTINCT %(associes)s), ',') AS %(associes)ss, COUNT(DISTINCT %(associes)s) AS nb_%(associes)ss_distinctes, COUNT(%(associes)s) AS nb_%(associes)ss, COUNT(DISTINCT date) as nb_dates_distinctes, COUNT(DISTINCT %(groupe)s) as nb_%(groupe)ss_distinctes FROM correspondance WHERE date >= timestamp 'now' - interval '%(delay)s' GROUP BY %(groupe)s;" % { 'groupe': groupe, 'associes': associes, 'delay': mac_prise.delay[duree] }
cur.execute(requete)
fetched = cur.fetchall()
for entry in fetched:
# Si c'est la chambre d'un membre actif ou d'un club, on droppe.
if groupe == 'chambre':
if entry[groupe] in chambres_ma + chambres_clubs:
continue
# Sinon, on vérifie si le local est bien rempli.
if entry['nb_'+associes+'s_distinctes'] >= mac_prise.tres_suspect[duree][groupe]:
liste_associes = entry[associes+'s'].split(',')
# On retire les machines associées à l'adhérent possédant la chambre
if groupe == 'chambre':
for i in liste_associes:
proprio_associe = ldap.search('macAddress=%s' % i)[0].proprio()
if str(proprio_associe['chbre']).lower() == entry[groupe]:
liste_associes.remove(i)
if len(liste_associes) < mac_prise.tres_suspect[duree][groupe]:
continue
# Toujours un problème ? On ajoute au dico
pb_comptage_tres_suspect[entry[groupe]] = liste_associes
# Même chose avec un seuil plus faible
elif entry['nb_'+associes+'s_distinctes'] >= mac_prise.suspect[duree][groupe]:
liste_associes = entry[associes+'s'].split(',')
if groupe == 'chambre':
for i in liste_associes:
try:
proprio_associe = ldap.search('macAddress=%s' % i)[0].proprio()
if str(proprio_associe['chbre'][0]).lower() == entry[groupe]:
liste_associes.remove(i)
except:
pass
if len(liste_associes) < mac_prise.suspect[duree][groupe]:
continue
# On calcul la "probabilité" qu'un truc ne soit pas clair concernant la chambre/mac
rapport = lin(entry['nb_'+associes+'s'], entry['nb_dates_distinctes'], float(entry['nb_'+associes+'s_distinctes']))
if rapport >= mac_prise.rapport_suspect[duree][groupe]:
pb_comptage_suspect[entry[groupe]] = liste_associes
else:
pass
else:
pass
if len(pb_comptage_suspect) > 0:
output += mac_prise.titre_suspect[duree][groupe]+"\n"
# On prend la longueur de la plus longue valeur, on s'assure que cette longueur fait celle de la légende, plus un entier de marge
longueur_max = max([len(", ".join(a)) for a in pb_comptage_suspect.values()] + [longueur[associes][1]]) + 4
largeurs = (longueur[groupe][0], longueur_max)
data = []
for clef, valeurs in pb_comptage_suspect.items():
data.append([clef, ", ".join(valeurs)])
output += tableau(data, titres[groupe], largeurs, alignements)
output += "\n\n\n"
if len(pb_comptage_tres_suspect) > 0:
output += mac_prise.titre_tres_suspect[duree][groupe]+"\n"
longueur_max = max([len(", ".join(a)) for a in pb_comptage_tres_suspect.values()] + [longueur[associes][1]]) + 4
largeurs = (longueur[groupe][0], longueur_max)
data = []
for clef, valeurs in pb_comptage_tres_suspect.items():
data.append([clef, ", ".join(valeurs)])
output += tableau(data, titres[groupe], largeurs, alignements)
output += "\n\n\n"
return output
if __name__ == '__main__':
output = u"Détection de spoof potentiel\n\n\n"
coupure = len(output)
output += genere_comptage('instant', 'mac', 'chambre')
output += genere_comptage('heuristique', 'mac', 'chambre')
output += genere_comptage('journalier', 'mac', 'chambre')
output += genere_comptage('instant', 'chambre', 'mac')
output += genere_comptage('heuristique', 'chambre', 'mac')
output += genere_comptage('journalier', 'chambre', 'mac')
if len(output) == coupure:
sys.exit(0)
print output