scripts/surveillance/mac_prises/mac_prise_analyzer.py
2013-04-08 10:00:29 +02:00

167 lines
6.2 KiB
Python
Executable file
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf8 -*-
import psycopg2
import psycopg2.extras
import sys
import smtplib
import time
sys.path.append('/usr/scripts/gestion')
from config import mac_prise
from affich_tools import tableau
sys.path.append('/usr/scripts/lc_ldap')
import lc_ldap
ldap = lc_ldap.lc_ldap_local()
membres_actifs = ldap.search('(|(droits=Cableur)(droits=Nounou)(droits=Apprenti)(droits=Bureau))')
chambres_ma = []
for membre_actif in membres_actifs:
try:
chambres_ma.append(str(membre_actif['chbre'][0]).lower())
except:
pass
clubs = ldap.search('cid=*')
chambres_clubs = []
for club in clubs:
try:
chambres_clubs.append(str(club['chbre'][0]).lower())
except:
pass
conn = psycopg2.connect(user='crans', database='mac_prises')
conn.set_session(autocommit = True)
cur = conn.cursor(cursor_factory = psycopg2.extras.DictCursor)
requete = "SELECT * FROM signales WHERE date >= timestamp 'now' - interval '1 day';"
cur.execute(requete)
signales = cur.fetchall()
longueur = { 'mac': (24, 3),
'chambre': (10, 7),
}
titres = { 'mac':(u'mac', u'chambres'),
'chambre': (u'chambre', u'macs'),
}
alignements = ('c', 'c')
def lin(x, y, z):
"""
Calcul linéaire d'un rapport
"""
return (float(x)/float(y)-1.0)/(float(z)-1.0)
def genere_comptage(duree, groupe, associes):
"""
Grosse fonction de hack pour ne pas écrire six fois le même formatage de sortie bidon.
Prend en argument :
* duree, qui peut valoir 'instant', 'heuristique', ou 'journalier', permet d'importer
les variables de config qui vont bien
* groupe, qui précise selon quoi on groupe (mac ou chambre)
* associes, qui contient l'autre champ
"""
pb_comptage_suspect = {}
output = ""
requete = "SELECT array_to_string(array_agg(DISTINCT date), ', ') AS dates , %(groupe)s, array_to_string(array_agg(DISTINCT %(associes)s), ', ') AS %(associes)ss, COUNT(DISTINCT %(associes)s) AS nb_%(associes)ss_distinctes, COUNT(%(associes)s) AS nb_%(associes)ss, COUNT(DISTINCT date) as nb_dates_distinctes, COUNT(DISTINCT %(groupe)s) as nb_%(groupe)ss_distinctes FROM correspondance WHERE date >= timestamp 'now' - interval '%(delay)s' GROUP BY %(groupe)s;" % { 'groupe': groupe, 'associes': associes, 'delay': mac_prise.delay[duree] }
cur.execute(requete)
fetched = cur.fetchall()
for entry in fetched:
if groupe == "mac":
machines = ldap.search('(macAddress=%s)' % entry[groupe])
if len(machines) > 0:
if isinstance(machines[0], lc_ldap.machineWifi):
continue
else:
continue
if entry['nb_'+associes+'s_distinctes'] >= mac_prise.suspect[duree][groupe]:
Logs.append(u"Recherche par %s, entrée suspecte : %s -> %s : %s distinctes sur %s dates distinctes\n" % (groupe, entry[groupe], entry[associes+'s'], entry['nb_'+associes+'s'], entry['nb_dates_distinctes']))
liste_associes = entry[associes+'s'].split(', ')
# On calcule la "probabilité" qu'un truc ne soit pas clair concernant la chambre/mac
rapport = lin(entry['nb_'+associes+'s'], entry['nb_dates_distinctes'], float(entry['nb_'+associes+'s_distinctes']))
if rapport >= mac_prise.rapport_suspect[duree][groupe]:
Logs.append(u"Entrée ajoutée au tableau %s pour la recherche par %s, car rapport supérieur au seuil.\n\n" % (duree, groupe))
pb_comptage_suspect[entry[groupe]] = (liste_associes, rapport, mac_prise.rapport_suspect[duree][groupe])
else:
Logs.append(u"Entrée rejetée du tableau %s pour la recherche par %s, car rapport inférieur au seuil : (%s).\n\n" % (duree, groupe, rapport))
else:
pass
if len(pb_comptage_suspect) > 0:
output += mac_prise.titre_suspect[duree][groupe] + "\n"
# On prend la longueur de la plus longue valeur, on s'assure que cette longueur fait celle de la légende, plus un entier de marge
longueur_max = max([len(", ".join(a[0])) for a in pb_comptage_suspect.values()] + [longueur[associes][1]]) + 4
largeurs = (longueur[groupe][0], longueur_max, 11, 9)
titre = (titres[groupe][0], titres[groupe][1], "rapport", "seuil")
alignement = (alignements[0], alignements[1], 'c', 'c')
data = []
clefs = pb_comptage_suspect.keys()
clefs.sort()
for clef in clefs:
data.append([clef, ", ".join(pb_comptage_suspect[clef][0]), pb_comptage_suspect[clef][1], pb_comptage_suspect[clef][2]])
output += tableau(data, titre, largeurs, alignement)
output += u"\n\n\n"
return output
if __name__ == '__main__':
output = u"Détection de spoof potentiel\n\n\n"
coupure = len(output)
Logs = [u"Logs du script mac_prise_analyzer\n\n\n"]
output += genere_comptage('instant', 'mac', 'chambre')
output += genere_comptage('heuristique', 'mac', 'chambre')
output += genere_comptage('journalier', 'mac', 'chambre')
if time.localtime().tm_min % 30 == 0 and mac_prise.hargneux:
hargneux = True
else:
hargneux = False
if len(output) == coupure and not hargneux:
sys.exit(0)
message = """From: %(from)s
To: %(to)s
Subject: %(subject)s
Content-Type: multipart/mixed; boundary="_424234545aaff-ffca234efff-556adceff5646_"
--_424234545aaff-ffca234efff-556adceff5646_
Content-Type: text/plain; charset="UTF-8"
%(contenu)s
--
Script d'analyse mac_prise (en test)
--_424234545aaff-ffca234efff-556adceff5646_
Content-Type: text/plain; charset="UTF-8"
Content-Disposition: attachment; filename="logs_analyse"
%(logs)s
--_424234545aaff-ffca234efff-556adceff5646_--
"""
corps = message % { 'from': 'Spoofing watcher <spoof-watcher@crans.org>',
'to': 'test@lists.crans.org',
'subject': 'Analyse du spoofing',
'contenu': output,
'logs': "".join(Logs),
}
mail = smtplib.SMTP('localhost')
mailfrom = 'spoof-watcher@crans.org'
mailto = 'test@lists.crans.org'
mail.sendmail(mailfrom, mailto, corps.encode('utf-8'))