scripts/surveillance/dhcp-detect.py
Antoine Durand-Gasselin 4a68475e34 [wiki-lenny] suppression de static/
darcs-hash:20090314092631-bd074-b01256aeaf71e935851b3ecdbd623eaae8c9e8a1.gz
2009-03-14 10:26:31 +01:00

158 lines
6 KiB
Python

#! /usr/bin/env python
# -*- encoding: iso-8859-15 -*-
# Utilisation de scappy pour détecter un DHCP pirate.
# $Id: dhcp-detect.py,v 1.3 2006/12/11 23:31:39 glondu Exp $
import sys, os
from threading import Thread
from time import time, sleep
from syslog import *
sys.path.append("/usr/bin")
sys.path.append("/usr/scripts/gestion")
sys.path.append("/usr/scripts/gestion/tools")
# Hack pour scapy
os.environ["HOME"] = "/tmp"
from scapy import Ether, sendp, sniff, BOOTP, IP, UDP
from email_tools import send_email
from locate_mac import trace_machine, info_machine
PIDFILE = "/var/run/dhcp-detect.pid"
# Interface à surveiller
INTERFACE = "crans"
# Adresse MAC
mac = os.popen(r"ifconfig | grep '^%s' | awk '{print $(NF)}'" % INTERFACE).readline().strip()
# Paquet à envoyer pour détecter un DHCP (il a été capturé pour avoir la bonne tête)
tosend = Ether("\xff\xff\xff\xff\xff\xff\x00\x80\xc8\xc9\xab\x01\x08\x00E\x10\x01H\x00\x00\x00\x00@\x11y\x96\x00\x00\x00\x00\xff\xff\xff\xff\x00D\x00C\x014\x9aA\x01\x01\x06\x00\xb2\x87\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\xc8\xc9\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00c\x82Sc5\x01\x012\x04R\xe1'67\x07\x01\x1c\x02\x03\x0f\x06\x0c\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
# On met à jour ce paquet
tosend.getlayer(Ether).src = mac
tosend.getlayer(IP).chksum = None
tosend.getlayer(UDP).chksum = None
tosend.getlayer(BOOTP).chaddr = ''.join(map(lambda x: chr(int(x,16)),mac.split(":")+['0']*10))
tosend = Ether(tosend.build())
# Tableau associatif "mac" - "mailé pour la dernière fois"
dejavu = {}
def createDaemon():
"""Detach a process from the controlling terminal and run it in the
background as a daemon.
"""
try:
pid = os.fork()
except OSError, e:
raise Exception, "%s [%d]" % (e.strerror, e.errno)
if (pid == 0): # The first child.
os.setsid()
try:
pid = os.fork() # Fork a second child.
except OSError, e:
raise Exception, "%s [%d]" % (e.strerror, e.errno)
if (pid == 0): # The second child.
os.chdir("/")
os.umask(0)
else:
os._exit(0) # Exit parent (the first child) of the second child.
else:
os._exit(0) # Exit parent of the first child.
# Iterate through and close all file descriptors.
for fd in range(0, 3):
try:
os.close(fd)
except OSError: # ERROR, fd wasn't open to begin with (ignored)
pass
os.open("/dev/null", os.O_RDWR) # standard input (0)
os.dup2(0, 1) # standard output (1)
os.dup2(0, 2) # standard error (2)
# Envoi par mail le paquet
def mail(paquet):
mac_pirate = paquet.getlayer(Ether).src
if (mac_pirate in globals()['dejavu'] and (time() - globals()['dejavu'][mac_pirate]) < 60*60):
pass
else:
globals()['dejavu'][mac_pirate] = time()
print "Envoi d'un mail...",
msg = u"""Boujour,
Un DHCP pirate a été découvert sur le réseau. Voici quelques renseignements mineurs à son sujet :
Son adresse Ethernet : %s
Son adresse IP : %s
Son TTL : %d
""" % (mac_pirate, paquet.getlayer(IP).src, paquet.getlayer(IP).ttl)
msg += trace_machine(mac_pirate)
msg += u"\n"
msg += info_machine(mac_pirate)
msg += u"""
Merci de votre attention et à bientôt.
--
dhcp-detect.py
"""
send_email(u"DHCP-detect <disconnect@crans.org>",
u"Disconnect Team <disconnect@crans.org",
u"DHCP pirate",
msg)
print "ok"
# Réception d'une réponse
def recoit(paquet):
# On affiche
print "Réception de : ", paquet.summary()
# On verifie que c'est bien ce qu'on attend
if paquet.getlayer(Ether).dst.upper() == globals()['mac'] and paquet.haslayer(BOOTP) and paquet.getlayer(BOOTP).op == 2 and paquet.getlayer(IP).src != '138.231.136.3':
# DHCP pirate ?
msg = "DHCP pirate ? (%s)" % paquet.getlayer(Ether).src
print msg
syslog(msg)
mail(paquet)
# Envoi du paquet test
def send():
while True:
sleep(60)
print "Envoi de :", tosend.summary()
sendp(tosend, verbose=False)
# Sniffer
def get():
while True:
sleep(1)
# On prend les paquets par 100, sinon, le process grossit beaucoup trop
a = sniff(iface=INTERFACE, filter="port bootpc and ether dst %s" % mac, prn=recoit, count=100)
if __name__ == "__main__":
# On quitte les éventuelles instances démonisées en cours
try:
pid = int(file(PIDFILE).read().strip())
os.kill(pid, 15)
except:
pass
if "-d" in sys.argv:
createDaemon()
file(PIDFILE, "w").write("%d\n" % os.getpid())
else:
print "Le paquet suivant va être envoyé à intervalles réguliers pour tester la présence de DHCP pirates :"
print tosend.summary()
openlog("dhcp-detect", LOG_PID)
syslog("Démarrage de dhcp-detect")
# On démarre le thread qui envoie régulièrement le paquet...
Thread(target=send, name="send").start()
# ...et celui qui sniffe régulièrement la réponse
Thread(target=get, name="get").start()