Ajout du script pour le service alertsms

This commit is contained in:
Hamza Dely 2015-03-26 23:44:21 +01:00 committed by Gabriel Detraz
parent d31c24bcb9
commit 1793b26ae2

305
alertsms/alertsms.py Executable file
View file

@ -0,0 +1,305 @@
#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
# Module pour la téléphonie
from gammu import smsd
from gammu.exception import GSMError
# Modules intrinsèques Python
import sys
import pwd
import os
import re
import socket
from email.message import Message
from email.parser import FeedParser
# Modules made in Cr@ns
from gestion.config import dns
from lc_ldap import shortcuts
import lc_ldap.filter2 as filter
#Exécuter le script sans essayer de contacter la clé 3G
if '--dry-run' in sys.argv[1:]:
DRY_RUN = True
print("Mode dry-run\n")
else:
DRY_RUN = False
#-------------------------------------------------------------------------------
# 0) Initialisation de LDAP,objets + fcts utiles
#-------------------------------------------------------------------------------
# Taille maximale du corps d'un message SMS
if '--unicode' in sys.argv[1:]:
MAX_CHAR = 70
else:
MAX_CHAR = 160
# Connexion à la base LDAP
ldap = shortcuts.lc_ldap_test()
# Serveur boite aux lettres
MAILBOX_SERVER = 'zamok.crans.org'
# Serveurs pouvant distribuer les mails de manière légitime
TRUSTED_SERVERS = [ (
name.replace('freebox.crans.org','titanic.crans.org'),
name.replace('freebox.crans.org','titanic.crans.org').replace('.','.adm.',1),
{ 'ipv4' : '', 'ipv6' : '' }
) for name in dns.MXs.keys() ]
for _,server,ip_dict in TRUSTED_SERVERS:
list_ip = socket.getaddrinfo(server,None,0,0,socket.IPPROTO_TCP)
for _,_,_,_,ip in list_ip:
if ':' in ip[0]:
ip_dict['ipv6'] = ip[0]
else:
ip_dict['ipv4'] = ip[0]
# IP spéciales autorisées à envoyer des messages
SPECIAL_AUTH_IPv4 = []
SPECIAL_AUTH_IPv6 = []
# Expressions régulières pour le traitement du header des mails
catch_from_info = re.compile('from \[?(?P<sender>[a-zA-Z0-9.-]*)\]? \((?P<real_sender>[a-zA-Z0-9.-]*) ?\[((IPv6:(?P<ipv6>[0-9a-f:]*))|(?P<ipv4>[0-9.]*))\]\).*')
catch_by_info = re.compile('.*by (?P<who>[a-zA-Z0-9.]*) .*',flags=re.DOTALL)
def get_sender_info(line):
"""
Extrait les infos d'un élément Received du header
(prétendu nom de l'expéditeur, son nom d'après le DNS, son adresse IP)
"""
m = catch_from_info.match(line)
if m is None:
return (None,None,None,None)
return (m.group('sender'),m.group('real_sender'),m.group('ipv4'),m.group('ipv6'))
def get_all_sender_info(message):
"""
Extrait les infos de tous les éléments Received du header d'un mail
et en fait une liste (en conservant l'ordre des éléments)
"""
return [ get_sender_info(line) for line in message.get_all('Received') ]
def get_by_info(line):
"""
Extrait le nom du serveur ayant ajouté la ligne Received donnée
"""
m = catch_by_info.match(line)
if m is None:
return None
return m.group('who')
def get_all_by_info(message):
"""
Extrait le nom de tous les serveurs ayant ajouté une ligne Received
(en conservant l'ordre)
"""
return [ get_by_info(line) for line in message.get_all('Received') ]
def get_trusted_servers_v4():
"""
Renvoie la liste des serveurs de confiance avec leur IPv4
"""
return [(name,adm_name,ip_dict['ipv4']) for name,adm_name,ip_dict in TRUSTED_SERVERS ]
def get_trusted_servers_v6():
"""
Renvoie la liste des serveurs de confiance avec leur IPv6
"""
return [(name,adm_name,ip_dict['ipv6']) for name,adm_name,ip_dict in TRUSTED_SERVERS ]
#-------------------------------------------------------------------------------
# 1) Lire le mail dans le flux stdin
#-------------------------------------------------------------------------------
# Ouverture de stdin
stream = sys.stdin
# Récupération du mail
mail = stream.readlines()
fp_mail = FeedParser()
fp_mail.feed("".join(mail))
msg = fp_mail.close()
# On le décortique
received_chain = get_all_sender_info(msg)
by_chain = get_all_by_info(msg)
mail_subject = msg['Subject']
mail_content = msg.get_payload().replace('\n',' ')
#-------------------------------------------------------------------------------
# 2) Ce message est-il destiné à être envoyé par SMS ?
#-------------------------------------------------------------------------------
if '[SMS]' not in mail_subject:
print("Message non destiné au service SMS")
sys.exit(200)
#-------------------------------------------------------------------------------
# 3) Analyse de la chaine de réception du message
#-------------------------------------------------------------------------------
# Il faut qu'il y a au moins 2 serveurs dans la chaîne (boite aux lettres + serveur de mail)
# Sinon, cela signifie a priori que le message à été déposé directement dans la boite
# Il faut aussi vérifier qui a distribué le mail
if len(received_chain) < 2:
print("La chaine de réception n'est pas assez longue")
sys.exit(300)
if by_chain[0] != MAILBOX_SERVER:
print("La boîte aux lettres est suspecte :\
{0} au lieu de {1}".format(repr(by_chain[0]),MAILBOX_SERVER))
sys.exit(301)
print("Boîte aux lettres : " + MAILBOX_SERVER)
trusted_server = False
sender,real_sender,ipv4,ipv6 = received_chain[0]
if ipv4:
server = (sender,real_sender,ipv4)
trusted_server = server in get_trusted_servers_v4()
elif ipv6:
server = (sender,real_sender,ipv6)
trusted_server = server in get_trusted_servers_v6()
if not trusted_server:
print("Le serveur de distribution du courrier est suspect")
sys.exit(302)
print("Serveur de distribution : " + repr(server) + "\n")
# On fait confiance à la boîte aux lettres et au serveur de distribution
# Le serveur de distribution est le premier serveur Crans à recevoir le mail
# Au-delà, on a aucun contrôle : On est obligé de faire confiance
#-------------------------------------------------------------------------------
# 4) Vérifier l'identité de l'expéditeur
#-------------------------------------------------------------------------------
# On récupère l'adresse IP de l'expéditeur
_,_,sender_ipv4,sender_ipv6 = received_chain.pop()
# S'agit-il d'un expéditeur spécial ?
special_auth = sender_ipv4 in SPECIAL_AUTH_IPv4 or sender_ipv6 in SPECIAL_AUTH_IPv6
pseudo = 'Special'
# Si ce n'est pas une adresse spéciale, on fait une recherche LDAP
if not special_auth:
if sender_ipv4:
sender_ip = sender_ipv4
f = filter.human_to_ldap('ipHostNumber=' + sender_ipv4)
elif sender_ipv6:
sender_ip = sender_ipv6
f = filter.human_to_ldap('ip6HostNumber=' + sender_ipv6)
else:
print("Identification impossible : L'expéditeur n'a ni IPv4 ni IPv6")
sys.exit(402)
sender = ldap.search(f)
# --> Si il n'y a aucun résultat, on quitte le script
if not sender:
print('Aucun résultat pour l\'expéditeur dans la base LDAP')
sys.exit(400)
sender = sender[0]
# --> Dans le cas contraire, on regarde qui c'est
if u'machineCrans' in [ objet.value for objet in sender['objectClass'] ]:
is_authorized = True # C'est une machine Crans
pseudo = sender['host'][0].value.split('.',1)[0]
elif u'machineFixe' in [ objet.value for objet in sender['objectClass'] ] \
or u'machineWifi' in [ objet.value for objet in sender['objectClass'] ]:
if u'Nounou' in [ droit.value for droit in sender.proprio()['droits'] ]:
is_authorized = True # C'est une nounou
pseudo = sender.proprio()['uid'][0].value
else:
is_authorized = False
else:
is_authorized = False
# On quitte le script si on a pas l'autorisation
if not is_authorized:
print("Impossible d'envoyer le SMS (Autorisation refusée)")
sys.exit(401)
print(u"Expéditeur : " + pseudo + " [" + sender_ip + "]" + "\n")
#-------------------------------------------------------------------------------
# 5) Contacter le démon sms
#-------------------------------------------------------------------------------
if not DRY_RUN:
try:
# On essaie de contacter le démon SMS
daemon = smsd.SMSD('/etc/gammu-smsdrc')
except GSMError:
# On quitte si l'initialisation a échoué
print("Impossible de contacter le démon SMS")
sys.exit(500)
#-------------------------------------------------------------------------------
# 6) Ecrire le SMS
#-------------------------------------------------------------------------------
# On récupère l'utilisateur courant
user = pwd.getpwuid(os.getuid())[0]
# On trouve le numéro de l'adhérent associé dans la base LDAP
f = filter.human_to_ldap('uid='+user)
number = str(ldap.search(f)[0]['tel'][0])
# On fabrique une regex pour la forme du numéro de téléphone
tel_pattern = re.compile('^((336)|(337)|(06)|(07))[0-9]{8}$')
# On vérifie que ce soit un numéro de téléphone mobile valide
if not tel_pattern.match(number):
print("Numéro du destinataire invalide")
sys.exit(600)
# Si le numéro commence par l'indicatif 33, on rajoute le +
if number.startswith('33'):
number = '+' + number
# On écrit le message
text = (pseudo + u' : ' + mail_content)[0:MAX_CHAR-1]
message = {
'Text' : text,
'SMSC' : { 'Location' : 1 },
'Number' : number,
}
print(u"Destinataire : " + user)
print(u"Contenu du message : " + text + "\n")
#-------------------------------------------------------------------------------
# 7) L'envoyer
#-------------------------------------------------------------------------------
if not DRY_RUN:
try:
# On envoie le message
daemon.InjectSMS([message])
except GSMError:
print("Le message n'a pas pu être placé dans la file d'attente")
sys.exit(700)