#! /usr/bin/env python
# -*- encoding: iso-8859-15 -*-
## (C) Samuel Krempp 2001
## krempp@crans.ens-cachan.fr
## Permission to copy, use, modify, sell and
## distribute this software is granted provided this copyright notice appears
## in all copies. This software is provided "as is" without express or implied
## warranty, and with no claim as to its suitability for any purpose.

import os, sys, string, re, getopt, time
import cPickle   # load/dump python objects
import socket

"""
nacct.py : parse the logs+dump of net-acct, then print a summary of the last 24h

Usage : nacct.py
        [-N 20]              : display the top-20
        [-c 1]               : sort by the first column
        [-n]                 : display numeric IPs (instead of resolved hostnames)
        [-p nacct.pickle]    : where to put the persistence file
        [-f net-acct.log] [-f net-acct.log.0] [-u net-acct.dump]
        [-T 24]              : analyse data of the last N hours
        [-T 2]               : (second occurrence) store data of the last N days
        [-h host]            : print details for the given host, from the *persistent* file only
        [-t 'YYYY MM DD HH:MM'] : choose the current time (default: now)

e.g. : nacct.py -h toto.crans.org -t '2002 12 31 23:59' -T 48
       will print details for toto over the 48 hours before the given time.
"""

def isExempt(exempts, src_ip, dst_ip) :
    is_exempt = 0
    for r in exempts['dst'] :
        if r.search(dst_ip) :
            is_exempt = 1
            break
    if is_exempt == 0 :
        if exempts['src_dst'].has_key(src_ip) :
            tmp = exempts['src_dst'][src_ip]
            for r in tmp :
                if r.search(dst_ip) :
                    is_exempt = 1
                    break
    return is_exempt


def parseInputUntil(f, prevline, conns_DB, exempts, end_time) :
    """ reads lines of the file f _until_ the timestamp is >= end_time
    data structure :
      . exempts [ src ] [ dst ] : if it exists, count this traffic separately ('exempted' bytes)
      . conns_DB [ IP ] = [ PURE_upload_bytes, upload_bytes, download_bytes  # normal bytes
                          , gPUL, gUL, gDL ]                                 # 'exempted' bytes
        (i.e. there are 6 counters : 3 for normal bytes, 3 for exempted bytes)
    optionally, prevline is a line to be parsed before processing the file
    (used because we read the next line's timestamp)

    Returns : (nextline, last_time, got_nothing)
      . nextline    : similar to prevline
      . last_time   : a timestamp such that :
                        . all read timestamps are <= last_time
                        . all unread timestamps are > last_time
                      in practice, last_time is either the greatest read timestamp,
                      or (nextline's timestamp) - 1
      . got_nothing : true iff the file was completely empty.
    """
    got_nothing = 1
    nextline = ""   # in case a line needs to be parsed at the next call..
    last_time = 0
    t = 0
    src_ip = ""
    dst_ip = ""
    size = 0
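    # A minimal sketch of one conns_DB entry (purely illustrative IP and values),
    # assuming the six-counter layout described in the docstring above :
    #   conns_DB['10.0.0.1'] = [ 12L, 345L, 6789L,   # pure UL, UL, DL   (normal bytes)
    #                             0L,   0L,    0L ]  # pure UL, UL, DL   ('exempted' bytes)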
    # local variable aliases (optimising lookups..)
    lsplit = string.split ; llong = long ; lregLAN = regLAN

    end_time = repr(end_time)
    (prev_stime, prev_proto, prev_src_ip, prev_src_port,
     prev_dst_ip, prev_dst_port, prev_size, pd) = [""] * 8
    prev_is_symmetric = 1
    prev_m_src = 0
    lineN = 0
    while 1 :
        if not prevline :
            line = f.readline()
            lineN += 1
        else :
            line = prevline
            prevline = ""
        if not line :
            break
        got_nothing = 0
        (stime, proto, src_ip, src_port, dst_ip, dst_port, size, pd) = lsplit(line, '\t', 7)
        if stime >= end_time :
            nextline = line
            # if a whole slice is absent from the logs, we need to set last_time here :
            if last_time == 0 :
                last_time = int(stime) - 1
            break
        else :
            last_time = stime

        if 1 :   # now really PARSE the line :
            try:
                size = llong(size)
            except ValueError:
                raise ValueError("INCORRECT size \"%s\" at line %d : %s " % (size, lineN, line))

            # Upload :
            is_exempt = 0
            if isExempt(exempts, src_ip, dst_ip) :
                is_exempt = 3
            try:
                conns_DB[src_ip][is_exempt + 1] += size
            except KeyError:
                conns_DB[src_ip] = [long(0)] * 6
                conns_DB[src_ip][is_exempt + 1] = long(size)

            # PURE Upload :
            is_symmetric = ( prev_src_ip == dst_ip and prev_src_port == dst_port and
                             prev_dst_ip == src_ip and prev_dst_port == src_port and
                             prev_stime == stime and prev_proto == proto )
            if is_symmetric :
                try :
                    if prev_size > size :
                        conns_DB[prev_src_ip][prev_is_exempt + 0] += prev_size
                    else:
                        conns_DB[src_ip][is_exempt + 0] += size
                except KeyError:
                    print "proto=%s %s, src_ip=%s %s" % (prev_proto, proto, prev_src_ip, src_ip)
            else :
                if prev_is_symmetric == 0 :
                    # previous line had no symmetrical transfer, assume PURE upload
                    conns_DB[prev_src_ip][prev_is_exempt + 0] += prev_size

            # Download :
            #m = lregLAN.search(dst_ip)
            if 1:
                dst_is_exempt = 0
                if isExempt(exempts, dst_ip, src_ip) :
                    dst_is_exempt = 3
                try:
                    conns_DB[dst_ip][dst_is_exempt + 2] += size
                except KeyError:
                    conns_DB[dst_ip] = [long(0)] * 6
                    conns_DB[dst_ip][dst_is_exempt + 2] = long(size)

        (prev_stime, prev_proto, prev_src_ip, prev_src_port) = (stime, proto, src_ip, src_port)
        (prev_dst_ip, prev_dst_port, prev_size) = (dst_ip, dst_port, size)
        (prev_is_exempt, prev_is_symmetric) = (is_exempt, is_symmetric)

    return (nextline, int(last_time), got_nothing)


def readSlices(inFile, db, exempts, slice0) :
    """Loop on time slices, and parse the file step by step"""
    prevLine = "" ; last_time = 0
    slice = slice0
    while 1 :   # loop on time_slice
        end_time = (slice + 1) * timeStep
        u = db[slice]
        (prevLine, lTime, got_nothing) = parseInputUntil(inFile, prevLine, db[slice],
                                                         exempts, end_time)
        if got_nothing :
            break
        if lTime != 0 :
            last_time = lTime
        slice = max(1 + slice, last_time / timeStep)
        if not db.has_key(slice) :
            db[slice] = {}
    return last_time


def readFile(file_info, db, exempts) :
    """ reads -completely, partially, or not at all- a list of rotated files.
        1/ find the first file in the list that contains new data
        2/ seek to the position where we stopped, read that file, then the newer ones.

    file_info fields used here :
      ['fnames']    : list of the rotated files for one log,
                      e.g. ['net-acct.log', 'net-acct.log.0']
                      must be in anti-chronological order (else the script aborts)
      ['prev_pos']  : offset pointing to where we stopped reading at the previous call
                      (because of log rotation, we have to guess which file this offset belongs to..)
      ['last_time'] : timestamp of the last read entry of this log
                      used to guess which file was opened previously, and which are new.
    """
    if debug :
        print "VeryBeg: lasttime = %d" % file_info.get('last_time', 777)
    file_info.setdefault('last_time', 0)

    # 1.
    # Where did we stop, on the previous execution ?
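    # A hypothetical file_info after a previous run (values purely illustrative),
    # matching the fields described in the docstring above :
    #   { 'fnames'    : ['net-acct.log', 'net-acct.log.0'],
    #     'prev_pos'  : 123456L,        # byte offset reached in the file read last time
    #     'last_time' : 1041375599 }    # timestamp of the last parsed entry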
    # in the list of files, find which need an update => [0, end_of_new[ :
    times = [0] * len(file_info['fnames'])
    min_time = 0
    i = -1
    for name in file_info['fnames'] :
        i += 1
        try :
            inFile = open(name, "rb")
            s = inFile.readline()
            inFile.close()
        except IOError :
            continue
        if not s :
            continue
        t = int( string.split(s, '\t')[0] )
        assert t > 1
        if min_time != 0 :
            assert t <= min_time
        min_time = t
        times[i] = t

    end_of_new = 0
    if file_info['last_time'] == 0 :
        # first time we read those files
        file_info['last_time'] = min_time - 1
        end_of_new = len(times)
    else :
        # we have archives about those files, see which files are new/updated
        for t in times :
            end_of_new += 1
            if t <= file_info['last_time'] :
                break   # the ones before are old ones; this one is the last updated one.
    FileNames = file_info['fnames'][0:end_of_new]
    if debug :
        print "first data at %s(%d), fileTimes= %s" % \
              (time.asctime(time.localtime(file_info['last_time'])), file_info['last_time'], times)
        print "We need to read/update %s" % (FileNames)
    if file_info['last_time'] < min_time :
        file_info['prev_pos'] = 0
        if file_info.get('reset_if_new', 0) :
            # erase the counters, they are no longer wanted.
            for k in db.keys() :
                del db[k]
    slice0 = file_info['last_time'] / timeStep

    # 2.
    # everything's ready, loop on the files, and parse them.
    FileNames.reverse()   # start with the oldest
    times = times[0:end_of_new]
    Files_and_pos = zip( FileNames, [file_info['prev_pos']] + [0] * (end_of_new - 1) )
    last_time = 0 ; last_pos = 0
    i = len(FileNames)
    for (fname, pos) in Files_and_pos :
        i -= 1
        if debug :
            print " read %s => Seek to pos %d" % (fname, pos)
        try:
            inFile = open(fname, "rb")
        except IOError:
            continue
        inFile.seek(pos)
        db.setdefault(slice0, {})
        last_time = readSlices(inFile, db, exempts, slice0)
        if last_time != 0 :
            # we advanced in this file.
            slice0 = last_time / timeStep
        elif i >= 1 :
            # guess an adequate slice0 to start with for the next file :
            slice0 = times[i-1] / timeStep
        last_pos = inFile.tell()
        assert last_pos >= pos
        inFile.close()

    # 3.
    # Update the file_info variables :
    if 1:
        if last_time != 0 :
            assert file_info.get('last_time', 0) <= last_time
            file_info['last_time'] = last_time
        if last_pos > 0 :
            file_info['prev_pos'] = last_pos
    if debug and file_info.has_key('last_time') :
        print "VeryLast: lasttime = %d" % file_info['last_time']


def loadPersist() :
    data = {}
    try:
        data = cPickle.load( open(pickleName, "rb") )
    except IOError:
        print "[cannot load persistent data. Will need to read all the log files.]"
    return data


def updateData() :
    """ structure of data :
    data['counts'] : the actual counters, split into separate databases :
        ['dump'] : for bytes read in the dump
        ['log']  : for bytes read in the log
        each is a 'conns_DB', which holds one database per time-slice :
            [slice][IP] : 6-uple (see parseInputUntil)
    data['files']
        ['ledump'] : the file_info for the dump files.
        ['lelog' ] : the file_info for the regular log files
        each 'file_info' DB has the following keys :
            'fnames', 'prev_pos', 'last_time' (see readFile)
            'dbName' (the corresponding key into data['counts'])
            'reset_if_new' (optional, designed for the dump file)
    """
    data = loadPersist()
    try:
        exempts = cPickle.load( open(pickleExemptsName, "rb") )
    except IOError:
        print "[cannot load exempts data, assuming no exempt]"
        exempts = { 'dst' : [], 'src_dst' : {} }
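    # A minimal sketch of the expected exempts structure, assuming compiled regexes
    # as used by isExempt() (the patterns and IP below are purely illustrative) :
    #   exempts = { 'dst'     : [ re.compile(r'^224\.') ],
    #               'src_dst' : { '10.0.0.1' : [ re.compile(r'^138\.231\.') ] } }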
assuming no exempt]" exempts = { 'dst' : [], 'src_dst' : {} } # initialise each database if needed : for k in ['files', 'counts' ]: data.setdefault(k, {} ) Files=data['files'] Files.setdefault('ledump', { 'dbName':'dump', 'fnames': dumpFNames, 'reset_if_new':1 }) Files.setdefault('lelog', {'dbName':'log', 'fnames': logFNames } ) # overwrite the filenames stored in pickle with those from the command-line. Files['ledump'] ['fnames'] = dumpFNames Files['lelog'] ['fnames'] = logFNames for k in Files.keys(): data['counts'].setdefault(Files[k]['dbName'], {} ) for key in data['files'].keys() : file_info = data['files'][key] if debug: print "file_info : %s " % file_info print "Parsing %s into data['counts'][ %s ]" % ( file_info['fnames'], file_info['dbName']) readFile( file_info, data['counts'] [file_info['dbName'] ], exempts ) return data def printCounters(counts, mkHeaders=0) : unit = 1e3 if megas : unit = 1e6 if mkHeaders : return "%9s|%9s|%9s | %10s|%9s|%9s" % ('Pure UL ', 'Upload ', 'Download', 'Exempt PUL', 'Exempt U', 'Exempt D' ) s="%9.3f|%9.3f|%9.3f | %9.3f|%9.3f|%9.3f" % (counts[0]/(unit), counts[1]/(unit), counts[2]/(unit), counts[3]/unit, counts[4]/unit, counts[5]/unit) return s def bilan(DB, dbNames, duree, cur_time, disp_all = 0) : slice0=int( (cur_time-duree) / timeStep ) by_host={} Nslices = {} for db_key in dbNames : Nslices[db_key] = 0 for slice in DB[db_key].keys() : if slice >= slice0 : Nslices[db_key] += 1 for host in DB[db_key][slice].keys() : if disp_all or regLAN.search(host): counts=DB[db_key][slice][host] cur = by_host.setdefault(host, [long(0)] *len(counts) + [host] ) for i in range(len(counts)): cur[i] += counts[i] liste=by_host.values() liste.sort( lambda x, y: -cmp(x[sort_column], y[sort_column] )) # tri décroissant sur le N° champ print " %5.1f h stats since %s. 
    print " %5.1f h stats since %s.  %d hour-slices found " % (duree/3600.0,
            time.asctime(time.localtime(slice0*timeStep)), max(Nslices.values()))
    print printCounters(0, 1) + " | HOST"
    print "=" * 77
    for l in liste[0:top10_length] :
        # Check whether the host's DNS entry exists (i.e. whether the host is registered at crans)
        try:
            host = socket.gethostbyaddr( l[-1] )[0]
            bad = 0
        except socket.error :
            host = l[-1]
            bad = 1
        if not resolve_names :
            # we want the numeric IP
            host = l[-1]
        if bad :
            host = "NoDNS_" + host
        print printCounters(l) + (" |%s" % host)


def detail_bilan(DB, hostName, IP, duree, cur_time) :
    slice0 = int( (cur_time - duree) / timeStep )
    slice1 = slice0 + int( duree / timeStep )
    slicePrints = {}
    Nslices = {}
    db_key = 'log'
    Nslices[db_key] = 0
    for slice in range(slice0, slice1 + 1) :
        pref = time.strftime("%Hh%M", time.localtime(slice * timeStep))
        txt = " (No record of this time-slice at all)"
        if slice in DB[db_key].keys() :
            txt = " (No activity for this host in this time-slice)"
            Nslices[db_key] += 1
            if IP in DB[db_key][slice].keys() :
                txt = printCounters( DB[db_key][slice][IP] )
        slicePrints[slice] = "%s|%s" % (pref, txt)

    print "Counts per %ds slice for host %s" % (timeStep, hostName)
    print "Start : %s" % (time.asctime(time.localtime( slice0 * timeStep )))
    print ("%5s|" % 'time') + printCounters(0, 1)
    print "=" * 77
    for slice in range(slice0, slice1 + 1) :
        print slicePrints[slice]
    print "End : %s" % (time.asctime(time.localtime( -1 + (slice1 + 1) * timeStep )))


def main(cur_time) :
    data = updateData()
    bilan(data['counts'], ['log', 'dump'], duree, cur_time, disp_all)

    # make the persistent data as small as possible :
    del data['counts'][ data['files']['ledump']['dbName'] ]
    del data['files']['ledump']
    cur_t = time.time()
    del_slices = []
    # -> get rid of old slices
    for slice in data['counts']['log'].keys() :
        if slice < (cur_t - sduree) / timeStep :
            del_slices.append(slice)
    for slice in del_slices :
        del data['counts']['log'][slice]
    # get rid of useless external hosts :
    for slice in data['counts']['log'].keys() :
        d = data['counts']['log'][slice]
        del_hosts = []
        for host in d.keys() :
            m = store_all or regLAN.search(host)
            # keep external hosts that were used as big upload targets : download >= 1 MB
            if not m and d[host][2] < 1e6 :
                del_hosts.append(host)
        for host in del_hosts :
            del d[host]
    cPickle.dump(data, open(pickleName, "wb"))


#################
# global vars : #
timeStep = 3600   # 1h slices
#################

optlist, args = getopt.getopt(sys.argv[1:], "dkDsnc:p:f:h:u:L:N:T:t:")

lock_name = "/var/lock/nacct.py"   # lock file
store_all = 0      # if false, store only hosts matching regLAN
disp_all = 0       # if false, display only hosts matching regLAN
sduree = 48*3600   # delete slices once they are that old
duree = 0          # display the stats over this period
top10_length = 30
sort_column = 0    # 0 : sort by PURE UL, 1 : by upload, 2 : by download, .. up to 5 (cf parseInputUntil)
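# The column indices follow the counter order printed by printCounters() :
#   0 : Pure UL   1 : Upload   2 : Download   3 : Exempt PUL   4 : Exempt UL   5 : Exempt DL
# (the -c option takes a 1-based column number and is converted below.)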
resolve_names = 1  # resolve hostnames
logFNames = []
dumpFNames = []
debug = 0
megas = 1
detail_host = ""
cur_time = time.time()
network = ""
pickleName = ""
pickleExemptsName = ""

if os.path.isfile(lock_name) :
    # the lock file already exists
    print "Lock exists (%s)" % lock_name
    fd = open(lock_name, "r")
    msg = fd.readlines()
    fd.close()
    pid = string.split(msg[0], '\n')[0]
    msg = string.strip(string.join(msg[1:], ''))
    q = os.system("ps -o pid,tty,user,etime,command " + pid)
    if q == 256:   # ps exited with status 1 : that PID no longer exists
        print "Lock PID not found => removing the lock"
        try :
            os.remove(lock_name)
        except OSError :
            pass
    else :
        print "Locking script still running, exiting"
        sys.exit(255)

# Locking
lock_fd = open(lock_name, "w")
lock_comment = "%s" % os.getpid()
lock_fd.write(lock_comment)
lock_fd.close()

for [key, val] in optlist :
    if key == '-f' : logFNames.append(val)
    if key == '-u' : dumpFNames.append(val)
    if key == '-d' : debug = 1
    if key == '-D' : disp_all = 1
    if key == '-L' : network = val
    if key == '-h' : detail_host = val
    if key == '-t' : cur_time = int( time.mktime(time.strptime(val, "%Y %m %d %H:%M")) )
    if key == '-N' : top10_length = int(val)
    if key == '-k' : megas = 0   # use kilobytes instead of megabytes
    if key == '-p' : pickleName = val
    if key == '-s' : store_all = 1
    if key == '-n' : resolve_names = 0
    if key == '-T' :
        if duree == 0 :
            duree = int( float(val) * 3600 )        # first -T : hours to display
        else:
            sduree = int( float(val) * 3600 * 24 )  # second -T : days to keep stored
    if key == '-c' : sort_column = int(val) - 1

if duree == 0:
    duree = 24*3600
if not logFNames :
    logFNames = ["/var/log/net-acct/net-acct.log", "/var/log/net-acct/net-acct.log.0"]
if not dumpFNames :
    dumpFNames = ["/var/log/net-acct/dump"]
if not network :
    network = "^138\.231\.1((3[6-9]|4[0-9]|50|51).*)$"
regLAN = re.compile(network)
if not pickleName :
    pickleName = "/tmp/nacct.pickle"
if not pickleExemptsName :
    pickleExemptsName = "/tmp/exempts.pickle"

# launch :
if detail_host :
    data = loadPersist()
    IP = socket.gethostbyname(detail_host)
    detail_bilan(data['counts'], detail_host, IP, duree, cur_time)
else :
    data = main(cur_time)

# Remove the lock
try :
    os.remove(lock_name)
except OSError :
    pass
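# Example invocations (illustrative; the hostname is just an example,
# see the usage docstring at the top of this file) :
#   nacct.py -N 20 -c 1                              # top-20 LAN hosts, sorted on pure upload
#   nacct.py -h toto.crans.org -t '2002 12 31 23:59' -T 48
#                                                    # 48h detail for one host, from the pickle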