diff --git a/sip/asterisk.py b/sip/asterisk.py index f76cfa26..dc87d20a 100644 --- a/sip/asterisk.py +++ b/sip/asterisk.py @@ -1,19 +1,14 @@ # -*- coding: utf-8 - # liste d'event http://www.voip-info.org/wiki/view/asterisk+manager+events # liste d'action http://www.voip-info.org/wiki/view/Asterisk+manager+API -import os import sys import time -import errno -import shutil import syslog import socket import base64 -import syslog import psycopg2 import threading import psycopg2.extras -from datetime import datetime if '/usr/scripts' not in sys.path: sys.path.append('/usr/scripts') @@ -22,47 +17,58 @@ import lc_ldap.objets import gestion.secrets_new as secrets + class NullRecv(EnvironmentError): pass - + + class AsteriskError(ValueError): def __init__(self, message, action, params): - self.message=message - self.action=action - self.params=params + self.message = message + self.action = action + self.params = params def __str__(self): return '%s, Action:%s, params:%s' % (self.message, self.action, self.params) + class Profile(object): def __init__(self, sql_params=None, database=None): self.sql_params = sql_params - self.database =database - + self.database = database + def update_pin(self, num, pin): conn = psycopg2.connect(self.sql_params) cur = conn.cursor() - cur.execute("UPDATE %s SET voicemail_password=%%s WHERE num=%%s" % self.database, (pin, num)) + cur.execute( + "UPDATE %s SET voicemail_password=%%s WHERE num=%%s" % self.database, + (pin, num) + ) conn.commit() cur.close() conn.close() def right_to_nums(self, right): - conn=lc_ldap.shortcuts.lc_ldap_readonly() - ret=conn.search(u"(&(droits=%s)(!(chbre=EXT)))" % right) - return [ "1%04d" % adh['aid'][0].value for adh in ret] + conn = lc_ldap.shortcuts.lc_ldap_readonly() + ret = conn.search(u"(&(droits=%s)(!(chbre=EXT)))" % right) + return ["1%04d" % adh['aid'][0].value for adh in ret] def alias_to_num(self, alias): try: - conn=lc_ldap.shortcuts.lc_ldap_readonly() - ret=conn.search(u"(|(uid=%(alias)s)(mailAlias=%(alias)s@crans.org)(canonicalAlias=%(alias)s@crans.org))" % {'alias' : alias}) + conn = lc_ldap.shortcuts.lc_ldap_readonly() + ret = conn.search( + ( + u"(|(uid=%(alias)s)(mailAlias=%(alias)s@crans.org)" + + "(canonicalAlias=%(alias)s@crans.org))" + ) % {'alias': alias} + ) if len(ret) == 1: return "1%04d" % ret[0]['aid'][0].value else: return "NONE" except: return "NONE" - + def num_to_callerid(self, num): try: conn = psycopg2.connect(self.sql_params) @@ -73,22 +79,30 @@ class Profile(object): conn.close() if caller_id == 'full_name' or caller_id == 'both': - conn=lc_ldap.shortcuts.lc_ldap_readonly() - aid=int(num[1:]) - adh=conn.search(u'aid=%s' % aid)[0] - return '%s %s' % (adh['prenom'][0],adh['nom'][0]) + conn = lc_ldap.shortcuts.lc_ldap_readonly() + aid = int(num[1:]) + adh = conn.search(u'aid=%s' % aid)[0] + return '%s %s' % (adh['prenom'][0], adh['nom'][0]) else: return num except: return num + class Sms(object): def __init__(self, sql_params=None, database=None): self.sql_params = sql_params self.database = database - def sms_daemon(self, server,port,user,password, timeout=360): - manager = Manager(user, password, server=server, event=True, auto_connect=False, timeout=timeout) + def sms_daemon(self, server, port, user, password, timeout=360): + manager = Manager( + user, + password, + server=server, + event=True, + auto_connect=False, + timeout=timeout + ) manager.register_events_callback('PeerStatus', self._send_sms) while True: manager.connect() @@ -97,31 +111,42 @@ class Sms(object): manager.process_events() except (socket.error, NullRecv): pass - + def sms_delay(self, src, dst, body, user, body_type='str'): - if not body_type in ["str", "base64"]: + if body_type not in ["str", "base64"]: raise EnvironmentError("body_type sould be 'str' ou 'base64' not %r" % body_type) conn = psycopg2.connect(self.sql_params) cur = conn.cursor() - cur.execute('INSERT INTO %s (date, "from", "to", body, "user") VALUES (NOW(), %%s, %%s, %%s, %%s)' % self.database, (src, dst, base64.b64encode(body).strip() if body_type=='str' else body, user)) + cur.execute( + 'INSERT INTO %s (date, "from", "to", body, "user") VALUES (NOW(), %%s, %%s, %%s, %%s)' % + self.database, + (src, dst, base64.b64encode(body).strip() if body_type == 'str' else body, user) + ) conn.commit() cur.close() conn.close() - + def _send_sms(self, manager, params): - if params['PeerStatus'] in ['Reachable','Registered']: + if params['PeerStatus'] in ['Reachable', 'Registered']: num = params['Peer'].split('/')[1] conn = psycopg2.connect(self.sql_params) cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) - cur.execute('SELECT * FROM %s WHERE "user"=%%s ORDER BY "date" ASC' % self.database, (num,)) + cur.execute( + 'SELECT * FROM %s WHERE "user"=%%s ORDER BY "date" ASC' % self.database, + (num,) + ) for sms in cur.fetchall(): try: manager.messageSend(sms['from'], sms['to'], sms['body'], body_type='base64') - syslog.syslog("Message from %s successfully delivered to %s" % (sms['from'], sms['to'])) + syslog.syslog( + "Message from %s successfully delivered to %s" % (sms['from'], sms['to']) + ) cur.execute('DELETE FROM %s WHERE id=%%s' % self.database, (sms['id'],)) conn.commit() - except AsteriskError as error: - syslog.syslog("Message from %s to %s : %s" % (sms['from'], sms['to'], params['Message'])) + except AsteriskError: + syslog.syslog( + "Message from %s to %s : %s" % (sms['from'], sms['to'], params['Message']) + ) cur.close() conn.close() @@ -129,7 +154,7 @@ class Sms(object): if isinstance(src, lc_ldap.objets.proprio): # rajouter @crans.org ne semble pas marcher, pourquoi ? - num="1%04d" % src['aid'][0].value + num = "1%04d" % src['aid'][0].value profile_manager = Profile(self.sql_params, "voip_profile") callerid = profile_manager.num_to_callerid(num) caller = '"%s" ' % (callerid, num) @@ -146,10 +171,16 @@ class Sms(object): ast_manager = Manager('sms', secrets.get('asterisk_sms_passwd')) try: - ast_manager.messageSend(caller, to, msg) + ast_manager.messageSend(caller, to, msg) except AsteriskError as error: if error.message == "Message failed to send.": - self.sms_delay(error.params['from'], error.params['to'], error.params['base64body'], error.params['to'].split(':',1)[1], body_type='base64') + self.sms_delay( + error.params['from'], + error.params['to'], + error.params['base64body'], + error.params['to'].split(':', 1)[1], + body_type='base64' + ) else: raise @@ -157,17 +188,20 @@ class Sms(object): class History(object): def __init__(self, sql_params, database, quota_limit): self.sql_params = sql_params - self.database =database + self.database = database self.quota_limit = quota_limit def add(self, id, src, dst): conn = psycopg2.connect(self.sql_params) cur = conn.cursor() - cur.execute("INSERT INTO %s (uniq_id,src,dst) VALUES (%%s, %%s, %%s)" % self.database, (id, src, dst)) + cur.execute( + "INSERT INTO %s (uniq_id,src,dst) VALUES (%%s, %%s, %%s)" % self.database, + (id, src, dst) + ) conn.commit() cur.close() conn.close() - + def delete(self, id): conn = psycopg2.connect(self.sql_params) cur = conn.cursor() @@ -175,24 +209,37 @@ class History(object): conn.commit() cur.close() conn.close() - + def update(self, id, duration): conn = psycopg2.connect(self.sql_params) cur = conn.cursor() - cur.execute("UPDATE %s SET duration=%%s WHERE uniq_id=%%s" % self.database, (int(duration), id)) + cur.execute( + "UPDATE %s SET duration=%%s WHERE uniq_id=%%s" % self.database, (int(duration), id) + ) conn.commit() cur.close() conn.close() - + def quota(self, number): allowed = False try: conn = psycopg2.connect(self.sql_params) cur = conn.cursor() - cur.execute("SELECT count(DISTINCT dst) FROM %s WHERE date>='%s' AND dst LIKE '+%%'" % (self.database, time.strftime('%Y-%m-01'))) + cur.execute( + "SELECT count(DISTINCT dst) FROM %s WHERE date>='%s' AND dst LIKE '+%%'" % ( + self.database, + time.strftime('%Y-%m-01') + ) + ) outgoing_call_num = cur.fetchall()[0][0] if outgoing_call_num >= self.quota_limit: - cur.execute("SELECT count(dst)>0 FROM %s WHERE date>'%s' AND dst=%%s" % (self.database, time.strftime('%Y-%m-01')), (number,)) + cur.execute( + "SELECT count(dst)>0 FROM %s WHERE date>'%s' AND dst=%%s" % ( + self.database, + time.strftime('%Y-%m-01') + ), + (number,) + ) allowed = cur.fetchall()[0][0] else: allowed = True @@ -200,26 +247,31 @@ class History(object): conn.close() except: pass - sys.stdout.write('ALLOWED' if allowed else 'DENY') + sys.stdout.write('ALLOWED' if allowed else 'DENY') + class Manager(object): - - def __init__(self, username, password, timeout=10, server='idefisk.adm.crans.org', port=5038, debug=False, event=False, auto_connect=True, agi=None, wait_fullybooted=True): + + def __init__( + self, username, password, timeout=10, server='idefisk.adm.crans.org', + port=5038, debug=False, event=False, auto_connect=True, agi=None, + wait_fullybooted=True + ): self.timeout = timeout self.server = server self.port = port self.socket = None self.debug = debug self.event = event - self._pending_action=[] - self._response={} - self._event=[] + self._pending_action = [] + self._response = {} + self._event = [] self._event_callback = {} self._toread = "" self._agi = agi self.fullybooted = False self.wait_fullybooted = wait_fullybooted - + self.username = username self.password = password @@ -227,7 +279,7 @@ class Manager(object): self.register_events_callback('FullyBooted', self._FullyBooted) if auto_connect: self.connect() - + def agi(self, *args, **kwargs): if not self._agi: self._agi = AGI(*args, manager=self, **kwargs) @@ -235,7 +287,7 @@ class Manager(object): def _FullyBooted(self, manager, params): manager.fullybooted = True - + def _recv(self): data = self.socket.recv(1024) if len(data) == 0: @@ -246,102 +298,107 @@ class Manager(object): if self.debug: print msg + "\n" self._parse_msg(msg) - self._toread=self._toread[-1] - + self._toread = self._toread[-1] + def _send(self, str): if self.debug: print str - self.socket.send ('%s\r\n' % (str)) - + self.socket.send('%s\r\n' % (str)) + def _parse_msg(self, msg): msg = msg.strip().split('\r\n') type, value = msg[0].split(': ', 1) - params = dict([line.split(': ', 1) if len(line.split(': ', 1))==2 else (line.split(': ', 1)[0], '') for line in msg[1:]]) + params = dict([ + line.split(': ', 1) if len(line.split(': ', 1)) == 2 else (line.split(': ', 1)[0], '') + for line in msg[1:] + ]) handler = getattr(self, "_do_"+type, None) handler(value, params) - + def _do_Response(self, status, params): id = params['ActionID'] del(params['ActionID']) self._response[id] = (status, params) - + def _do_Event(self, type, params): self._event.append((type, params)) - def _gen_actionID(self): - id=time.time() + id = time.time() while id in self._pending_action: - id=id+1 + id = id+1 return str(id) - + def _action(self, name, params): - self._send ('ACTION: %s' % name.upper()) + self._send('ACTION: %s' % name.upper()) for (key, value) in params.items(): - self._send ('%s: %s' % (key.upper(), value)) - id=self._gen_actionID() - self._send ('ActionID: %s' % id) + self._send('%s: %s' % (key.upper(), value)) + id = self._gen_actionID() + self._send('ActionID: %s' % id) self._pending_action.append(id) - self._send ('') - - while not id in self._response.keys(): + self._send('') + + while id not in self._response.keys(): self._recv() - + response = self._response[id] del(self._response[id]) self._pending_action.remove(id) if response[0] == 'Error': raise AsteriskError(response[1]['Message'], name, params) return response - + def action(self, name, **params): return self._action(name, params) - + def connect(self): - sock = socket.socket ( socket.AF_INET, socket.SOCK_STREAM ) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.timeout) - sock.connect ( ( self.server, self.port ) ) - self.socket=sock + sock.connect((self.server, self.port)) + self.socket = sock msg = self.socket.recv(1024).strip().split('\r\n', 1) self.version = msg[0] - if len(msg)>1: + if len(msg) > 1: self._toread += msg[1] self.login() if self.wait_fullybooted: while not self.fullybooted: self.process_events() self.events(self.event) - + def register_events_callback(self, event, func): self._event_callback[event] = self._event_callback.get(event, []) + [func] - + def process_events(self): if not self._event: - try: self._recv() - except socket.timeout: pass + try: + self._recv() + except socket.timeout: + pass for event in self._event: (type, params) = event for func in self._event_callback.get(type, []): func(self, params) - self._event=[] - - + self._event = [] + def login(self, username=None, secret=None): """Login Manager""" - if username is None: username = self.username - if secret is None: secret = self.password + if username is None: + username = self.username + if secret is None: + secret = self.password return self.action('login', username=username, secret=secret) - + def logoff(self): """ Logoff Manager""" response = self.action('logoff') if response[0] == 'Goodbye': self.fullybooted = False - + def events(self, param): """Control Event Flow - params should be a boolean or a list among system,call,log,verbose,command,agent,user to select - which flags events should have to be sent.""" + params should be a boolean or a list among system,call,log,verbose,command,agent,user + to select which flags events should have to be sent.""" if isinstance(param, list): eventmask = ','.join(param) elif isinstance(param, bool): @@ -349,7 +406,7 @@ class Manager(object): else: raise EnvironmentError("%r should be a list or a bool" % param) return self.action('events', eventmask=eventmask) - + def hangup(self, channel): """Hangup a channel""" return self.action('hangup', channel=channel) @@ -357,35 +414,33 @@ class Manager(object): def reload(self, module): """Synopsis: Send a reload event Privilege: system,config,all""" - privilege = ['system', 'config', 'all'] return self.action('reload', module=module) - + def messageSend(self, src, dst, body, body_type="str"): - if not body_type in ["str", "base64"]: + if body_type not in ["str", "base64"]: raise EnvironmentError("body_type sould be 'str' ou 'base64' not %r" % body_type) if body_type == "str": body = base64.b64encode(body).strip() - return self._action('messageSend', {'to':dst, 'from':src, 'base64body':body}) - + return self._action('messageSend', {'to': dst, 'from': src, 'base64body': body}) class AGI(object): """voir http://www.voip-info.org/wiki/view/Asterisk+AGI""" def __init__(self, read=sys.stdin, write=sys.stdout, manager=None, **params): self.debug = True - self.read=read - self.write=write - self.params=params - self._manager=manager - self._lock=False + self.read = read + self.write = write + self.params = params + self._manager = manager + self._lock = False self._locklock = threading.Lock() self._read_params() def manager(self, *args, **kwargs): - if not self._manager: - self._manager=Manager(*args, agi=self, **kwargs) - return self._manager + if not self._manager: + self._manager = Manager(*args, agi=self, **kwargs) + return self._manager def __getitem__(self, key): try: @@ -404,39 +459,42 @@ class AGI(object): return result def _read_params(self): - line=self.read.readline() + line = self.read.readline() while line.strip(): syslog.syslog(line) if line.startswith('agi_'): - (key, data) = line[4:].split(':',1) - self.params[key.strip()]=data.strip() + (key, data) = line[4:].split(':', 1) + self.params[key.strip()] = data.strip() line = self.read.readline() def command(self, name, *params): with self._locklock: if self._lock: - raise AsteriskError("Cannot lauch AGI command %s, an other command is processing" % name, name, params) + raise AsteriskError( + "Cannot lauch AGI command %s, an other command is processing" % name, + name, + params + ) else: self._lock = True - cmd=' '.join([name] + ["%s" % p for p in params]) + cmd = ' '.join([name] + ["%s" % p for p in params]) if self.debug: syslog.syslog("%s\n" % cmd) self.write.write("%s\n" % cmd) self.write.flush() - line=self.read.readline() + line = self.read.readline() if self.debug: syslog.syslog(line) - lines=[line] - code=int(line[0:3]) - type=line[3] + lines = [line] + code = int(line[0:3]) + type = line[3] if type == '-': while not "%s End of " % code not in line: - line=self.read.readline() + line = self.read.readline() if self.debug: syslog.syslog("%s\n" % cmd) lines.append(line) - - self._lock=False + self._lock = False if code != 200: raise AsteriskError((code, '\n'.join(lines)), name, params) @@ -444,27 +502,33 @@ class AGI(object): (result, data) = lines[0][4:].split(' ', 1) except ValueError: result = lines[0][4:] - data="" - result=result.split('=',1)[1].strip() - return (int(result), data) - + data = "" + result = result.split('=', 1)[1].strip() + return (int(result), data) + def hangup(self): self.command("hangup") + def set_callerid(self, callerid): self.command("set_callerid", callerid) + def noop(self, str): self.command("noop", str) + def launch_app(self, app, *params): self.command("exec", app, *params) + def dial(self, to): self.launch_app("dial", to) + def answer(self): self.launch_app("Answer") + def goto(self, arg): self.launch_app("goto", arg) -### TODO +# TODO class FastAGI(object): - def __init__(self, bind, port, *args,**kwargs): + def __init__(self, bind, port, *args, **kwargs): pass