114 lines
3.1 KiB
Python
114 lines
3.1 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
# The authors of this code are
|
|
#
|
|
# Permission to use, copy, and modify this software with or without fee
|
|
# is hereby granted, provided that this entire notice is included in
|
|
# all source code copies of any software which is or includes a copy or
|
|
# modification of this software.
|
|
#
|
|
# THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRSS OR
|
|
# IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY
|
|
# REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE
|
|
# MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR
|
|
# PURPOSE.
|
|
|
|
|
|
|
|
import sys
|
|
sys.path.append('/usr/scripts/gestion')
|
|
|
|
import commands
|
|
import lock
|
|
import os
|
|
|
|
|
|
|
|
class IpsetError(Exception):
|
|
# Gestion des erreurs d'ipset
|
|
def __init__(self,cmd,err_code,output):
|
|
self.cmd=cmd
|
|
self.err_code=err_code
|
|
self.output=output
|
|
def __str__(self):
|
|
return "%s\n status : %s\n %s" % (self.cmd,self.err_code,self.output)
|
|
|
|
class Ipset(object):
|
|
ipset="/usr/sbin/ipset"
|
|
|
|
def __str__(self):
|
|
return self.set
|
|
|
|
def __init__(self,set,type,typeopt=''):
|
|
self.set=set
|
|
self.type=type
|
|
self.typeopt=typeopt
|
|
self.squeeze = os.uname()[2] < '3'
|
|
try:
|
|
self.create()
|
|
except IpsetError as error:
|
|
if error.err_code != 256:
|
|
raise
|
|
elif not "already exists" in error.output:
|
|
raise
|
|
pass
|
|
|
|
def call(self,cmd,arg=''):
|
|
"""Appel système à ipset"""
|
|
cmd_line="%s %s %s %s" % (self.ipset,cmd,self.set,arg)
|
|
status,output=commands.getstatusoutput(cmd_line)
|
|
if status:
|
|
raise IpsetError(cmd_line,status,output)
|
|
return output
|
|
|
|
def create(self,opt=''):
|
|
self.call("-N","%s %s" % (self.type, self.typeopt))
|
|
|
|
def add(self,arg):
|
|
self.call("-A",arg)
|
|
|
|
def list(self):
|
|
output=self.call("-L").splitlines()
|
|
list=[]
|
|
for line in output[6:]:
|
|
if line=='Bindings:':
|
|
break
|
|
list.append(line)
|
|
return list
|
|
|
|
def delete(self,ip):
|
|
"""Delete an IP"""
|
|
self.call("-D",ip)
|
|
|
|
def restore(self,rules):
|
|
""" restore le set courrant"""
|
|
rules_str=self.restore_format(rules)
|
|
if self.squeeze:
|
|
create_str="-N %s %s %s" % (self.set,self.type,self.typeopt)
|
|
str="%s\n%s\nCOMMIT\n" % (create_str,rules_str)
|
|
else:
|
|
str="%s\nCOMMIT\n" % rules_str
|
|
path='/tmp/ipset_%s' % self.set
|
|
f=open(path, 'w+')
|
|
f.write(str)
|
|
f.close()
|
|
try:
|
|
self.flush()
|
|
if self.squeeze:
|
|
self.destroy()
|
|
except IpsetError as error: sys.stderr.write("%s\n" % error)
|
|
cmd="cat %s | %s -R" % (path,self.ipset)
|
|
status,output=commands.getstatusoutput(cmd)
|
|
if status:
|
|
raise IpsetError(cmd,status,output)
|
|
return output
|
|
|
|
def flush(self):
|
|
self.call("-F")
|
|
|
|
def destroy(self):
|
|
self.call("-X")
|
|
|
|
def restore_format(self,rules):
|
|
return '\n'.join(["-A %s %s" % (self.set,data) for data in rules])
|
|
|