scripts/gestion/gen_confs/ipset.py

113 lines
3.1 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# The authors of this code are
#
# Permission to use, copy, and modify this software with or without fee
# is hereby granted, provided that this entire notice is included in
# all source code copies of any software which is or includes a copy or
# modification of this software.
#
# THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRESS OR
# IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY
# REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE
# MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR
# PURPOSE.
import sys
if '/usr/scripts' not in sys.path:
sys.path.append('/usr/scripts')
import commands
import os
# Default location of the ipset binary.
IPSET_PATH = '/sbin/ipset'
# Before jessie, ipset was installed in /usr/sbin: fall back to that
# location when the default path does not exist.
IPSET_PATH = IPSET_PATH if os.path.exists(IPSET_PATH) else '/usr' + IPSET_PATH
class IpsetError(Exception):
    """Error describing a failed ipset command.

    Attributes:
        cmd: the command line that was executed.
        err_code: the status reported for that command.
        output: the text produced by the command.
    """

    def __init__(self, cmd, err_code, output):
        # Forward the values to Exception so that ``args`` is populated;
        # without this repr() shows an empty exception and the instance
        # cannot be rebuilt by copy/pickle.
        super(IpsetError, self).__init__(cmd, err_code, output)
        self.cmd = cmd
        self.err_code = err_code
        self.output = output

    def __str__(self):
        """Return the command, its status and its output, one per line."""
        return "%s\n status : %s\n %s" % (self.cmd, self.err_code, self.output)
class Ipset(object):
    """Wrapper around the ``ipset`` command line tool for one named set.

    Every operation builds a command line from the ``ipset`` class
    attribute and the set name, runs it through
    ``commands.getstatusoutput`` and raises IpsetError when the reported
    status is non-zero.
    """

    # Path of the ipset executable used to build every command line.
    ipset = IPSET_PATH

    def __str__(self):
        """Return the name of the set."""
        return self.set

    def __init__(self, set, type, typeopt=''):
        """Store the set description and create the set.

        Args:
            set: name of the set.
            type: set type passed to ``ipset create``.
            typeopt: extra text appended after the type on creation.

        Raises:
            IpsetError: if the creation fails, unless the status is 256
                and the output mentions "already exists".
        """
        # NOTE: ``set`` and ``type`` shadow builtins, but the parameter
        # names are part of the public signature and are kept as-is.
        self.set = set
        self.type = type
        self.typeopt = typeopt
        try:
            self.create()
        except IpsetError as error:
            # Only the "set already exists" failure is tolerated.
            # NOTE(review): 256 is presumably exit code 1 as encoded in the
            # wait status returned by commands.getstatusoutput - confirm.
            if error.err_code != 256 or "already exists" not in error.output:
                raise

    def call(self, cmd, arg=''):
        """Run ``ipset <cmd> <set> <arg>`` and return its output.

        Args:
            cmd: ipset sub-command, e.g. "add" or "list".
            arg: remaining arguments, already formatted as one string.

        Returns:
            The output captured by ``commands.getstatusoutput``.

        Raises:
            IpsetError: if the reported status is non-zero.
        """
        # NOTE(review): the values are interpolated into a shell command
        # without any quoting; only trusted values must reach this method.
        cmd_line = "%s %s %s %s" % (self.ipset, cmd, self.set, arg)
        status, output = commands.getstatusoutput(cmd_line)
        if status:
            raise IpsetError(cmd_line, status, output)
        return output

    def create(self, opt=''):
        """Create the set using the stored type and type options.

        Args:
            opt: not used by the current implementation.
        """
        self.call("create", "%s %s" % (self.type, self.typeopt))

    def add(self, arg):
        """Add the entry ``arg`` to the set."""
        self.call("add", arg)

    def list(self):
        """Return lines of the ``ipset list`` output for this set.

        The first six lines of the output are skipped and collection
        stops at the first 'Bindings:' line, if there is one.
        """
        # NOTE(review): the fixed offset of six header lines depends on the
        # output format of the installed ipset version - confirm.
        lines = self.call("list").splitlines()
        members = []
        for line in lines[6:]:
            if line == 'Bindings:':
                break
            members.append(line)
        return members

    def delete(self, ip):
        """Delete the entry ``ip`` from the set."""
        self.call("del", ip)

    def restore(self, rules):
        """Replace the content of the current set with ``rules``.

        The rules are written in restore format, followed by a COMMIT
        line, to a temporary file. The set is then flushed (a flush
        failure is only reported on stderr) and the file is fed to
        ``ipset -R``. The temporary file is removed afterwards.

        Args:
            rules: iterable of entries, each formatted into an
                ``add <set> <entry>`` line.

        Returns:
            The output of the restore command.

        Raises:
            IpsetError: if the restore command reports a non-zero status.
        """
        # Local import: tempfile is not among the module-level imports.
        import tempfile

        payload = "%s\nCOMMIT\n" % self.restore_format(rules)
        # mkstemp creates the file atomically, with an unpredictable name
        # and owner-only permissions, instead of reusing a fixed and
        # guessable /tmp/ipset_<set> path.
        fd, path = tempfile.mkstemp(prefix='ipset_%s_' % self.set)
        try:
            # The context manager closes the file even if the write fails.
            with os.fdopen(fd, 'w') as rules_file:
                rules_file.write(payload)
            try:
                self.flush()
            except IpsetError as error:
                # Best effort: report the failure and still try to restore.
                sys.stderr.write("%s\n" % error)
            cmd = "cat %s | %s -R" % (path, self.ipset)
            status, output = commands.getstatusoutput(cmd)
            if status:
                raise IpsetError(cmd, status, output)
            return output
        finally:
            os.remove(path)

    def flush(self):
        """Run ``ipset flush`` on the set."""
        self.call("flush")

    def destroy(self):
        """Run ``ipset destroy`` on the set."""
        self.call("destroy")

    def restore_format(self, rules):
        """Return one ``add <set> <entry>`` line per entry of ``rules``."""
        return '\n'.join("add %s %s" % (self.set, data) for data in rules)