CAT-SOOP is a flexible, programmable learning management system based on the Python programming language. https://catsoop.mit.edu
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

185 lines
7.3 KiB

  1. # This file is part of CAT-SOOP
  2. # Copyright (c) 2011-2017 Adam Hartz <hartz@mit.edu>
  3. #
  4. # This program is free software: you can redistribute it and/or modify it under
  5. # the terms of the GNU Affero General Public License as published by the Free
  6. # Software Foundation, either version 3 of the License, or (at your option) any
  7. # later version.
  8. #
  9. # This program is distributed in the hope that it will be useful, but WITHOUT
  10. # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  11. # FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
  12. # details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import json
import time
import urllib.error
import urllib.parse
import urllib.request
  19. stored_state = cs_session_data.get('_openid_state', None)
  20. state = cs_form.get('state', None)
  21. stored_nonce = cs_session_data.get('_openid_nonce', None)
  22. error = None
  23. if stored_state is None:
  24. error = "No state stored in session."
  25. elif state is None:
  26. error = "No state provided by server."
  27. elif stored_state != state:
  28. error = ("Suspected tampering! "
  29. "State from server does not match local state.")
  30. if error is None:
  31. # we should have course information in the session data, so we can see if
  32. # we need to do a preload.
  33. ctx = {}
  34. csm_loader.load_global_data(ctx)
  35. session = cs_session_data
  36. if '_openid_course' in session:
  37. ctx['cs_course'] = cs_session_data['_openid_course']
  38. ctx['cs_path_info'] = [ctx['cs_course']]
  39. cfile = csm_dispatch.content_file_location(ctx, [ctx['cs_course']])
  40. csm_loader.do_early_load(ctx, ctx['cs_course'], [], ctx, cfile)
  41. # if we're here, we know we got back something reasonable.
  42. # now, need to send POST request
  43. id = ctx.get('cs_openid_client_id', '')
  44. secret = ctx.get('cs_openid_client_secret', '')
  45. redir_url = '%s/__AUTH__/openid_connect/callback' % ctx['cs_url_root']
  46. data = urllib.parse.urlencode({'grant_type': 'authorization_code',
  47. 'code': cs_form['code'],
  48. 'redirect_uri': redir_url,
  49. 'client_id': id,
  50. 'client_secret': secret}).encode()
  51. request = urllib.request.Request('%s/token' % ctx['cs_openid_server'], data)
  52. try:
  53. resp = urllib.request.urlopen(request, data).read()
  54. except:
  55. error = "Server rejected Token request."
  56. if error is None:
  57. try:
  58. resp = json.loads(resp.decode())
  59. except:
  60. error = "Token request response was not valid JSON."
  61. if error is None:
  62. # make sure we have been given authorization to access the proper
  63. # information
  64. desired_scope = ctx.get('cs_openid_scope', 'openid profile email')
  65. desired_scope = desired_scope.split()
  66. scope_error = ('You must provide CAT-SOOP access '
  67. 'to the following scopes: %r') % desired_scope
  68. if ('id_token' not in resp or
  69. any(i not in resp.get('scope', '').split()
  70. for i in desired_scope)):
  71. error = scope_error
  72. if error is None:
  73. # verify JWT signature
  74. id_token, sig = resp['id_token'].rsplit('.', 1)
  75. if ctx.get('cs_openid_verify_signature', True):
  76. try:
  77. import jose
  78. except:
  79. error = ('Verifying JWT signatures requires installing '
  80. 'python-jose (<tt>pip install python-jose</tt>).')
  81. if error is None:
  82. from jose import jwk
  83. from jose.utils import base64url_decode
  84. # get JWK from the web
  85. url = '%s/jwk' % ctx.get('cs_openid_server', '')
  86. try:
  87. key = json.loads(urllib.request.urlopen(url).read().decode())['keys'][0]
  88. except:
  89. error = 'Server rejected request for JWK'
  90. if 'alg' not in key:
  91. key['alg'] = ctx.get('cs_openid_default_algorithm', 'RS256')
  92. key = jwk.construct(key)
  93. decoded_sig = base64url_decode(sig.encode())
  94. if not key.verify(id_token.encode(), decoded_sig):
  95. error = 'Invalid signature on JWS.'
  96. if error is None:
  97. # check information from ID Token
  98. def _b64_pad(s, char='='):
  99. missing = len(s) % 4
  100. if not missing:
  101. return s
  102. return s + char*(4-missing)
  103. header, body = id_token.split('.')
  104. header = _b64_pad(header)
  105. body = _b64_pad(body)
  106. try:
  107. header = json.loads(base64.b64decode(header).decode())
  108. body = json.loads(base64.b64decode(body).decode())
  109. except:
  110. error = "Malformed header and/or body of ID token"
  111. if error is None:
  112. now = time.time.time()
  113. if body['iss'].rstrip('/') != ctx.get('cs_openid_server', None):
  114. error = 'Invalid ID Token issuer.'
  115. elif body['nonce'] != stored_nonce:
  116. error = ('Suspected tampering!'
  117. 'Nonce from server does not match local nonce.')
  118. elif body['iat'] > now + 60:
  119. error = 'ID Token is from the future. %r' % ((body['iat'], now),)
  120. elif now > body['exp'] + 60:
  121. error = 'ID Token has expired.'
  122. elif ctx.get('cs_openid_client_id', None) not in body['aud']:
  123. error = 'ID Token is not intended for CAT-SOOP.'
  124. if error is None:
  125. # get user information from server
  126. access_tok = resp['access_token']
  127. redir = '%s/userinfo' % ctx.get('cs_openid_server', '')
  128. headers = {'Authorization': 'Bearer %s' % access_tok}
  129. request2 = urllib.request.Request(redir, headers=headers)
  130. try:
  131. resp = json.loads(urllib.request.urlopen(request2).read().decode())
  132. except:
  133. error = 'Server rejected request for User Information'
  134. if error is None:
  135. # try to set usert information in session
  136. def get_username(idtoken, userinfo):
  137. return userinfo['preferred_username']
  138. def get_email(idtoken, userinfo):
  139. return userinfo['email']
  140. try:
  141. get_username = ctx.get('cs_openid_username_generator', get_username)
  142. get_email = ctx.get('cs_openid_email_generator', get_email)
  143. openid_info = {'username': get_username(body, resp),
  144. 'email': get_email(body, resp),
  145. 'name': resp['name']}
  146. session.update(openid_info)
  147. session['course'] = cs_session_data['_openid_course']
  148. except:
  149. error = "Error setting user information."
  150. path = [csm_base_context.cs_url_root] + session['_openid_path']
  151. redirect_location = '/'.join(path)
  152. if error is None:
  153. # we made it! set session data and redirect to original page
  154. csm_session.set_session_data(globals(), cs_sid, session)
  155. csm_cslog.overwrite_log(None, 'extra_info', session['username'], openid_info)
  156. cs_handler = 'redirect'
  157. else:
  158. cs_handler = 'passthrough'
  159. cs_content_header = 'Could Not Log You In'
  160. cs_content = 'You could not be logged in to the system because of the following error:<br/><font color="red">%s</font><p>Click <a href="%s?loginaction=redirect">here</a> to try again.' % (error, redirect_location)
  161. cs_footer = cs_footer.replace(cs_base_logo_text, csm_errors.error_500_logo)