310 lines
11 KiB
Python
310 lines
11 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
# List of manager events: http://www.voip-info.org/wiki/view/asterisk+manager+events
|
|
# List of manager actions: http://www.voip-info.org/wiki/view/Asterisk+manager+API
|
|
import os
|
|
import sys
|
|
import time
|
|
import errno
|
|
import shutil
|
|
import syslog
|
|
import socket
|
|
import base64
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from datetime import datetime
|
|
|
|
sys.path.append('/usr/scripts/')
|
|
import lc_ldap.shortcuts
|
|
|
|
class NullRecv(EnvironmentError):
    """Raised when a read on the manager socket returns no data at all."""
|
|
|
|
class AsteriskError(ValueError):
    """Raised when the Asterisk manager answers an action with an error."""
|
|
|
|
class Profile(object):
    """Per-extension profile lookups backed by PostgreSQL and LDAP.

    ``sql_params`` is passed unchanged to ``psycopg2.connect``.  ``database``
    is the name of the table the queries run against; it is interpolated
    into the SQL text (identifiers cannot be bound as parameters), so it
    must come from trusted configuration.
    """

    def __init__(self, sql_params=None, database=None):
        self.sql_params = sql_params
        self.database = database

    def update_pin(self, num, pin):
        """Set the voicemail password of extension ``num`` to ``pin``.

        Database errors propagate to the caller; the connection is always
        closed.
        """
        conn = psycopg2.connect(self.sql_params)
        try:
            cur = conn.cursor()
            try:
                cur.execute(
                    "UPDATE %s SET voicemail_password=%%s WHERE num=%%s" % self.database,
                    (pin, num))
                conn.commit()
            finally:
                cur.close()
        finally:
            conn.close()

    def alias_to_num(self, alias):
        """Resolve a login or mail alias to an extension number.

        Returns ``"1" + aid`` when exactly one directory entry matches the
        alias, and the string ``"NONE"`` otherwise, including when the
        lookup itself fails (best effort).
        """
        # NOTE(review): ``alias`` is inserted into the LDAP filter unescaped;
        # if it can come from a caller-controlled source it should be escaped.
        try:
            conn = lc_ldap.shortcuts.lc_ldap_readonly()
            matches = conn.search(u"(|(uid=%(alias)s)(mailAlias=%(alias)s@crans.org)(canonicalAlias=%(alias)s@crans.org))" % {'alias': alias})
            if len(matches) == 1:
                return "1" + str(matches[0]['aid'][0])
        except Exception:
            # Best effort: any failure is reported as "no such alias".
            pass
        return "NONE"

    def _caller_id_mode(self, num):
        """Return the ``caller_id`` column stored for extension ``num``."""
        conn = psycopg2.connect(self.sql_params)
        try:
            cur = conn.cursor()
            try:
                cur.execute("SELECT caller_id from %s WHERE num=%%s" % self.database, (num,))
                return cur.fetchall()[0][0]
            finally:
                cur.close()
        finally:
            conn.close()

    def num_to_callerid(self, num):
        """Return the caller id to display for extension ``num``.

        When the stored ``caller_id`` setting is ``'full_name'`` or
        ``'both'``, the first and last name are read from the directory
        (the directory id is ``num`` without its first character).  In every
        other case, and on any failure, ``num`` itself is returned.
        """
        try:
            if self._caller_id_mode(num) in ('full_name', 'both'):
                ldap_conn = lc_ldap.shortcuts.lc_ldap_readonly()
                aid = int(num[1:])
                adh = ldap_conn.search(u'aid=%s' % aid)[0]
                return '%s %s' % (adh['prenom'][0], adh['nom'][0])
        except Exception:
            # Best effort: fall back to the bare number.
            pass
        return num
|
|
|
|
class Sms(object):
    """Store-and-forward SMS delivery on top of the Asterisk manager.

    Messages are queued in the table named by ``database`` (interpolated
    into the SQL text, so it must come from trusted configuration) and are
    sent when the recipient's peer reports itself reachable or registered.
    ``sql_params`` is passed unchanged to ``psycopg2.connect``.
    """

    # Seconds to wait before reconnecting after the manager link fails,
    # so that an unavailable server is not hammered in a tight loop.
    RECONNECT_DELAY = 1

    def __init__(self, sql_params=None, database=None):
        self.sql_params = sql_params
        self.database = database

    def sms_daemon(self, server, port, user, password, timeout=360):
        """Run forever, delivering queued messages on ``PeerStatus`` events.

        The manager connection is re-established whenever it fails with a
        socket error or closes (``NullRecv``), including when the
        connection attempt itself fails.
        """
        options = dict(server=server, event=True, auto_connect=False, timeout=timeout)
        if port is not None:
            # Accept ports given as strings (e.g. read from a config file).
            options['port'] = int(port)
        manager = Manager(user, password, **options)
        manager.register_events_callback('PeerStatus', self._send_sms)
        while True:
            try:
                manager.connect()
                while True:
                    manager.process_events()
            except (socket.error, NullRecv):
                time.sleep(self.RECONNECT_DELAY)

    def sms_delay(self, src, dst, body, user, body_type='str'):
        """Queue a message from ``src`` to ``dst`` for later delivery to ``user``.

        ``body`` is stored base64-encoded: it is encoded here when
        ``body_type`` is ``'str'`` and stored as given when it is
        ``'base64'``.

        Raises EnvironmentError for any other ``body_type``.
        """
        if body_type not in ("str", "base64"):
            raise EnvironmentError("body_type sould be 'str' ou 'base64' not %r" % body_type)
        if body_type == 'str':
            # b64encode yields a single line; encodestring would insert a
            # newline every 76 characters, which cannot be carried in a
            # one-line manager header when the message is sent.
            body = base64.b64encode(body)
        conn = psycopg2.connect(self.sql_params)
        try:
            cur = conn.cursor()
            try:
                cur.execute(
                    'INSERT INTO %s (date, "from", "to", body, "user") VALUES (NOW(), %%s, %%s, %%s, %%s)' % self.database,
                    (src, dst, body, user))
                conn.commit()
            finally:
                cur.close()
        finally:
            conn.close()

    def _send_sms(self, manager, params):
        """``PeerStatus`` callback: deliver the messages queued for a peer.

        Nothing is done unless the peer became ``Reachable`` or
        ``Registered``.  Each queued message is sent through
        ``manager.messageSend``; it is deleted from the queue on success and
        kept (with a syslog entry) on failure.
        """
        if params.get('PeerStatus') not in ('Reachable', 'Registered'):
            return
        peer = params.get('Peer', '').split('/')
        if len(peer) < 2:
            # Not of the form "<technology>/<number>": nothing to look up.
            return
        num = peer[1]
        conn = psycopg2.connect(self.sql_params)
        try:
            cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
            try:
                cur.execute('SELECT * FROM %s WHERE "user"=%%s' % self.database, (num,))
                for sms in cur.fetchall():
                    try:
                        status, reply = manager.messageSend(sms['from'], sms['to'], sms['body'], body_type='base64')
                    except AsteriskError as err:
                        # The manager raises on an error response; one
                        # undeliverable message must not stop the daemon.
                        syslog.syslog("Message from %s to %s : %s" % (sms['from'], sms['to'], err))
                        continue
                    if status == 'Success':
                        syslog.syslog("Message from %s successfully delivered to %s" % (sms['from'], sms['to']))
                        cur.execute('DELETE FROM %s WHERE id=%%s' % self.database, (sms['id'],))
                        conn.commit()
                    elif status == 'Error':
                        syslog.syslog("Message from %s to %s : %s" % (sms['from'], sms['to'], reply.get('Message')))
            finally:
                cur.close()
        finally:
            conn.close()
|
|
|
|
class History(object):
    """Call history stored in PostgreSQL, plus a monthly destination quota.

    ``sql_params`` is passed unchanged to ``psycopg2.connect``.  ``database``
    is the name of the history table; it is interpolated into the SQL text,
    so it must come from trusted configuration.  ``quota_limit`` is the
    number of distinct ``+``-prefixed destinations allowed per calendar
    month.
    """

    def __init__(self, sql_params, database, quota_limit):
        self.sql_params = sql_params
        self.database = database
        self.quota_limit = quota_limit

    def _execute(self, query, args):
        """Run one write statement on a fresh connection and commit it.

        ``query`` contains one ``%s`` for the table name and ``%%s``
        placeholders for ``args``.  The connection is always closed.
        """
        conn = psycopg2.connect(self.sql_params)
        try:
            cur = conn.cursor()
            try:
                cur.execute(query % self.database, args)
                conn.commit()
            finally:
                cur.close()
        finally:
            conn.close()

    def add(self, id, src, dst):
        """Record a call ``src`` -> ``dst`` under the unique id ``id``."""
        self._execute("INSERT INTO %s (uniq_id,src,dst) VALUES (%%s, %%s, %%s)", (id, src, dst))

    def delete(self, id):
        """Remove the call recorded under the unique id ``id``."""
        self._execute("DELETE FROM %s WHERE uniq_id=%%s", (id,))

    def update(self, id, duration):
        """Store ``duration`` (converted with ``int``) for the call ``id``."""
        self._execute("UPDATE %s SET duration=%%s WHERE uniq_id=%%s", (int(duration), id))

    def _is_allowed(self, number):
        """Return whether ``number`` may be called under the monthly quota.

        A call is allowed while fewer than ``quota_limit`` distinct
        ``+``-prefixed destinations were recorded since the first day of the
        current month, or when ``number`` itself was already recorded in
        that period.
        """
        month_start = time.strftime('%Y-%m-01')
        conn = psycopg2.connect(self.sql_params)
        try:
            cur = conn.cursor()
            try:
                cur.execute(
                    "SELECT count(DISTINCT dst) FROM %s WHERE date>=%%s AND dst LIKE %%s" % self.database,
                    (month_start, '+%'))
                if cur.fetchall()[0][0] < self.quota_limit:
                    return True
                # Quota reached: only destinations already called this month
                # remain allowed.  Same inclusive boundary as the count above.
                cur.execute(
                    "SELECT count(dst)>0 FROM %s WHERE date>=%%s AND dst=%%s" % self.database,
                    (month_start, number))
                return cur.fetchall()[0][0]
            finally:
                cur.close()
        finally:
            conn.close()

    def quota(self, number):
        """Write ``ALLOWED`` or ``DENY`` to stdout for a call to ``number``.

        Any failure while checking the quota is logged to syslog and
        results in ``DENY``.
        """
        allowed = False
        try:
            allowed = self._is_allowed(number)
        except Exception as err:
            syslog.syslog("Quota check for %s failed, denying: %s" % (number, err))
        sys.stdout.write('ALLOWED' if allowed else 'DENY')
|
|
|
|
class Manager(object):
    """Small blocking client for the Asterisk Manager Interface.

    Messages are exchanged as ``Key: Value`` lines terminated by a blank
    line.  Responses are matched to actions through their ``ActionID``;
    events are queued and dispatched to registered callbacks by
    ``process_events``.
    """

    def __init__(self, username, password, timeout=10, server='asterisk.adm.crans.org', port=5038, debug=False, event=False, auto_connect=True):
        """Store the connection settings and connect if ``auto_connect``.

        ``event`` is the value later passed to ``events`` (a bool or a list
        of event classes).  ``debug`` echoes sent and received messages to
        stdout.
        """
        self.timeout = timeout
        self.server = server
        self.port = port
        self.socket = None
        self.debug = debug
        self.event = event
        self._pending_action = []   # ActionIDs sent and not yet answered
        self._response = {}         # ActionID -> (status, params)
        self._event = []            # queued (event name, params) pairs
        self._event_callback = {}   # event name -> list of callbacks
        self._toread = ""           # bytes received but not yet a full message
        self.fullybooted = False

        self.username = username
        self.password = password

        self.register_events_callback('FullyBooted', self._FullyBooted)
        if auto_connect:
            self.connect()

    def _FullyBooted(self, manager, params):
        """Callback for the ``FullyBooted`` event: mark the manager ready."""
        manager.fullybooted = True

    def _recv(self):
        """Read once from the socket and parse every complete message.

        Raises NullRecv when the read returns no data.  A trailing partial
        message is kept in ``self._toread`` for the next call.
        """
        data = self.socket.recv(1024)
        if len(data) == 0:
            raise NullRecv("Got null response")
        messages = (self._toread + data).split('\r\n\r\n')
        # Store the unfinished tail before parsing so that the buffer stays
        # a string even if a handler raises.
        self._toread = messages.pop()
        for msg in messages:
            if self.debug:
                print(msg + "\n")
            self._parse_msg(msg)

    def _send(self, line):
        """Send one protocol line, terminated by CRLF."""
        if self.debug:
            print(line)
        # sendall: a plain send() may transmit only part of the data.
        self.socket.sendall('%s\r\n' % (line,))

    def _parse_msg(self, msg):
        """Parse one message and hand it to the matching ``_do_<Type>`` method.

        The first line gives the message type and its value; the remaining
        lines become the ``params`` dict.  Blank messages, lines that are
        not ``Key: Value`` pairs and message types without a handler are
        ignored.
        """
        lines = msg.strip().split('\r\n')
        kind, sep, value = lines[0].partition(': ')
        if not sep:
            return
        params = {}
        for line in lines[1:]:
            key, sep, val = line.partition(': ')
            if not sep:
                if not line.endswith(':'):
                    continue
                # "Key:" with an empty value (its trailing space was stripped).
                key = line[:-1]
            params[key] = val
        handler = getattr(self, "_do_" + kind, None)
        if handler is not None:
            handler(value, params)

    def _do_Response(self, status, params):
        """Store a response under its ``ActionID``; drop it if it has none."""
        action_id = params.pop('ActionID', None)
        if action_id is not None:
            self._response[action_id] = (status, params)

    def _do_Event(self, name, params):
        """Queue an event for ``process_events``."""
        self._event.append((name, params))

    def _gen_actionID(self):
        """Return a string ActionID not used by any pending action."""
        candidate = time.time()
        # Pending ids are stored as strings, so compare in string form.
        while str(candidate) in self._pending_action:
            candidate += 1
        return str(candidate)

    def _action(self, name, params):
        """Send action ``name`` with ``params`` and wait for its response.

        Returns the ``(status, params)`` pair of the response.

        Raises AsteriskError when the status is ``Error``.  Socket errors
        and NullRecv raised while waiting propagate to the caller.
        """
        action_id = self._gen_actionID()
        self._send('ACTION: %s' % name.upper())
        for key, value in params.items():
            self._send('%s: %s' % (key.upper(), value))
        self._send('ActionID: %s' % action_id)
        self._send('')

        self._pending_action.append(action_id)
        try:
            while action_id not in self._response:
                self._recv()
            response = self._response.pop(action_id)
        finally:
            # Forget the id even when waiting failed.
            self._pending_action.remove(action_id)
        if response[0] == 'Error':
            raise AsteriskError('%s. Action:%s params:%s' % (response[1].get('Message'), name, params))
        return response

    def action(self, name, **params):
        """Send action ``name``; keyword arguments are its parameters."""
        return self._action(name, params)

    def connect(self):
        """Open the connection, log in and wait for ``FullyBooted``.

        Any previous socket is closed and per-connection state is reset, so
        the method can be called again to reconnect.  The first line sent by
        the server is kept in ``self.version``.
        """
        if self.socket is not None:
            try:
                self.socket.close()
            except socket.error:
                pass
            self.socket = None
        self._toread = ""
        self.fullybooted = False

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            sock.connect((self.server, self.port))
        except Exception:
            sock.close()
            raise
        self.socket = sock
        msg = self.socket.recv(1024).strip().split('\r\n', 1)
        self.version = msg[0]
        if len(msg) > 1:
            self._toread += msg[1]
        self.login()
        while not self.fullybooted:
            self.process_events()
        self.events(self.event)

    def register_events_callback(self, event, func):
        """Call ``func(manager, params)`` for each ``event`` received."""
        self._event_callback[event] = self._event_callback.get(event, []) + [func]

    def process_events(self):
        """Dispatch queued events, reading from the socket if none is queued.

        A read timeout is ignored.  Each event is removed from the queue
        before its callbacks run, so an exception raised by a callback does
        not cause already handled events to be dispatched again.
        """
        if not self._event:
            try:
                self._recv()
            except socket.timeout:
                pass
        while self._event:
            name, params = self._event.pop(0)
            for func in self._event_callback.get(name, []):
                func(self, params)

    def login(self, username=None, secret=None):
        """Login Manager"""
        if username is None:
            username = self.username
        if secret is None:
            secret = self.password
        return self.action('login', username=username, secret=secret)

    def logoff(self):
        """ Logoff Manager"""
        response = self.action('logoff')
        if response[0] == 'Goodbye':
            self.fullybooted = False

    def events(self, param):
        """Control Event Flow

        params should be a boolean or a list among system,call,log,verbose,command,agent,user to select
        which flags events should have to be sent."""
        if isinstance(param, (list, tuple)):
            eventmask = ','.join(param)
        elif isinstance(param, bool):
            eventmask = 'on' if param else 'off'
        else:
            raise EnvironmentError("%r should be a list or a bool" % param)
        return self.action('events', eventmask=eventmask)

    def reload(self, module):
        """Synopsis: Send a reload event
        Privilege: system,config,all"""
        return self.action('reload', module=module)

    def messageSend(self, src, dst, body, body_type="str"):
        """Send a message from ``src`` to ``dst``.

        ``body`` is base64-encoded here when ``body_type`` is ``'str'`` and
        sent as given when it is ``'base64'``.

        Raises EnvironmentError for any other ``body_type``.
        """
        if body_type not in ("str", "base64"):
            raise EnvironmentError("body_type sould be 'str' ou 'base64' not %r" % body_type)
        if body_type == "str":
            body = base64.b64encode(body).strip()
        return self._action('messageSend', {'to': dst, 'from': src, 'base64body': body})
|