#!/usr/bin/env python # -*- coding: utf-8 -*- # The authors of this code are # # Permission to use, copy, and modify this software with or without fee # is hereby granted, provided that this entire notice is included in # all source code copies of any software which is or includes a copy or # modification of this software. # # THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRSS OR # IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY # REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE # MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR # PURPOSE. import sys if '/usr/scripts' not in sys.path: sys.path.append('/usr/scripts') import commands import os IPSET_PATH = '/sbin/ipset' # Avant jessie: ipset était dans /usr/sbin if not os.path.exists(IPSET_PATH): IPSET_PATH = '/usr' + IPSET_PATH class IpsetError(Exception): # Gestion des erreurs d'ipset def __init__(self, cmd, err_code, output): self.cmd = cmd self.err_code = err_code self.output = output def __str__(self): return "%s\n status : %s\n %s" % (self.cmd, self.err_code, self.output) class Ipset(object): ipset = IPSET_PATH def __str__(self): return self.set def __init__(self, set, type, typeopt=''): self.set = set self.type = type self.typeopt = typeopt try: self.create() except IpsetError as error: if error.err_code != 256: raise elif not "already exists" in error.output: raise pass def call(self, cmd, arg=''): """Appel système à ipset""" cmd_line = "%s %s %s %s" % (self.ipset, cmd, self.set, arg) status, output = commands.getstatusoutput(cmd_line) if status: raise IpsetError(cmd_line, status, output) return output def create(self, opt=''): self.call("create", "%s %s" % (self.type, self.typeopt)) def add(self, arg): self.call("add", arg) def list(self): output = self.call("list").splitlines() list = [] for line in output[6:]: if line == 'Bindings:': break list.append(line) return list def delete(self, ip): """Delete an IP""" self.call("del", ip) def restore(self, rules): """ restore le set courrant""" rules_str = self.restore_format(rules) str = "%s\nCOMMIT\n" % rules_str path = '/tmp/ipset_%s' % self.set f = open(path, 'w+') f.write(str) f.close() try: self.flush() except IpsetError as error: sys.stderr.write("%s\n" % error) cmd = "cat %s | %s -R" % (path, self.ipset) status, output = commands.getstatusoutput(cmd) if status: raise IpsetError(cmd, status, output) return output def flush(self): self.call("flush") def destroy(self): self.call("destroy") def restore_format(self, rules): return '\n'.join(["add %s %s" % (self.set, data) for data in rules])