scripts/gestion/gen_confs/firewall4/base.py
Valentin Samir 2d2cbf2d9f [firewall4] Séparation en plusieurs fichiers
En gros, un par pare feu
2013-11-08 20:02:16 +01:00

233 lines
10 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import socket
import netaddr
import utils
from utils import pretty_print, anim, OK, cprint
from gestion import config
from gestion.gen_confs.ipset import IpsetError, Ipset
import config.firewall
#: Nom de la machine exécutant le script
hostname = socket.gethostname()
hostname='routeur'
#: Association des interfaces de ``hostname``
dev = hostname in config.firewall.dev.keys() and config.firewall.dev[hostname] or {}
class firewall(utils.firewall_tools) :
"""Classe de base du pare-feu implémentant l'association mac-ip (pour les machines filaires) et les blacklists hard"""
def __init__(self):
super(firewall, self).__init__()
self.reloadable = {
'blacklist_hard' : self.blacklist_hard,
'test_mac_ip' : self.test_mac_ip,
}
self.use_ipset = [self.blacklist_hard, self.test_mac_ip]
self.ipset['mac_ip']={
'adh' : Ipset("MAC-IP-ADH","macipmap","--from 138.231.136.0 --to 138.231.151.255"),
'adm' : Ipset("MAC-IP-ADM","macipmap","--from 10.231.136.0 --to 10.231.136.255"),
'app' : Ipset("MAC-IP-APP","macipmap","--from 10.2.9.0 --to 10.2.9.255"),
}
self.ipset['blacklist']={
'hard' : Ipset("BLACKLIST-HARD","ipmap","--from 138.231.136.0 --to 138.231.151.255"),
}
def blacklist_maj(self, ips):
"""Met à jours les blacklists pour les ip présentent dans la liste ``ips``"""
self.blacklist_hard_maj(ips)
def raw_table(self):
"""Génère les règles pour la table ``raw`` et remplis les chaines de la table"""
table = 'raw'
return
def mangle_table(self):
"""Génère les règles pour la table ``mangle`` et remplis les chaines de la table"""
table = 'mangle'
return
def filter_table(self):
"""Génère les règles pour la table ``filter`` et remplis les chaines de la table"""
table = 'filter'
mac_ip_chain = self.test_mac_ip(table, fill_ipset=True)
blacklist_hard_chain = self.blacklist_hard(table, fill_ipset=True)
chain = 'INPUT'
self.add(table, chain, '-i lo -j ACCEPT')
self.add(table, chain, '-p icmp -j ACCEPT')
self.add(table, chain, '-m state --state RELATED,ESTABLISHED -j ACCEPT')
for net in config.NETs['all'] + config.NETs['adm'] + config.NETs['personnel-ens']:
self.add(table, chain, '-s %s -j %s' % (net, mac_ip_chain))
self.add(table, chain, '-j %s' % blacklist_hard_chain)
chain = 'FORWARD'
self.add(table, chain, '-j REJECT')
return
def nat_table(self):
"""Génère les règles pour la table ``nat`` et remplis les chaines de la table"""
table = 'nat'
return
def blacklist_hard_maj(self, ip_list):
"""Met à jour les blacklists hard, est appelée par :py:func:`blacklist_maj`"""
self.blacklist_hard(fill_ipset=True)
# for ip in ip_list:
# machine = self.conn.search(u"ipHostNumber=%s" % ip)
# # Est-ce qu'il y a des blacklists hard parmis les blacklists de la machine
# if machine and set([bl.value['type'] for bl in machine[0].blacklist_actif() ]).intersection(config.blacklist_sanctions):
# try: self.ipset['blacklist']['hard'].add(ip)
# except IpsetError: pass
# else:
# try: self.ipset['blacklist']['hard'].delete(ip)
# except IpsetError: pass
def blacklist_hard(self, table=None, fill_ipset=False, apply=False):
"""Génère la chaine ``BLACKLIST_HARD``.
Si ``fill_ipset`` est à ``True``, remplis l'ipset ``BLACKLIST-HARD``.
Si ``apply`` est à True, applique directement les règles"""
chain = 'BLACKLIST_HARD'
if fill_ipset:
anim('\tRestoration de l\'ipset %s' % self.ipset['blacklist']['hard'])
# On récupère la liste de toutes les ips blacklistés hard
bl_hard_ips = set(
str(ip) for ips in
[
machine['ipHostNumber'] for machine in self.blacklisted_machines() if machine['ipHostNumber'] and reduce(lambda x,y: x or y, ( ip.value in netaddr.IPNetwork(n) for n in config.NETs['all'] for ip in machine['ipHostNumber']))
if set([bl.value['type'] for bl in machine.blacklist_actif() ]).intersection(config.blacklist_sanctions)
]
for ip in ips
)
self.ipset['blacklist']['hard'].restore(bl_hard_ips)
print OK
if table == 'filter':
pretty_print(table, chain)
self.add(table, chain, '-m set --match-set %s src -j REJECT' % self.ipset['blacklist']['hard'] )
self.add(table, chain, '-m set --match-set %s dst -j REJECT' % self.ipset['blacklist']['hard'] )
print OK
if apply:
self.apply(table, chain)
return chain
def test_mac_ip_dispatch(self, func, machine):
"""Détermine à quel set de mac-ip appliquer la fonction ``func`` (add, delete, append, ...)"""
ips = machine['ipHostNumber']
for ip in ips:
# Si la machines est sur le réseau des adhérents
if utils.AddrInNet(str(ip), config.NETs['wifi']):
# Les machines wifi sont vues à travers komaz
func('adh', "%s,%s" % (ip, config.mac_komaz))
elif utils.AddrInNet(str(ip), config.NETs['fil']):
func('adh', "%s,%s" % (ip, machine['macAddress'][0]))
# Si la machine est sur le réseau admin
elif utils.AddrInNet(str(ip), config.NETs['adm']):
func('adm', "%s,%s" % (ip, machine['macAddress'][0]))
def test_mac_ip(self, table=None, fill_ipset=False, apply=False):
"""Génère la chaine ``TEST_MAC-IP``.
Si ``fill_ipset`` est à ``True``, remplis les ipsets ``MAC-IP-ADH``, ``MAC-IP-ADM``, ``MAC-IP-ADM``.
Si ``apply`` est à True, applique directement les règles"""
chain = 'TEST_MAC-IP'
if fill_ipset:
anim('\tRestoration des ipsets %s' % ', '.join(self.ipset['mac_ip'].keys()))
rules={
'adh':[],
'adm':[],
'app':[],
}
for machine in self.machines():
self.test_mac_ip_dispatch(lambda set, data: rules[set].append(data), machine)
for set,rules in rules.items():
self.ipset['mac_ip'][set].restore(rules)
print OK
if table == 'filter':
pretty_print(table, chain)
for key in ['accueil', 'isolement', ]:
for net in config.NETs[key]:
self.add(table, chain, '-s %s -j RETURN' % net)
for key in self.ipset['mac_ip'].keys():
self.add(table, chain, '-m set --match-set %s src,src -j RETURN' % self.ipset['mac_ip'][key])
# Proxy ARP de Komaz et Titanic pour OVH
ip_ovh = self.conn.search(u"host=ovh.adm.crans.org")[0]['ipHostNumber'][0]
self.add(table, chain, '-m mac -s %s --mac-source %s -j RETURN' % (ip_ovh, config.mac_komaz))
self.add(table, chain, '-m mac -s %s --mac-source %s -j RETURN' % (ip_ovh, config.mac_titanic))
self.add(table, chain, '-j REJECT')
print OK
if apply:
self.apply(table, chain)
return chain
def mac_ip_maj(self, ip_list):
"""Met à jour la correspondance mac-ip"""
anim('\tActualisation de la correspondance mac-ipv4')
for ip in ip_list:
machine = self.conn.search(u"ipHostNumber=%s" % ip)
if machine:
try: self.test_mac_ip_dispatch(lambda set, data: self.ipset['mac_ip'][set].delete(data.split(',',1)[0]), {'ipHostNumber' : [ip], 'macAddress':[''] })
except IpsetError: pass
self.test_mac_ip_dispatch(lambda set, data: self.ipset['mac_ip'][set].add(data), machine[0])
else:
try: self.test_mac_ip_dispatch(lambda set, data: self.ipset['mac_ip'][set].delete(data.split(',',1)[0]), {'ipHostNumber' : [ip], 'macAddress':[''] })
except IpsetError: pass
print OK
class firewall_routeur(firewall):
"""Associe mac-ip pour les machines voyant plusieurs réseaux (wifi, filaire, personnel, ...)"""
def test_mac_ip_dispatch(self, func, machine):
"""Détermine à quel set de mac-ip appliquer la fonction func (add, delete, append, ...)"""
ips = machine['ipHostNumber']
for ip in ips:
# Si la machines est sur le réseau des adhérents
if utils.AddrInNet(str(ip), config.NETs['wifi']):
func('adh', "%s,%s" % (ip, machine['macAddress'][0]))
elif utils.AddrInNet(str(ip), config.NETs['fil']):
func('adh', "%s,%s" % (ip, machine['macAddress'][0]))
# Si la machine est sur le réseau admin
elif utils.AddrInNet(str(ip), config.NETs['adm']):
func('adm', "%s,%s" % (ip, machine['macAddress'][0]))
# Si la machine est sur le réseaux des appartements de l'ENS
elif utils.AddrInNet(str(ip), config.NETs['personnel-ens']):
func('app', "%s,%s" % (ip, machine['macAddress'][0]))
class firewall_wifionly(firewall):
"""Associe mac-ip pour les machines wifi only : les machines filaires sont vues avec la mac de komaz"""
def test_mac_ip_dispatch(self, func, machine):
"""Détermine à quel set de mac-ip appliquer la fonction func (add, delete, append, ...)"""
ips = machine['ipHostNumber']
for ip in ips:
# Si la machines est sur le réseau des adhérents
if utils.AddrInNet(str(ip), config.NETs['wifi']):
func('adh', "%s,%s" % (ip, machine['macAddress'][0]))
elif utils.AddrInNet(str(ip), config.NETs['fil']):
func('adh', "%s,%s" % (ip, config.mac_komaz))
# Si la machine est sur le réseau admin
elif utils.AddrInNet(str(ip), config.NETs['adm']):
func('adm', "%s,%s" % (ip, machine['macAddress'][0]))