182 lines
7.1 KiB
Python
182 lines
7.1 KiB
Python
# -*- coding: iso-8859-1 -*-
|
|
"""
|
|
MoinMoin - CAS authentication
|
|
|
|
Jasig CAS (see http://www.jasig.org/cas) authentication module.
|
|
|
|
@copyright: 2012 MoinMoin:RichardLiao
|
|
@license: GNU GPL, see COPYING for details.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time, re
|
|
import urlparse
|
|
import urllib, urllib2
|
|
from lxml import etree
|
|
from lxml.etree import XMLSyntaxError
|
|
|
|
from MoinMoin import log
|
|
logging = log.getLogger(__name__)
|
|
|
|
from MoinMoin.auth import BaseAuth
|
|
from MoinMoin import user, wikiutil
|
|
from MoinMoin.theme import load_theme_fallback
|
|
|
|
class PyCAS(object):
|
|
"""A class for working with a CAS server."""
|
|
|
|
def __init__(self, server_url, renew=False, login_path='/login', logout_path='/logout',
|
|
validate_path='/validate', coding='utf-8'):
|
|
self.server_url = server_url
|
|
self.renew = renew
|
|
self.login_path = login_path
|
|
self.logout_path = logout_path
|
|
self.validate_path = validate_path
|
|
self.coding = coding
|
|
|
|
def login_url(self, service):
|
|
"""Return the login URL for the given service."""
|
|
url = self.server_url + self.login_path + '?service=' + urllib.quote_plus(service)
|
|
if self.renew:
|
|
url += "&renew=true"
|
|
return url
|
|
def logout_url(self, redirect_url=None):
|
|
"""Return the logout URL."""
|
|
url = self.server_url + self.logout_path
|
|
if redirect_url:
|
|
url += '?url=' + urllib.quote_plus(redirect_url)
|
|
url += '&service=' + urllib.quote_plus(redirect_url)
|
|
return url
|
|
|
|
def validate_url(self, service, ticket):
|
|
"""Return the validation URL for the given service. (For CAS 1.0)"""
|
|
url = self.server_url + self.validate_path + '?service=' + urllib.quote_plus(service) + '&ticket=' + urllib.quote_plus(ticket)
|
|
if self.renew:
|
|
url += "&renew=true"
|
|
return url
|
|
|
|
def singlesignout(self, callback, body):
|
|
try:
|
|
nodes = etree.fromstring(body).xpath("/samlp:LogoutRequest/samlp:SessionIndex", namespaces={'samlp' : 'urn:oasis:names:tc:SAML:2.0:protocol'})
|
|
for node in nodes:
|
|
callback(node.text)
|
|
except XMLSyntaxError:
|
|
pass
|
|
|
|
def validate_ticket(self, service, ticket):
|
|
"""Validate the given ticket against the given service."""
|
|
f = urllib2.urlopen(self.validate_url(service, ticket))
|
|
valid = f.readline()
|
|
valid = valid.strip() == 'yes'
|
|
user = f.readline().strip()
|
|
user = user.decode(self.coding)
|
|
return valid, user
|
|
|
|
class CASAuth(BaseAuth):
|
|
""" handle login from CAS """
|
|
name = 'CAS'
|
|
login_inputs = []
|
|
logout_possible = True
|
|
|
|
def __init__(self, auth_server, login_path="/login", logout_path="/logout", validate_path="/validate", action="login_cas", create_user=False, fallback_url=None, ticket_path=None):
|
|
BaseAuth.__init__(self)
|
|
self.cas = PyCAS(auth_server, login_path=login_path,
|
|
validate_path=validate_path, logout_path=logout_path)
|
|
self.action = action
|
|
self.create_user = create_user
|
|
self.fallback_url = fallback_url
|
|
self.ticket_path = ticket_path
|
|
|
|
def request(self, request, user_obj, **kw):
|
|
ticket = request.args.get("ticket", "")
|
|
action = request.args.get("action", "")
|
|
force = request.args.get("force", None) is not None
|
|
logoutRequest = request.args.get("logoutRequest", [])
|
|
p = urlparse.urlparse(request.url)
|
|
url = urlparse.urlunparse(('https', p.netloc, p.path, "", "", ""))
|
|
|
|
def store_ticket(ticket, username):
|
|
with open(self.ticket_path + ticket, 'w') as f:
|
|
f.write(username)
|
|
|
|
def username_of_ticket(ticket):
|
|
try:
|
|
with open(self.ticket_path + ticket) as f:
|
|
username = f.read()
|
|
os.remove(self.ticket_path + ticket)
|
|
return username
|
|
except IOError:
|
|
return None
|
|
|
|
def logout_user(ticket):
|
|
username = username_of_ticket(ticket)
|
|
if username:
|
|
u = user.User(request, None, username)
|
|
checks = []
|
|
if u.exists():
|
|
def user_matches(session):
|
|
try:
|
|
return session['user.id'] == u.id
|
|
except KeyError:
|
|
return False
|
|
session_service = request.cfg.session_service
|
|
for sid in session_service.get_all_session_ids(request):
|
|
session = session_service.get_session(request, sid)
|
|
|
|
if user_matches(session):
|
|
session_service.destroy_session(request, session)
|
|
|
|
# authenticated user
|
|
if not force and user_obj and user_obj.valid:
|
|
if (action == self.action or (ticket and ticket.startswith('ST-'))) and user_obj.auth_method == self.name:
|
|
request.http_redirect(url)
|
|
|
|
if self.ticket_path and request.method == 'POST':
|
|
logoutRequest=request.form.get('logoutRequest', None)
|
|
if logoutRequest is not None:
|
|
sys.stderr.write("Tentative de deconnexion du CAS : %s\n" % logoutRequest)
|
|
self.cas.singlesignout(logout_user, logoutRequest)
|
|
|
|
# valid ticket on CAS
|
|
if ticket and ticket.startswith('ST-'):
|
|
valid, username = self.cas.validate_ticket(url, ticket)
|
|
if valid:
|
|
sys.stderr.write("Authentifiaction de %s sur le CAS\n" % username)
|
|
u = user.User(request, auth_username=username, auth_method=self.name)
|
|
# auto create user ?
|
|
if self.create_user:
|
|
u.valid = valid
|
|
u.create_or_update(True)
|
|
else:
|
|
u.valid = u.exists()
|
|
if self.fallback_url and not u.valid:
|
|
request.http_redirect("%s?action=%s&wiki_url=%s" % (self.fallback_url, self.action, url))
|
|
if u.valid:
|
|
store_ticket(ticket, username)
|
|
load_theme_fallback(request, u.theme_name)
|
|
return u, True
|
|
else:
|
|
request.http_redirect(self.cas.login_url(url))
|
|
elif self.action == action: # Redirect login
|
|
request.http_redirect(self.cas.login_url(url))
|
|
|
|
return user_obj, True
|
|
|
|
def logout(self, request, user_obj, **kw):
|
|
if self.name and user_obj and user_obj.auth_method == self.name:
|
|
user_obj.valid = False
|
|
request.cfg.session_service.destroy_session(request, request.session)
|
|
|
|
p = urlparse.urlparse(request.url)
|
|
url = urlparse.urlunparse((p.scheme, p.netloc, p.path, "", "", ""))
|
|
request.http_redirect(self.cas.logout_url(url))
|
|
return user_obj, False
|
|
|
|
|
|
def login_hint(self, request):
|
|
p = urlparse.urlparse(request.url)
|
|
url = urlparse.urlunparse(('https', p.netloc, p.path, "", "", ""))
|
|
_ = request.getText
|
|
msg = _('<p><a href="%s">Se connecter via le CAS</a> (vous devez disposer d\'un compte Cr@ns pour cela)</p>' % self.cas.login_url(url))
|
|
return msg
|