#!/usr/bin/env python # -*- coding: utf-8 -*- # The authors of this code are # # Permission to use, copy, and modify this software with or without fee # is hereby granted, provided that this entire notice is included in # all source code copies of any software which is or includes a copy or # modification of this software. # # THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRSS OR # IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY # REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE # MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR # PURPOSE. import sys sys.path.append('/usr/scripts/gestion') import commands import os class IpsetError(Exception): # Gestion des erreurs d'ipset def __init__(self,cmd,err_code,output): self.cmd=cmd self.err_code=err_code self.output=output def __str__(self): return "%s\n status : %s\n %s" % (self.cmd,self.err_code,self.output) class Ipset(object): ipset="/usr/sbin/ipset" def __str__(self): return self.set def __init__(self,set,type,typeopt=''): self.set=set self.type=type self.typeopt=typeopt self.squeeze = os.uname()[2] < '3' try: self.create() except IpsetError as error: if error.err_code != 256: raise elif not "already exists" in error.output: raise pass def call(self,cmd,arg=''): """Appel système à ipset""" cmd_line="%s %s %s %s" % (self.ipset,cmd,self.set,arg) status,output=commands.getstatusoutput(cmd_line) if status: raise IpsetError(cmd_line,status,output) return output def create(self,opt=''): self.call("-N","%s %s" % (self.type, self.typeopt)) def add(self,arg): self.call("-A",arg) def list(self): output=self.call("-L").splitlines() list=[] for line in output[6:]: if line=='Bindings:': break list.append(line) return list def delete(self,ip): """Delete an IP""" self.call("-D",ip) def restore(self,rules): """ restore le set courrant""" rules_str=self.restore_format(rules) if self.squeeze: create_str="-N %s %s %s" % (self.set,self.type,self.typeopt) str="%s\n%s\nCOMMIT\n" % (create_str,rules_str) else: str="%s\nCOMMIT\n" % rules_str path='/tmp/ipset_%s' % self.set f=open(path, 'w+') f.write(str) f.close() try: self.flush() if self.squeeze: self.destroy() except IpsetError as error: sys.stderr.write("%s\n" % error) cmd="cat %s | %s -R" % (path,self.ipset) status,output=commands.getstatusoutput(cmd) if status: raise IpsetError(cmd,status,output) return output def flush(self): self.call("-F") def destroy(self): self.call("-X") def restore_format(self,rules): return '\n'.join(["-A %s %s" % (self.set,data) for data in rules])