
Le paramètre « ticket » (ticket CAS) interfère avec la suppression des fichiers, qui utilise elle aussi un ticket (ticket MoinMoin) pour se prémunir contre les attaques CSRF.
184 lines
6.9 KiB
Python
184 lines
6.9 KiB
Python
# -*- coding: iso-8859-1 -*-
|
|
"""
|
|
MoinMoin - CAS authentication
|
|
|
|
Jasig CAS (see http://www.jasig.org/cas) authentication module.
|
|
|
|
@copyright: 2012 MoinMoin:RichardLiao
|
|
@license: GNU GPL, see COPYING for details.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time, re
|
|
import urlparse
|
|
import urllib, urllib2
|
|
from lxml import etree
|
|
from lxml.etree import XMLSyntaxError
|
|
|
|
from MoinMoin import log
|
|
logging = log.getLogger(__name__)
|
|
|
|
from MoinMoin.auth import BaseAuth
|
|
from MoinMoin import user, wikiutil
|
|
from MoinMoin.theme import load_theme_fallback
|
|
|
|
class PyCAS(object):
    """Small client for a CAS server (CAS 1.0 style plain-text validation).

    Builds the login, logout and validation URLs of the server, validates
    service tickets over HTTP and extracts the session indexes carried by a
    single-sign-out ``LogoutRequest`` document.
    """

    def __init__(self, server_url, renew=False, login_path='/login', logout_path='/logout',
                 validate_path='/validate', coding='utf-8'):
        """Remember the server base URL and the paths of its endpoints.

        @param server_url: base URL of the CAS server; the paths below are
                           appended to it verbatim
        @param renew: when true, ``&renew=true`` is added to the login and
                      validation URLs
        @param login_path: path of the login endpoint
        @param logout_path: path of the logout endpoint
        @param validate_path: path of the ticket validation endpoint
        @param coding: encoding used to decode the user name returned by the
                       validation endpoint
        """
        self.server_url = server_url
        self.renew = renew
        self.login_path = login_path
        self.logout_path = logout_path
        self.validate_path = validate_path
        self.coding = coding

    def login_url(self, service):
        """Return the login URL for the given service."""
        url = self.server_url + self.login_path + '?service=' + urllib.quote_plus(service)
        if self.renew:
            url += "&renew=true"
        return url

    def logout_url(self, redirect_url=None):
        """Return the logout URL.

        When redirect_url is given it is passed twice, as both the ``url``
        and the ``service`` query parameters.
        """
        url = self.server_url + self.logout_path
        if redirect_url:
            quoted = urllib.quote_plus(redirect_url)
            # NOTE(review): presumably both names are sent because CAS server
            # versions disagree on which one they honour - confirm.
            url += '?url=' + quoted
            url += '&service=' + quoted
        return url

    def validate_url(self, service, ticket):
        """Return the validation URL for the given service. (For CAS 1.0)"""
        url = (self.server_url + self.validate_path
               + '?service=' + urllib.quote_plus(service)
               + '&ticket=' + urllib.quote_plus(ticket))
        if self.renew:
            url += "&renew=true"
        return url

    def singlesignout(self, callback, body):
        """Call callback(text) for every SessionIndex of a SAML LogoutRequest.

        @param callback: callable receiving the text of each
                         ``samlp:SessionIndex`` element
        @param body: the XML document, as received from the client

        A body that cannot be parsed is silently ignored.
        """
        # The document is sent by an arbitrary HTTP client: do not expand
        # entities and never let the parser fetch anything from the network.
        parser = etree.XMLParser(resolve_entities=False, no_network=True)
        try:
            root = etree.fromstring(body, parser)
        except (XMLSyntaxError, ValueError):
            # ValueError: lxml refuses unicode input that carries an encoding
            # declaration. Either way this is not a usable logout request.
            return
        nodes = root.xpath("/samlp:LogoutRequest/samlp:SessionIndex",
                           namespaces={'samlp': 'urn:oasis:names:tc:SAML:2.0:protocol'})
        for node in nodes:
            # An empty element has no text; there is nothing to log out then.
            if node.text:
                callback(node.text)

    def validate_ticket(self, service, ticket):
        """Validate the given ticket against the given service.

        The response is read as two lines: the first one must be ``yes`` for
        the ticket to be valid, the second one is the user name.

        @return: tuple (valid, user) where user is decoded with self.coding
        """
        response = urllib2.urlopen(self.validate_url(service, ticket))
        try:
            valid = response.readline().strip() == 'yes'
            username = response.readline().strip().decode(self.coding)
        finally:
            # Always release the connection, even if reading/decoding fails.
            response.close()
        return valid, username
|
|
|
|
class CASAuth(BaseAuth):
    """Authenticate wiki users against a CAS server.

    A valid CAS ticket found in the request is turned into a wiki user.
    When ``ticket_path`` is configured, the ticket of every successful login
    is recorded in a file so that a later single-sign-out request from the
    CAS server can be mapped back to the user whose sessions must be
    destroyed.
    """
    name = 'CAS'
    login_inputs = []
    logout_possible = True

    # Tickets end up in file names and, for single sign-out, come straight
    # from an unauthenticated POST body: only accept characters that cannot
    # form a path (no separators), so nothing outside ticket_path is touched.
    _safe_ticket = re.compile(r'\A[A-Za-z0-9._-]+\Z')

    def __init__(self, auth_server, login_path="/login", logout_path="/logout", validate_path="/validate", action="login_cas", create_user=False, fallback_url=None, ticket_path=None):
        """Configure the CAS authenticator.

        @param auth_server: base URL of the CAS server
        @param login_path: path of the CAS login endpoint
        @param logout_path: path of the CAS logout endpoint
        @param validate_path: path of the CAS ticket validation endpoint
        @param action: value of the ``action`` request argument that triggers
                       a CAS login
        @param create_user: when true, a wiki account is created or updated
                            for every user validated by CAS
        @param fallback_url: where to redirect users validated by CAS that
                             have no valid wiki account
        @param ticket_path: prefix of the files recording which user logged
                            in with which ticket; None disables single
                            sign-out
        """
        BaseAuth.__init__(self)
        self.cas = PyCAS(auth_server, login_path=login_path,
                         validate_path=validate_path, logout_path=logout_path)
        self.action = action
        self.create_user = create_user
        self.fallback_url = fallback_url
        self.ticket_path = ticket_path

    def _ticket_file(self, ticket):
        """Return the file recording ticket, or None if none may be used."""
        if not self.ticket_path or not ticket:
            return None
        if not self._safe_ticket.match(ticket):
            return None
        return self.ticket_path + ticket

    def _store_ticket(self, ticket, username):
        """Record that username logged in with ticket."""
        path = self._ticket_file(ticket)
        if path is None:
            return
        with open(path, 'w') as f:
            f.write(username)

    def _username_of_ticket(self, ticket):
        """Return the user recorded for ticket and forget the record.

        @return: the user name, or None when no record can be read
        """
        path = self._ticket_file(ticket)
        if path is None:
            return None
        try:
            with open(path) as f:
                username = f.read()
        except IOError:
            return None
        try:
            os.remove(path)
        except OSError:
            # The user name is known: a record that cannot be deleted must
            # not prevent the logout from going through.
            pass
        return username

    def _logout_user(self, request, ticket):
        """Destroy every session of the user who logged in with ticket."""
        username = self._username_of_ticket(ticket)
        if not username:
            return
        u = user.User(request, None, username)
        if not u.exists():
            return
        session_service = request.cfg.session_service
        for sid in session_service.get_all_session_ids(request):
            session = session_service.get_session(request, sid)
            try:
                owned = session['user.id'] == u.id
            except KeyError:
                # session without a logged-in user
                owned = False
            if owned:
                session_service.destroy_session(request, session)

    def request(self, request, user_obj, **kw):
        """Handle CAS login and CAS single sign-out for one request.

        @param request: the current request
        @param user_obj: the user established so far, if any
        @return: tuple (user object, True); the second element is presumably
                 MoinMoin's "continue with other auth methods" flag - TODO
                 confirm against BaseAuth
        """
        ticket = request.args.get("ticket", "")
        action = request.args.get("action", "")
        force = request.args.get("force", None) is not None
        # Service URL: the current URL on https, without query or fragment.
        p = urlparse.urlparse(request.url)
        url = urlparse.urlunparse(('https', p.netloc, p.path, "", "", ""))

        def logout_user(ticket):
            self._logout_user(request, ticket)

        # authenticated user
        # (checked before looking at "ticket": other wiki features use a
        # request argument of the same name)
        if not force and user_obj and user_obj.valid:
            if self.action == action:
                request.http_redirect(url)
            return user_obj, True

        # single sign-out request
        if self.ticket_path and request.method == 'POST':
            logoutRequest = request.form.get('logoutRequest', None)
            if logoutRequest is not None:
                sys.stderr.write("Tentative de deconnexion du CAS : %s\n" % logoutRequest)
                self.cas.singlesignout(logout_user, logoutRequest)

        # anonymous
        if not ticket and not self.action == action:
            return user_obj, True

        # valid ticket on CAS
        if ticket:
            valid, username = self.cas.validate_ticket(url, ticket)
            if valid:
                sys.stderr.write("Authentifiaction de %s sur le CAS\n" % username)
                u = user.User(request, auth_username=username, auth_method=self.name)
                # auto create user ?
                if self.create_user:
                    u.valid = valid
                    u.create_or_update(True)
                else:
                    u.valid = u.exists()
                if self.fallback_url and not u.valid:
                    request.http_redirect("%s?action=%s&wiki_url=%s" % (self.fallback_url, self.action, url))
                if u.valid:
                    # Tickets are only recorded for single sign-out, which
                    # is disabled when no ticket_path is configured.
                    if self.ticket_path:
                        self._store_ticket(ticket, username)
                    load_theme_fallback(request, u.theme_name)
                    return u, True

        # login
        request.http_redirect(self.cas.login_url(url))

        return user_obj, True

    def logout(self, request, user_obj, **kw):
        """Log a CAS-authenticated user out of the wiki and of CAS.

        @return: tuple (user_obj, False)
        """
        if self.name and user_obj and user_obj.auth_method == self.name:
            user_obj.valid = False
            request.cfg.session_service.destroy_session(request, request.session)

            # Send the browser to the CAS logout page, asking it to come
            # back to the current page (query and fragment dropped).
            p = urlparse.urlparse(request.url)
            url = urlparse.urlunparse((p.scheme, p.netloc, p.path, "", "", ""))
            request.http_redirect(self.cas.logout_url(url))
        return user_obj, False

    def login_hint(self, request):
        """Return the HTML hint offering the CAS login link."""
        _ = request.getText
        msg = _('<p><a href="?action=login_cas">Se connecter via le CAS</a> (vous devez disposer d\'un compte Cr@ns pour cela)</p>')
        return msg
|