CAT-SOOP is a flexible, programmable learning management system based on the Python programming language. https://catsoop.mit.edu
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

377 lines
14 KiB

  1. # This file is part of CAT-SOOP
  2. # Copyright (c) 2011-2020 by The CAT-SOOP Developers <catsoop-dev@mit.edu>
  3. #
  4. # This program is free software: you can redistribute it and/or modify it under
  5. # the terms of the GNU Affero General Public License as published by the Free
  6. # Software Foundation, either version 3 of the License, or (at your option) any
  7. # later version.
  8. #
  9. # This program is distributed in the hope that it will be useful, but WITHOUT
  10. # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  11. # FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
  12. # details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  16. """
  17. LTI Tool Provider interface
  18. """
  19. import uuid
  20. import urllib
  21. import pylti.common
  22. from lxml import etree
  23. from lxml.builder import ElementMaker
  24. from oauthlib.oauth1 import Client
  25. from . import auth
  26. from . import session
  27. from . import debug_log
  28. LOGGER = debug_log.LOGGER
  29. _nodoc = {"Client", "ElementMaker", "etree", "LOGGER"}
  30. class lti4cs(pylti.common.LTIBase):
  31. """
  32. LTI object representation for CAT-SOOP: validation and data receipt
  33. """
  34. def __init__(self, context, session, lti_args, lti_kwargs):
  35. self.session = session
  36. self.lti_data = {}
  37. pylti.common.LTIBase.__init__(self, lti_args, lti_kwargs)
  38. self.consumers = context.get("cs_lti_config")["consumers"]
  39. self.lti_session_key = context.get("cs_lti_config")["session_key"]
  40. self.base_url = context.get("cs_lti_config", {}).get("base_url")
  41. def verify_request(self, params, environment):
  42. try:
  43. base_url_default = "%s://%s" % (
  44. environment["wsgi.url_scheme"],
  45. environment["HTTP_HOST"],
  46. )
  47. url = "%s/%s" % (
  48. self.base_url or base_url_default,
  49. environment["REQUEST_URI"][1:],
  50. )
  51. method = environment["REQUEST_METHOD"]
  52. LOGGER.info("[lti.lti4cs.verify_request] method=%s, url=%s" % (method, url))
  53. pylti.common.verify_request_common(
  54. self.consumers, url, method, environment, params
  55. )
  56. LOGGER.info("[lti.lti4cs.verify_request] verify_request success")
  57. for prop in pylti.common.LTI_PROPERTY_LIST:
  58. if params.get(prop, None):
  59. LOGGER.info(
  60. "[lti.lti4cs.verify_request] params %s=%s",
  61. prop,
  62. params.get(prop, None),
  63. )
  64. self.lti_data[prop] = params[prop]
  65. self.session["lti_data"] = self.lti_data
  66. return True
  67. except Exception as err:
  68. LOGGER.error(
  69. "[lti.lti4cs.verify_request] verify_request failed, err=%s" % str(err)
  70. )
  71. self.session["lti_data"] = {}
  72. self.session["is_lti_user"] = False
  73. return False
  74. def save_lti_data(self, context):
  75. """
  76. Save LTI data locally (e.g. so that the checker can send grades back to the LTI tool consumer)
  77. """
  78. logging = context["csm_cslog"]
  79. uname = context["cs_user_info"]["username"]
  80. db_name = "_lti_data"
  81. logging.overwrite_log(db_name, [], uname, self.lti_data)
  82. lfn = logging.get_log_filename(db_name, [], uname)
  83. LOGGER.info("[lti] saved lti_data for user %s in file %s" % (uname, lfn))
  84. class lti4cs_response(object):
  85. """
  86. LTI handler for responses from CAT-SOOP to tool consumer
  87. """
  88. def __init__(self, context, lti_data=None):
  89. """
  90. Load LTI data from logs (cs database) if available
  91. """
  92. if lti_data:
  93. self.lti_data = (
  94. lti_data
  95. ) # use provided LTI data (e.g. for asynchronous grading response)
  96. else:
  97. logging = context["csm_cslog"]
  98. uname = context["cs_user_info"]["username"]
  99. db_name = "_lti_data"
  100. self.lti_data = logging.most_recent(db_name, [], uname) # retrieve LTI data
  101. self.consumers = context.get("cs_lti_config")["consumers"]
  102. self.pylti_url_fix = context.get("cs_lti_config").get("pylti_url_fix", {})
  103. def to_dict(self):
  104. """
  105. Return dict representation of this LTI response handler
  106. """
  107. return self.lti_data
  108. @property
  109. def have_data(self):
  110. return bool(self.lti_data)
  111. def send_outcome(self, data):
  112. """
  113. Send outcome (ie grade) to LTI tool consumer (XML as defined in LTI v1.1)
  114. """
  115. url = self.response_url
  116. result_sourcedid = self.lti_data.get("lis_result_sourcedid", None)
  117. consumer_key = self.lti_data.get("oauth_consumer_key")
  118. xml_body = self.generate_result_xml(result_sourcedid, data)
  119. LOGGER.info(
  120. "[lti.lti4cs_response.send_outcome] sending grade=%s to %s" % (data, url)
  121. )
  122. success = pylti.common.post_message(self.consumers, consumer_key, url, xml_body)
  123. if success:
  124. LOGGER.info("[lti.lti4cs_response.send_outcome] outcome sent successfully")
  125. else:
  126. LOGGER.error("[lti.lti4cs_response.send_outcome] outcome sending FAILED")
  127. def generate_result_xml(self, result_sourcedid, score):
  128. """
  129. Create the XML document that contains the new score to be sent to the LTI
  130. consumer. The format of this message is defined in the LTI 1.1 spec.
  131. """
  132. elem = ElementMaker(
  133. nsmap={None: "http://www.imsglobal.org/services/ltiv1p1/xsd/imsoms_v1p0"}
  134. )
  135. xml = elem.imsx_POXEnvelopeRequest(
  136. elem.imsx_POXHeader(
  137. elem.imsx_POXRequestHeaderInfo(
  138. elem.imsx_version("V1.0"),
  139. elem.imsx_messageIdentifier(str(uuid.uuid4())),
  140. )
  141. ),
  142. elem.imsx_POXBody(
  143. elem.replaceResultRequest(
  144. elem.resultRecord(
  145. elem.sourcedGUID(elem.sourcedId(result_sourcedid)),
  146. elem.result(
  147. elem.resultScore(
  148. elem.language("en"), elem.textString(str(score))
  149. )
  150. ),
  151. )
  152. )
  153. ),
  154. )
  155. xml = etree.tostring(xml, xml_declaration=True, encoding="UTF-8") # bytes
  156. xml = xml.decode("utf-8")
  157. return xml
  158. @property
  159. def response_url(self):
  160. """
  161. Returns remapped lis_outcome_service_url
  162. uses pylti_url_fix map to support edX dev-stack
  163. :return: remapped lis_outcome_service_url
  164. """
  165. url = ""
  166. url = self.lti_data["lis_outcome_service_url"]
  167. urls = self.pylti_url_fix
  168. # url remapping is useful for using edX devstack
  169. # edX devstack reports httpS://localhost:8000/ and listens on HTTP
  170. for prefix, mapping in urls.items():
  171. if url.startswith(prefix):
  172. for _from, _to in mapping.items():
  173. url = url.replace(_from, _to)
  174. return url
  175. # -----------------------------------------------------------------------------
  176. class LTI_Consumer(object):
  177. """
  178. Simple LTI tool consumer (useful for unit-tests of CAT-SOOP acting as an LTI tool provider)
  179. """
  180. def __init__(
  181. self,
  182. lti_url="",
  183. username="lti_user",
  184. service_url="http://localhost",
  185. consumer_key="consumer_key",
  186. secret="secret_key",
  187. ):
  188. self.lti_url = lti_url
  189. self.username = username
  190. self.service_url = service_url
  191. self.consumer_key = consumer_key
  192. self.lti_context = self._get_lti_context(self.lti_url, secret=secret)
  193. def _get_lti_context(self, lti_url, secret=None):
  194. """
  195. Generate the LTI context for a specific LTI url request
  196. lti_url: (str) URL to the LTI tool provider
  197. secret: (str) shared secret with the LTI tool provder
  198. """
  199. body = {
  200. "tool_consumer_instance_guid": u"lti_test_%s" % self.lti_url,
  201. "user_id": self.username + "__LTI__1234",
  202. "roles": u"[student]",
  203. "context_id": u"catsoop_test",
  204. "lti_url": self.lti_url,
  205. "lti_version": u"LTI-1p0",
  206. "lis_result_sourcedid": self.username,
  207. "lis_person_sourcedid": self.username,
  208. "lis_outcome_service_url": self.service_url,
  209. "lti_message_type": "basic-lti-launch-request", # required, see https://www.imsglobal.org/specs/ltiv2p0/implementation-guide#toc-21
  210. "resource_link_id": "123",
  211. }
  212. retdat = body.copy()
  213. key = self.consumer_key
  214. self._sign_lti_message(body, key, secret, lti_url)
  215. LOGGER.info(
  216. "[unit_tests] signing OAUTH with key=%s, secret=%s, url=%s"
  217. % (key, secret, lti_url)
  218. )
  219. retdat.update(
  220. dict(
  221. lti_url=lti_url,
  222. oauth_consumer_key=key,
  223. oauth_timestamp=body["oauth_timestamp"],
  224. oauth_nonce=body["oauth_nonce"],
  225. # oauth_signature=urllib.parse.unquote(body['oauth_signature']).encode('utf8'),
  226. oauth_signature=urllib.parse.unquote(body["oauth_signature"]),
  227. oauth_signature_method=body["oauth_signature_method"],
  228. oauth_version=body["oauth_version"],
  229. )
  230. )
  231. return retdat
  232. def _sign_lti_message(self, body, key, secret, url):
  233. client = Client(client_key=key, client_secret=secret)
  234. __, headers, __ = client.sign(
  235. url,
  236. http_method=u"POST",
  237. body=body,
  238. headers={"Content-Type": "application/x-www-form-urlencoded"},
  239. )
  240. auth_header = headers["Authorization"][len("OAuth ") :]
  241. auth = dict(
  242. [
  243. param.strip().replace('"', "").split("=")
  244. for param in auth_header.split(",")
  245. ]
  246. )
  247. body["oauth_nonce"] = auth["oauth_nonce"]
  248. body["oauth_signature"] = auth["oauth_signature"]
  249. body["oauth_timestamp"] = auth["oauth_timestamp"]
  250. body["oauth_signature_method"] = auth["oauth_signature_method"]
  251. body["oauth_version"] = auth["oauth_version"]
  252. # -----------------------------------------------------------------------------
  253. def serve_lti(context, path_info, environment, params, dispatch_main, return_context):
  254. """
  255. **Parameters**:
  256. * `context`: dictionary, the context associated with this request
  257. * `path_info`: list of URL path components
  258. * `environment`: dictionary containing web server data, such as form input
  259. * `dispatch_main`: function of the same form as `catsoop.dispatch.main`
  260. """
  261. if not "cs_lti_config" in context:
  262. msg = "[lti] LTI not configured - missing cs_lti_config in config.py"
  263. LOGGER.error(msg)
  264. raise Exception(msg)
  265. LOGGER.info("[lti] parameters=%s" % params)
  266. lti_action = path_info[0]
  267. LOGGER.info("[lti] lti_action=%s, path_info=%s" % (lti_action, path_info))
  268. session_data = context["cs_session_data"]
  269. if "is_lti_user" in session_data: # needed to handle form POSTS to _lti/course/...
  270. lti_ok = True # already authenticated
  271. l4c = None
  272. else:
  273. l4c = lti4cs(context, session_data, {}, {}) # not yet authenticated; check now
  274. lti_ok = l4c.verify_request(params, environment)
  275. if not lti_ok:
  276. msg = "LTI verification failed"
  277. elif l4c is not None:
  278. lti_data = session_data["lti_data"]
  279. lup = context["cs_lti_config"].get("lti_username_prefix", "lti_")
  280. lti_uname = lti_data["user_id"]
  281. if not context["cs_lti_config"].get("force_username_from_id"):
  282. lti_uname = lti_data.get(
  283. "lis_person_sourcedid", lti_uname
  284. ) # prefer username to user_id
  285. uname = "%s%s" % (lup, lti_uname)
  286. email = lti_data.get("lis_person_contact_email_primary", "%s@unknown" % uname)
  287. name = lti_data.get("lis_person_name_full", uname)
  288. lti_data["cs_user_info"] = {
  289. "username": uname,
  290. "name": name,
  291. "email": email,
  292. "lti_role": lti_data.get("roles"),
  293. "is_lti_user": True,
  294. } # save LTI user data in session for auth.py
  295. session_data.update(lti_data["cs_user_info"])
  296. session.set_session_data(
  297. context, context["cs_sid"], session_data
  298. ) # save session data
  299. user_info = auth.get_logged_in_user(
  300. context
  301. ) # saves user_info in context["cs_user_info"]
  302. LOGGER.info("[lti] auth user_info=%s" % user_info)
  303. l4c.save_lti_data(context) # save lti data, e.g. for later use by the checker
  304. if lti_ok:
  305. uname = session_data["username"]
  306. if lti_action == "course":
  307. LOGGER.info("[lti] rendering course page for %s" % uname)
  308. sub_path_info = path_info[1:] # path without _lti/course prefix
  309. sub_path = "/".join(sub_path_info)
  310. LOGGER.info("[lti] sub_path=%s" % sub_path)
  311. environment["PATH_INFO"] = sub_path
  312. environment["session_id"] = context[
  313. "cs_sid"
  314. ] # so that a new session ID isn't generated
  315. return dispatch_main(
  316. environment, return_context=return_context, form_data=params
  317. )
  318. msg = "Hello LTI"
  319. if return_context:
  320. LOGGER.info("[lti] Returning context instead of HTML response")
  321. return context
  322. return (
  323. ("200", "Ok"),
  324. {"Content-type": "text/plain", "Content-length": str(len(msg))},
  325. msg,
  326. )