#!/usr/bin/env python # -*- coding: utf-8 - # liste d'event http://www.voip-info.org/wiki/view/asterisk+manager+events # liste d'action http://www.voip-info.org/wiki/view/Asterisk+manager+API import os import sys import time import errno import shutil import syslog import socket import base64 import psycopg2 import psycopg2.extras from datetime import datetime sys.path.append('/usr/scripts/') import lc_ldap.shortcuts import lc_ldap.objets import gestion.secrets_new as secrets class NullRecv(EnvironmentError): pass class AsteriskError(ValueError): def __init__(self, message, action, params): self.message=message self.action=action self.params=params def __str__(self): return '%s, Action:%s, params:%s' % (self.message, self.action, self.params) class Profile(object): def __init__(self, sql_params=None, database=None): self.sql_params = sql_params self.database =database def update_pin(self, num, pin): conn = psycopg2.connect(self.sql_params) cur = conn.cursor() cur.execute("UPDATE %s SET voicemail_password=%%s WHERE num=%%s" % self.database, (pin, num)) conn.commit() cur.close() conn.close() def right_to_nums(self, right): conn=lc_ldap.shortcuts.lc_ldap_readonly() ret=conn.search(u"(&(droits=%s)(!(chbre=EXT)))" % right) return [ "1%04d" % adh['aid'][0].value for adh in ret] def alias_to_num(self, alias): try: conn=lc_ldap.shortcuts.lc_ldap_readonly() ret=conn.search(u"(|(uid=%(alias)s)(mailAlias=%(alias)s@crans.org)(canonicalAlias=%(alias)s@crans.org))" % {'alias' : alias}) if len(ret) == 1: return "1%04d" % ret[0]['aid'][0].value else: return "NONE" except: return "NONE" def num_to_callerid(self, num): try: conn = psycopg2.connect(self.sql_params) cur = conn.cursor() cur.execute("SELECT caller_id from %s WHERE num=%%s" % self.database, (num,)) caller_id = cur.fetchall()[0][0] cur.close() conn.close() if caller_id == 'full_name' or caller_id == 'both': conn=lc_ldap.shortcuts.lc_ldap_readonly() aid=int(num[1:]) adh=conn.search(u'aid=%s' % aid)[0] return '%s %s' % (adh['prenom'][0],adh['nom'][0]) else: return num except: return num class Sms(object): def __init__(self, sql_params=None, database=None): self.sql_params = sql_params self.database = database def sms_daemon(self, server,port,user,password, timeout=360): manager = Manager(user, password, server=server, event=True, auto_connect=False, timeout=timeout) manager.register_events_callback('PeerStatus', self._send_sms) while True: manager.connect() try: while True: manager.process_events() except (socket.error, NullRecv): pass def sms_delay(self, src, dst, body, user, body_type='str'): if not body_type in ["str", "base64"]: raise EnvironmentError("body_type sould be 'str' ou 'base64' not %r" % body_type) conn = psycopg2.connect(self.sql_params) cur = conn.cursor() cur.execute('INSERT INTO %s (date, "from", "to", body, "user") VALUES (NOW(), %%s, %%s, %%s, %%s)' % self.database, (src, dst, base64.encodestring(body).strip() if body_type=='str' else body, user)) conn.commit() cur.close() conn.close() def _send_sms(self, manager, params): if params['PeerStatus'] in ['Reachable','Registered']: num = params['Peer'].split('/')[1] conn = psycopg2.connect(self.sql_params) cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute('SELECT * FROM %s WHERE "user"=%%s ORDER BY "date" ASC' % self.database, (num,)) for sms in cur.fetchall(): try: manager.messageSend(sms['from'], sms['to'], sms['body'], body_type='base64') syslog.syslog("Message from %s successfully delivered to %s" % (sms['from'], sms['to'])) cur.execute('DELETE FROM %s WHERE id=%%s' % self.database, (sms['id'],)) conn.commit() except AsteriskError as error: syslog.syslog("Message from %s to %s : %s" % (sms['from'], sms['to'], params['Message'])) cur.close() conn.close() def send(self, dst, msg, src=None): if isinstance(src, lc_ldap.objets.proprio): # rajouter @crans.org ne semble pas marcher, pourquoi ? num="1%04d" % src['aid'][0].value profile_manager = Profile(self.sql_params, "voip_profile") callerid = profile_manager.num_to_callerid(num) caller = '"%s" ' % (callerid, num) elif src: caller = src else: caller = "ServiceCenter" if isinstance(dst, lc_ldap.objets.proprio): to = "sip:1%04d" % dst['aid'][0].value else: to = dst ast_manager = Manager('sms', secrets.get('asterisk_sms_passwd')) try: ast_manager.messageSend(caller, to, msg) except AsteriskError as error: if error.message == "Message failed to send.": self.sms_delay(error.params['from'], error.params['to'], error.params['base64body'], error.params['to'].split(':',1)[1], body_type='base64') else: raise class History(object): def __init__(self, sql_params, database, quota_limit): self.sql_params = sql_params self.database =database self.quota_limit = quota_limit def add(self, id, src, dst): conn = psycopg2.connect(self.sql_params) cur = conn.cursor() cur.execute("INSERT INTO %s (uniq_id,src,dst) VALUES (%%s, %%s, %%s)" % self.database, (id, src, dst)) conn.commit() cur.close() conn.close() def delete(self, id): conn = psycopg2.connect(self.sql_params) cur = conn.cursor() cur.execute("DELETE FROM %s WHERE uniq_id=%%s" % self.database, (id,)) conn.commit() cur.close() conn.close() def update(self, id, duration): conn = psycopg2.connect(self.sql_params) cur = conn.cursor() cur.execute("UPDATE %s SET duration=%%s WHERE uniq_id=%%s" % self.database, (int(duration), id)) conn.commit() cur.close() conn.close() def quota(self, number): allowed = False try: conn = psycopg2.connect(self.sql_params) cur = conn.cursor() cur.execute("SELECT count(DISTINCT dst) FROM %s WHERE date>='%s' AND dst LIKE '+%%'" % (self.database, time.strftime('%Y-%m-01'))) outgoing_call_num = cur.fetchall()[0][0] if outgoing_call_num >= self.quota_limit: cur.execute("SELECT count(dst)>0 FROM %s WHERE date>'%s' AND dst=%%s" % (self.database, time.strftime('%Y-%m-01')), (number,)) allowed = cur.fetchall()[0][0] else: allowed = True cur.close() conn.close() except: pass sys.stdout.write('ALLOWED' if allowed else 'DENY') class Manager(object): def __init__(self, username, password, timeout=10, server='asterisk.adm.crans.org', port=5038, debug=False, event=False, auto_connect=True): self.timeout = timeout self.server = server self.port = port self.socket = None self.debug = debug self.event = event self._pending_action=[] self._response={} self._event=[] self._event_callback = {} self._toread = "" self.fullybooted = False self.username = username self.password = password self.register_events_callback('FullyBooted', self._FullyBooted) if auto_connect: self.connect() def _FullyBooted(self, manager, params): manager.fullybooted = True def _recv(self): data = self.socket.recv(1024) if len(data) == 0: raise NullRecv("Got null response") self._toread += data self._toread = self._toread.split('\r\n\r\n') for msg in self._toread[:-1]: if self.debug: print msg + "\n" self._parse_msg(msg) self._toread=self._toread[-1] def _send(self, str): if self.debug: print str self.socket.send ('%s\r\n' % (str)) def _parse_msg(self, msg): msg = msg.strip().split('\r\n') type, value = msg[0].split(': ', 1) params = dict([line.split(': ', 1) for line in msg[1:]]) handler = getattr(self, "_do_"+type, None) handler(value, params) def _do_Response(self, status, params): id = params['ActionID'] del(params['ActionID']) self._response[id] = (status, params) def _do_Event(self, type, params): self._event.append((type, params)) def _gen_actionID(self): id=time.time() while id in self._pending_action: id=id+1 return str(id) def _action(self, name, params): self._send ('ACTION: %s' % name.upper()) for (key, value) in params.items(): self._send ('%s: %s' % (key.upper(), value)) id=self._gen_actionID() self._send ('ActionID: %s' % id) self._pending_action.append(id) self._send ('') while not id in self._response.keys(): self._recv() response = self._response[id] del(self._response[id]) self._pending_action.remove(id) if response[0] == 'Error': raise AsteriskError(response[1]['Message'], name, params) return response def action(self, name, **params): return self._action(name, params) def connect(self): sock = socket.socket ( socket.AF_INET, socket.SOCK_STREAM ) sock.settimeout(self.timeout) sock.connect ( ( self.server, self.port ) ) self.socket=sock msg = self.socket.recv(1024).strip().split('\r\n', 1) self.version = msg[0] if len(msg)>1: self._toread += msg[1] self.login() while not self.fullybooted: self.process_events() self.events(self.event) def register_events_callback(self, event, func): self._event_callback[event] = self._event_callback.get(event, []) + [func] def process_events(self): if not self._event: try: self._recv() except socket.timeout: pass for event in self._event: (type, params) = event for func in self._event_callback.get(type, []): func(self, params) self._event=[] def login(self, username=None, secret=None): """Login Manager""" if username is None: username = self.username if secret is None: secret = self.password return self.action('login', username=username, secret=secret) def logoff(self): """ Logoff Manager""" response = self.action('logoff') if response[0] == 'Goodbye': self.fullybooted = False def events(self, param): """Control Event Flow params should be a boolean or a list among system,call,log,verbose,command,agent,user to select which flags events should have to be sent.""" if isinstance(param, list): eventmask = ','.join(param) elif isinstance(param, bool): eventmask = 'on' if param else 'off' else: raise EnvironmentError("%r should be a list or a bool" % param) return self.action('events', eventmask=eventmask) def reload(self, module): """Synopsis: Send a reload event Privilege: system,config,all""" privilege = ['system', 'config', 'all'] return self.action('reload', module=module) def messageSend(self, src, dst, body, body_type="str"): if not body_type in ["str", "base64"]: raise EnvironmentError("body_type sould be 'str' ou 'base64' not %r" % body_type) if body_type == "str": body = base64.b64encode(body).strip() return self._action('messageSend', {'to':dst, 'from':src, 'base64body':body})