scripts/wiki/auth/cas.py
2014-02-05 18:34:38 +01:00

183 lines
7.1 KiB
Python

# -*- coding: iso-8859-1 -*-
"""
MoinMoin - CAS authentication
Jasig CAS (see http://www.jasig.org/cas) authentication module.
@copyright: 2012 MoinMoin:RichardLiao
@license: GNU GPL, see COPYING for details.
"""
import sys
import os
import time
import re
import urlparse
import urllib, urllib2
from lxml import etree
from lxml.etree import XMLSyntaxError
from MoinMoin import log
logging = log.getLogger(__name__)
from MoinMoin.auth import BaseAuth
from MoinMoin import user, wikiutil
from MoinMoin.theme import load_theme_fallback
class PyCAS(object):
    """Small helper for talking to a CAS server (CAS 1.0 style validation).

    The object builds the login, logout and validation URLs from a server
    base URL plus configurable paths, validates service tickets over HTTP
    and extracts session indexes from single-sign-out requests.
    """

    def __init__(self, server_url, renew=False, login_path='/login', logout_path='/logout',
                 validate_path='/validate', coding='utf-8'):
        """Remember the server location and protocol options.

        @param server_url: base URL of the CAS server; paths are appended as-is
        @param renew: when true, "&renew=true" is added to login/validate URLs
        @param login_path: path appended to server_url for the login page
        @param logout_path: path appended to server_url for the logout page
        @param validate_path: path appended to server_url for validation
        @param coding: encoding used to decode the username sent by the server
        """
        self.server_url = server_url
        self.renew = renew
        self.login_path = login_path
        self.logout_path = logout_path
        self.validate_path = validate_path
        self.coding = coding

    def login_url(self, service):
        """Return the login URL for the given service."""
        url = self.server_url + self.login_path + '?service=' + urllib.quote_plus(service)
        if self.renew:
            url += "&renew=true"
        return url

    def logout_url(self, redirect_url=None):
        """Return the logout URL.

        When redirect_url is given it is sent twice, as both the "url" and
        the "service" query parameters (presumably so that CAS servers
        reading either name redirect back after logout - confirm against
        the deployed server).
        """
        url = self.server_url + self.logout_path
        if redirect_url:
            target = urllib.quote_plus(redirect_url)
            url += '?url=' + target
            url += '&service=' + target
        return url

    def validate_url(self, service, ticket):
        """Return the validation URL for the given service. (For CAS 1.0)"""
        url = (self.server_url + self.validate_path +
               '?service=' + urllib.quote_plus(service) +
               '&ticket=' + urllib.quote_plus(ticket))
        if self.renew:
            url += "&renew=true"
        return url

    def singlesignout(self, callback, body):
        """Call callback(session_index) for each SessionIndex in a logout request.

        @param callback: callable taking the text of one samlp:SessionIndex
        @param body: XML text of the SAML LogoutRequest

        A body that is not well-formed XML is ignored on purpose: this is a
        best-effort notification handler and must not break the request.
        """
        # The body is supplied by the remote side, so entity expansion is
        # disabled to avoid XXE-style attacks through the XML parser.
        parser = etree.XMLParser(resolve_entities=False)
        try:
            root = etree.fromstring(body, parser)
        except XMLSyntaxError:
            return
        nodes = root.xpath("/samlp:LogoutRequest/samlp:SessionIndex",
                           namespaces={'samlp' : 'urn:oasis:names:tc:SAML:2.0:protocol'})
        for node in nodes:
            # An empty <SessionIndex/> has no text; there is nothing to log out.
            if node.text:
                callback(node.text)

    def validate_ticket(self, service, ticket):
        """Validate the given ticket against the given service.

        The response is read as two lines: the first is compared to "yes",
        the second is taken as the username.

        @return: (valid, username) where valid is a bool and username is the
                 second response line decoded with self.coding
        """
        response = urllib2.urlopen(self.validate_url(service, ticket))
        try:
            verdict = response.readline()
            name = response.readline()
        finally:
            # Always release the HTTP connection, even if reading fails.
            response.close()
        valid = verdict.strip() == 'yes'
        username = name.strip().decode(self.coding)
        return valid, username
class CASAuth(BaseAuth):
    """Handle login and logout through a CAS server.

    Besides the interactive login flow, the class optionally supports CAS
    single sign-out: when ticket_path is configured, the username behind
    every accepted service ticket is stored in a file named after the
    ticket, so that a later logout request carrying that ticket can find
    and destroy the matching wiki sessions.

    NOTE(review): the original indentation of this class was lost; the
    nesting below was reconstructed from the statements themselves.
    """
    name = 'CAS'
    login_inputs = []
    logout_possible = True

    # Tickets end up in file names, and they come from the query string or
    # from the body of a logout POST. Only accept the "ST-" prefix followed
    # by characters that cannot leave the ticket directory.
    _ticket_re = re.compile(r'\AST-[A-Za-z0-9._-]+\Z')

    def __init__(self, auth_server, login_path="/login", logout_path="/logout",
                 validate_path="/validate", action="login_cas", create_user=False,
                 fallback_url=None, ticket_path=None):
        """Configure the authenticator.

        @param auth_server: base URL of the CAS server
        @param login_path: CAS login path
        @param logout_path: CAS logout path
        @param validate_path: CAS ticket validation path
        @param action: value of the "action" query argument that triggers a
                       redirect to the CAS login page
        @param create_user: create/update the wiki user on successful login
        @param fallback_url: where to send CAS users without a valid wiki account
        @param ticket_path: path prefix for ticket files; single sign-out is
                            disabled when it is not set
        """
        BaseAuth.__init__(self)
        self.cas = PyCAS(auth_server, login_path=login_path,
                         validate_path=validate_path, logout_path=logout_path)
        self.action = action
        self.create_user = create_user
        self.fallback_url = fallback_url
        self.ticket_path = ticket_path

    def _service_url(self, request, scheme=None):
        """Rebuild the request URL without params, query string or fragment.

        The scheme of the request is kept unless an explicit one is given.
        """
        p = urlparse.urlparse(request.url)
        if scheme is None:
            scheme = p.scheme
        return urlparse.urlunparse((scheme, p.netloc, p.path, "", "", ""))

    def _ticket_file(self, ticket):
        """Return the file path for a ticket, or None if it must not be used."""
        if not self.ticket_path or not ticket:
            return None
        if not self._ticket_re.match(ticket):
            return None
        return self.ticket_path + ticket

    def _store_ticket(self, ticket, username):
        """Remember which username logged in with the given ticket."""
        path = self._ticket_file(ticket)
        if path is None:
            return
        # Encode explicitly: the username is unicode and the file holds bytes.
        with open(path, 'wb') as f:
            f.write(username.encode('utf-8'))

    def _pop_ticket_username(self, ticket):
        """Return the username stored for a ticket and forget the ticket.

        Returns None when nothing usable is stored for that ticket.
        """
        path = self._ticket_file(ticket)
        if path is None:
            return None
        try:
            with open(path, 'rb') as f:
                raw = f.read()
        except IOError:
            return None
        try:
            os.remove(path)
        except OSError:
            # Failing to delete the file must not prevent the logout itself.
            pass
        try:
            return raw.decode('utf-8')
        except UnicodeDecodeError:
            return None

    def _logout_user(self, request, ticket):
        """Destroy every session of the user who logged in with the ticket."""
        username = self._pop_ticket_username(ticket)
        if not username:
            return
        u = user.User(request, None, username)
        if not u.exists():
            return
        session_service = request.cfg.session_service
        for sid in session_service.get_all_session_ids(request):
            session = session_service.get_session(request, sid)
            try:
                matches = session['user.id'] == u.id
            except KeyError:
                matches = False
            if matches:
                session_service.destroy_session(request, session)

    def request(self, request, user_obj, **kw):
        """Per-request hook: handle CAS tickets, login redirects and sign-out.

        @return: (user object, True); the user object is the freshly
                 authenticated CAS user when a valid ticket for a valid wiki
                 user was presented, otherwise the user_obj passed in.

        NOTE(review): the code relies on request.http_redirect() not
        returning (presumably it aborts the request) - confirm.
        """
        ticket = request.args.get("ticket", "")
        action = request.args.get("action", "")
        force = request.args.get("force", None) is not None
        url = self._service_url(request, 'https')
        has_service_ticket = bool(ticket) and ticket.startswith('ST-')

        # authenticated user
        if not force and user_obj and user_obj.valid:
            if (action == self.action or has_service_ticket) and user_obj.auth_method == self.name:
                # Already logged in through CAS: drop the query string.
                request.http_redirect(url)

        # single sign-out notification posted to the wiki
        if self.ticket_path and request.method == 'POST':
            logout_request = request.form.get('logoutRequest', None)
            if logout_request is not None:
                sys.stderr.write("Tentative de deconnexion du CAS : %s\n" % logout_request)
                self.cas.singlesignout(
                    lambda session_index: self._logout_user(request, session_index),
                    logout_request)

        # valid ticket on CAS
        if has_service_ticket:
            valid, username = self.cas.validate_ticket(url, ticket)
            if valid:
                sys.stderr.write("Authentifiaction de %s sur le CAS\n" % username)
                u = user.User(request, auth_username=username, auth_method=self.name)
                # auto create user ?
                if self.create_user:
                    u.valid = valid
                    u.create_or_update(True)
                else:
                    u.valid = u.exists()
                if self.fallback_url and not u.valid:
                    request.http_redirect("%s?action=%s&wiki_url=%s" % (self.fallback_url, self.action, url))
                if u.valid:
                    # No-op when ticket_path is not configured.
                    self._store_ticket(ticket, username)
                    load_theme_fallback(request, u.theme_name)
                    return u, True
            else:
                # NOTE(review): this branch is assumed to pair with
                # "if valid" (rejected ticket -> ask CAS for a new one).
                request.http_redirect(self.cas.login_url(url))
        elif self.action == action: # Redirect login
            request.http_redirect(self.cas.login_url(url))
        return user_obj, True

    def logout(self, request, user_obj, **kw):
        """Log a CAS user out of the wiki and send the browser to CAS logout.

        Users authenticated by another method are left untouched.

        @return: (user_obj, False)
        """
        if self.name and user_obj and user_obj.auth_method == self.name:
            user_obj.valid = False
            request.cfg.session_service.destroy_session(request, request.session)
            request.http_redirect(self.cas.logout_url(self._service_url(request)))
        return user_obj, False

    def login_hint(self, request):
        """Return an HTML snippet linking to the CAS login page."""
        _ = request.getText
        url = self._service_url(request, 'https')
        # Translate the template first and fill in the URL afterwards;
        # otherwise the lookup key changes with every URL and never matches.
        template = _('<p><a href="%s">Se connecter via le CAS</a> (vous devez disposer d\'un compte Cr@ns pour cela)</p>')
        return template % self.cas.login_url(url)