113 lines
3.1 KiB
Python
113 lines
3.1 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
# The authors of this code are
|
|
#
|
|
# Permission to use, copy, and modify this software with or without fee
|
|
# is hereby granted, provided that this entire notice is included in
|
|
# all source code copies of any software which is or includes a copy or
|
|
# modification of this software.
|
|
#
|
|
# THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRESS OR
|
|
# IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY
|
|
# REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE
|
|
# MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR
|
|
# PURPOSE.
|
|
|
|
|
|
|
|
import sys
|
|
if '/usr/scripts' not in sys.path:
|
|
sys.path.append('/usr/scripts')
|
|
|
|
import commands
|
|
import os
|
|
|
|
# Preferred location of the ipset executable.
IPSET_PATH = '/sbin/ipset'

# Before jessie, ipset was installed in /usr/sbin: fall back to that
# location when the preferred path does not exist on this machine.
IPSET_PATH = IPSET_PATH if os.path.exists(IPSET_PATH) else '/usr' + IPSET_PATH
|
|
|
|
class IpsetError(Exception):
    """Error describing a failed ipset invocation.

    Attributes:
        cmd: the command line that was run.
        err_code: the status reported for that command.
        output: the text printed by the command.
    """

    def __init__(self, cmd, err_code, output):
        """Store the command, its status and its output on the exception."""
        # Forward the values to Exception so that ``args`` is populated;
        # without this, repr() is uninformative and the exception cannot be
        # copied or pickled (re-creation calls __init__ with ``args``).
        super(IpsetError, self).__init__(cmd, err_code, output)
        self.cmd = cmd
        self.err_code = err_code
        self.output = output

    def __str__(self):
        """Return the command, its status and its output on separate lines."""
        return "%s\n status : %s\n %s" % (self.cmd, self.err_code, self.output)
|
|
|
|
class Ipset(object):
    """Wrapper around the ``ipset`` command line tool for one named set.

    Instantiating the class tries to create the set; a failure whose output
    says the set "already exists" is tolerated so an existing set is reused.
    """

    # Path of the ipset executable used to build every command line.
    ipset = IPSET_PATH

    def __str__(self):
        """Return the name of the set."""
        return self.set

    def __init__(self, set, type, typeopt=''):
        """Record the set description and create the set.

        ``set`` and ``type`` shadow builtins, but they belong to the public
        signature and are kept unchanged for backward compatibility.

        Args:
            set: name of the set.
            type: set type handed to ``ipset create``.
            typeopt: extra options appended after the type.

        Raises:
            IpsetError: if creation fails for any reason other than the set
                already existing.
        """
        self.set = set
        self.type = type
        self.typeopt = typeopt
        try:
            self.create()
        except IpsetError as error:
            # Only a status of 256 (presumably exit code 1 encoded as a
            # wait() status -- TODO confirm) whose output mentions
            # "already exists" is ignored; anything else is propagated.
            if error.err_code != 256 or "already exists" not in error.output:
                raise

    def call(self, cmd, arg=''):
        """Run ``<ipset> <cmd> <set> <arg>`` and return its output.

        Args:
            cmd: ipset sub-command (e.g. ``"add"``).
            arg: additional arguments placed after the set name.

        Returns:
            The output captured by ``commands.getstatusoutput``.

        Raises:
            IpsetError: if the reported status is non-zero.
        """
        # NOTE(review): the command line is interpolated into a single string
        # and run by ``commands.getstatusoutput``; the set name and arguments
        # must come from trusted callers.
        cmd_line = "%s %s %s %s" % (self.ipset, cmd, self.set, arg)
        status, output = commands.getstatusoutput(cmd_line)
        if status:
            raise IpsetError(cmd_line, status, output)
        return output

    def create(self, opt=''):
        """Create the set using the stored type and type options.

        ``opt`` is accepted for backward compatibility but is not used.
        """
        self.call("create", "%s %s" % (self.type, self.typeopt))

    def add(self, arg):
        """Add the entry ``arg`` to the set."""
        self.call("add", arg)

    def list(self):
        """Return lines of the ``list`` command output as a list of strings.

        The first six output lines are skipped and reading stops at the first
        line equal to ``Bindings:``.
        """
        # NOTE(review): the fixed offset of six presumably skips the header
        # printed by ipset; the header length may differ between ipset
        # versions -- confirm against the deployed version.
        output = self.call("list").splitlines()
        members = []
        for line in output[6:]:
            if line == 'Bindings:':
                break
            members.append(line)
        return members

    def delete(self, ip):
        """Delete an IP from the set."""
        self.call("del", ip)

    def restore(self, rules):
        """Restore the current set from ``rules``.

        The rules are written in restore format, followed by ``COMMIT``, to
        ``/tmp/ipset_<set>``. The set is then flushed (a flush failure is only
        reported on stderr) and the file is piped into ``<ipset> -R``.

        Args:
            rules: iterable of entries to load into the set.

        Returns:
            The output of the restore command.

        Raises:
            IpsetError: if the restore command reports a non-zero status.
        """
        rules_str = self.restore_format(rules)
        payload = "%s\nCOMMIT\n" % rules_str
        path = '/tmp/ipset_%s' % self.set
        # The context manager closes the file even when the write fails.
        with open(path, 'w+') as dump:
            dump.write(payload)
        try:
            self.flush()
        except IpsetError as error:
            # Best effort: report the flush failure and still try to restore.
            sys.stderr.write("%s\n" % error)

        cmd = "cat %s | %s -R" % (path, self.ipset)
        status, output = commands.getstatusoutput(cmd)
        if status:
            raise IpsetError(cmd, status, output)
        return output

    def flush(self):
        """Run the ``flush`` command on the set."""
        self.call("flush")

    def destroy(self):
        """Run the ``destroy`` command on the set."""
        self.call("destroy")

    def restore_format(self, rules):
        """Return one ``add <set> <entry>`` line per entry of ``rules``."""
        return '\n'.join("add %s %s" % (self.set, data) for data in rules)
|
|
|