
* Il y a bien une méthode, mais elle régénère TOUT l'ipset * Il y en a bien une autre, mais elle REFAIT des requêtes LDAP, et elle n'est pas utilisée.
240 lines
10 KiB
Python
240 lines
10 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
import os
|
|
import sys
|
|
import socket
|
|
|
|
import utils
|
|
from utils import pretty_print, anim, OK, cprint
|
|
|
|
from gestion import config
|
|
from gestion.gen_confs.ipset import IpsetError, Ipset
|
|
|
|
import config.firewall
|
|
|
|
#: Nom de la machine exécutant le script
|
|
hostname = socket.gethostname()
|
|
|
|
#: Association des interfaces de ``hostname``
|
|
dev = hostname in config.firewall.dev.keys() and config.firewall.dev[hostname] or {}
|
|
|
|
|
|
class firewall(utils.firewall_tools) :
|
|
"""Classe de base du pare-feu implémentant l'association mac-ip (pour les machines filaires) et les blacklists hard"""
|
|
|
|
def __init__(self):
|
|
super(firewall, self).__init__()
|
|
|
|
self.reloadable = {
|
|
'blacklist_hard' : self.blacklist_hard,
|
|
'test_mac_ip' : self.test_mac_ip,
|
|
'blacklists' : self.blacklists,
|
|
}
|
|
|
|
self.use_ipset = [self.blacklist_hard, self.test_mac_ip, self.blacklists]
|
|
|
|
self.ipset['mac_ip']={
|
|
'adh' : Ipset("MAC-IP-ADH","macipmap","--from 138.231.136.0 --to 138.231.151.255"),
|
|
'adm' : Ipset("MAC-IP-ADM","macipmap","--from 10.231.136.0 --to 10.231.136.255"),
|
|
'app' : Ipset("MAC-IP-APP","macipmap","--from 10.2.9.0 --to 10.2.9.255"),
|
|
}
|
|
|
|
self.ipset['blacklist']={
|
|
'hard' : Ipset("BLACKLIST-HARD","ipmap","--from 138.231.136.0 --to 138.231.151.255"),
|
|
}
|
|
|
|
|
|
def blacklist_maj(self, ips):
|
|
"""Met à jours les blacklists pour les ip présentent dans la liste ``ips``"""
|
|
self.blacklist_hard_maj(ips)
|
|
|
|
def blacklists(self, table=None, fill_ipset=False, apply=False):
|
|
"""Recharge toutes les blacklists"""
|
|
self.blacklist_hard(table=table, fill_ipset=fill_ipset, apply=apply)
|
|
|
|
|
|
def raw_table(self):
|
|
"""Génère les règles pour la table ``raw`` et remplis les chaines de la table"""
|
|
table = 'raw'
|
|
return
|
|
|
|
def mangle_table(self):
|
|
"""Génère les règles pour la table ``mangle`` et remplis les chaines de la table"""
|
|
table = 'mangle'
|
|
return
|
|
|
|
def filter_table(self):
|
|
"""Génère les règles pour la table ``filter`` et remplis les chaines de la table"""
|
|
table = 'filter'
|
|
|
|
mac_ip_chain = self.test_mac_ip(table, fill_ipset=True)
|
|
blacklist_hard_chain = self.blacklist_hard(table, fill_ipset=True)
|
|
|
|
chain = 'INPUT'
|
|
self.add(table, chain, '-i lo -j ACCEPT')
|
|
self.add(table, chain, '-p icmp -j ACCEPT')
|
|
self.add(table, chain, '-m state --state RELATED,ESTABLISHED -j ACCEPT')
|
|
for net in config.NETs['all'] + config.NETs['adm'] + config.NETs['personnel-ens']:
|
|
self.add(table, chain, '-s %s -j %s' % (net, mac_ip_chain))
|
|
self.add(table, chain, '-j %s' % blacklist_hard_chain)
|
|
|
|
chain = 'FORWARD'
|
|
self.add(table, chain, '-j REJECT')
|
|
|
|
return
|
|
|
|
def nat_table(self):
|
|
"""Génère les règles pour la table ``nat`` et remplis les chaines de la table"""
|
|
table = 'nat'
|
|
return
|
|
|
|
|
|
def blacklist_hard_maj(self, ip_list):
|
|
"""Met à jour les blacklists hard, est appelée par :py:func:`blacklist_maj`"""
|
|
self.blacklist_hard(fill_ipset=True)
|
|
# for ip in ip_list:
|
|
# machine = self.conn.search(u"ipHostNumber=%s" % ip)
|
|
# # Est-ce qu'il y a des blacklists hard parmis les blacklists de la machine
|
|
# if machine and set([bl['type'] for bl in machine[0].blacklist_actif() ]).intersection(config.blacklist_sanctions):
|
|
# try: self.ipset['blacklist']['hard'].add(ip)
|
|
# except IpsetError: pass
|
|
# else:
|
|
# try: self.ipset['blacklist']['hard'].delete(ip)
|
|
# except IpsetError: pass
|
|
|
|
def blacklist_hard(self, table=None, fill_ipset=False, apply=False):
|
|
"""Génère la chaine ``BLACKLIST_HARD``.
|
|
Si ``fill_ipset`` est à ``True``, remplis l'ipset ``BLACKLIST-HARD``.
|
|
Si ``apply`` est à True, applique directement les règles"""
|
|
chain = 'BLACKLIST_HARD'
|
|
|
|
if fill_ipset:
|
|
# On récupère la liste de toutes les ips blacklistés hard
|
|
bl_hard_ips = self.blacklisted_ips(config.blacklist_sanctions, config.NETs['all'])
|
|
anim('\tRestoration de l\'ipset %s' % self.ipset['blacklist']['hard'])
|
|
self.ipset['blacklist']['hard'].restore(bl_hard_ips)
|
|
print OK
|
|
|
|
if table == 'filter':
|
|
pretty_print(table, chain)
|
|
self.add(table, chain, '-m set --match-set %s src -j REJECT' % self.ipset['blacklist']['hard'] )
|
|
self.add(table, chain, '-m set --match-set %s dst -j REJECT' % self.ipset['blacklist']['hard'] )
|
|
print OK
|
|
|
|
if apply:
|
|
self.apply(table, chain)
|
|
return chain
|
|
|
|
def mac_ip_append(self, mac, ip):
|
|
machine = {'macAddress':[mac], 'ipHostNumber': [ip]}
|
|
self.test_mac_ip_dispatch(lambda set, data: self.ipset['mac_ip'][set].add(data), machine)
|
|
|
|
def test_mac_ip_dispatch(self, func, machine):
|
|
"""Détermine à quel set de mac-ip appliquer la fonction ``func`` (add, delete, append, ...)"""
|
|
ips = machine['ipHostNumber']
|
|
if '<automatique>' in machine['macAddress'] :
|
|
return
|
|
for ip in ips:
|
|
# Si la machines est sur le réseau des adhérents
|
|
if utils.AddrInNet(str(ip), config.NETs['wifi']):
|
|
# Les machines wifi sont vues à travers komaz
|
|
func('adh', "%s,%s" % (ip, config.mac_komaz))
|
|
elif utils.AddrInNet(str(ip), config.NETs['fil']):
|
|
func('adh', "%s,%s" % (ip, machine['macAddress'][0]))
|
|
# Si la machine est sur le réseau admin
|
|
elif utils.AddrInNet(str(ip), config.NETs['adm']):
|
|
func('adm', "%s,%s" % (ip, machine['macAddress'][0]))
|
|
|
|
def test_mac_ip(self, table=None, fill_ipset=False, apply=False):
|
|
"""Génère la chaine ``TEST_MAC-IP``.
|
|
Si ``fill_ipset`` est à ``True``, remplis les ipsets ``MAC-IP-ADH``, ``MAC-IP-ADM``, ``MAC-IP-ADM``.
|
|
Si ``apply`` est à True, applique directement les règles"""
|
|
chain = 'TEST_MAC-IP'
|
|
|
|
if fill_ipset:
|
|
anim('\tRestoration des ipsets %s' % ', '.join(self.ipset['mac_ip'].keys()))
|
|
rules={
|
|
'adh':[],
|
|
'adm':[],
|
|
'app':[],
|
|
}
|
|
for machine in self.machines():
|
|
self.test_mac_ip_dispatch(lambda set, data: rules[set].append(data), machine)
|
|
|
|
for set,rules in rules.items():
|
|
self.ipset['mac_ip'][set].restore(rules)
|
|
print OK
|
|
|
|
if table == 'filter':
|
|
pretty_print(table, chain)
|
|
|
|
for key in ['accueil', 'isolement', ]:
|
|
for net in config.NETs[key]:
|
|
self.add(table, chain, '-s %s -j RETURN' % net)
|
|
for key in self.ipset['mac_ip'].keys():
|
|
self.add(table, chain, '-m set --match-set %s src,src -j RETURN' % self.ipset['mac_ip'][key])
|
|
|
|
# Proxy ARP de Komaz et Titanic pour OVH
|
|
ip_soyouz = self.conn.search(u"host=soyouz.adm.crans.org")[0]['ipHostNumber'][0]
|
|
self.add(table, chain, '-m mac -s %s --mac-source %s -j RETURN' % (ip_soyouz, config.mac_komaz))
|
|
self.add(table, chain, '-m mac -s %s --mac-source %s -j RETURN' % (ip_soyouz, config.mac_titanic))
|
|
|
|
self.add(table, chain, '-j REJECT')
|
|
print OK
|
|
|
|
if apply:
|
|
self.apply(table, chain)
|
|
return chain
|
|
|
|
def mac_ip_maj(self, ip_list):
|
|
"""Met à jour la correspondance mac-ip"""
|
|
anim('\tActualisation de la correspondance mac-ipv4')
|
|
for ip in ip_list:
|
|
machine = self.conn.search(u"ipHostNumber=%s" % ip)
|
|
if machine:
|
|
try: self.test_mac_ip_dispatch(lambda set, data: self.ipset['mac_ip'][set].delete(data.split(',',1)[0]), {'ipHostNumber' : [ip], 'macAddress':[''] })
|
|
except IpsetError: pass
|
|
self.test_mac_ip_dispatch(lambda set, data: self.ipset['mac_ip'][set].add(data), machine[0])
|
|
else:
|
|
try: self.test_mac_ip_dispatch(lambda set, data: self.ipset['mac_ip'][set].delete(data.split(',',1)[0]), {'ipHostNumber' : [ip], 'macAddress':[''] })
|
|
except IpsetError: pass
|
|
print OK
|
|
|
|
|
|
|
|
class firewall_routeur(firewall):
|
|
"""Associe mac-ip pour les machines voyant plusieurs réseaux (wifi, filaire, personnel, ...)"""
|
|
def test_mac_ip_dispatch(self, func, machine):
|
|
"""Détermine à quel set de mac-ip appliquer la fonction func (add, delete, append, ...)"""
|
|
ips = machine['ipHostNumber']
|
|
if '<automatique>' in machine['macAddress'] :
|
|
return
|
|
for ip in ips:
|
|
# Si la machines est sur le réseau des adhérents
|
|
if utils.AddrInNet(str(ip), config.NETs['wifi']):
|
|
func('adh', "%s,%s" % (ip, machine['macAddress'][0]))
|
|
elif utils.AddrInNet(str(ip), config.NETs['fil']):
|
|
func('adh', "%s,%s" % (ip, machine['macAddress'][0]))
|
|
# Si la machine est sur le réseau admin
|
|
elif utils.AddrInNet(str(ip), config.NETs['adm']):
|
|
func('adm', "%s,%s" % (ip, machine['macAddress'][0]))
|
|
# Si la machine est sur le réseaux des appartements de l'ENS
|
|
elif utils.AddrInNet(str(ip), config.NETs['personnel-ens']):
|
|
func('app', "%s,%s" % (ip, machine['macAddress'][0]))
|
|
|
|
class firewall_wifionly(firewall):
|
|
"""Associe mac-ip pour les machines wifi only : les machines filaires sont vues avec la mac de komaz"""
|
|
def test_mac_ip_dispatch(self, func, machine):
|
|
"""Détermine à quel set de mac-ip appliquer la fonction func (add, delete, append, ...)"""
|
|
ips = machine['ipHostNumber']
|
|
if '<automatique>' in machine['macAddress'] :
|
|
return
|
|
for ip in ips:
|
|
# Si la machines est sur le réseau des adhérents
|
|
if utils.AddrInNet(str(ip), config.NETs['wifi']):
|
|
func('adh', "%s,%s" % (ip, machine['macAddress'][0]))
|
|
elif utils.AddrInNet(str(ip), config.NETs['fil']):
|
|
func('adh', "%s,%s" % (ip, config.mac_komaz))
|
|
# Si la machine est sur le réseau admin
|
|
elif utils.AddrInNet(str(ip), config.NETs['adm']):
|
|
func('adm', "%s,%s" % (ip, machine['macAddress'][0]))
|