diff --git a/.gitignore b/.gitignore index f09233f1d..c2b970435 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ learning_observer/passwd.lo build/ dist/ node_modules +learning_observer/learning_observer/learning_tools/config.ini diff --git a/learning_observer/learning_observer/auth/__init__.py b/learning_observer/learning_observer/auth/__init__.py index 5f0c37b9f..772058cdf 100644 --- a/learning_observer/learning_observer/auth/__init__.py +++ b/learning_observer/learning_observer/auth/__init__.py @@ -57,6 +57,7 @@ # Utility functions from learning_observer.auth.utils import fernet_key from learning_observer.auth.utils import google_id_to_user_id +from learning_observer.auth.utils import canvas_id_to_user_id # Utility handlers from learning_observer.auth.handlers import logout_handler diff --git a/learning_observer/learning_observer/auth/utils.py b/learning_observer/learning_observer/auth/utils.py index ec9f92e75..bb41ebedb 100644 --- a/learning_observer/learning_observer/auth/utils.py +++ b/learning_observer/learning_observer/auth/utils.py @@ -38,6 +38,19 @@ def google_id_to_user_id(google_id): except ValueError: debug_log("Error handling:", google_id) raise + +def canvas_id_to_user_id(google_id): + ''' + Convert a Google ID like: + `72635729500910017892163494291` + to: + `gc-72635729500910017892163494291` + ''' + try: + return "gc-" + str(int(google_id)) + except ValueError: + debug_log("Error handling:", google_id) + raise def fernet_key(secret_string): diff --git a/learning_observer/learning_observer/canvas.py b/learning_observer/learning_observer/canvas.py new file mode 100644 index 000000000..f382a618e --- /dev/null +++ b/learning_observer/learning_observer/canvas.py @@ -0,0 +1,308 @@ +import os +import json +import string +import recordclass +import configparser +import aiohttp +import aiohttp.web + +import learning_observer.settings as settings +import learning_observer.log_event +import learning_observer.util +import learning_observer.auth +import learning_observer.runtime + +cache = None + +class Endpoint(recordclass.make_dataclass("Endpoint", ["name", "remote_url", "doc", "cleaners"], defaults=["", None])): + def arguments(self): + return extract_parameters_from_format_string(self.remote_url) + + def _local_url(self): + parameters = "}/{".join(self.arguments()) + base_url = f"/canvas/{self.name}" + if len(parameters) == 0: + return base_url + else: + return base_url + "/{" + parameters + "}" + + def _add_cleaner(self, name, cleaner): + if self.cleaners is None: + self.cleaners = dict() + self.cleaners[name] = cleaner + if 'local_url' not in cleaner: + cleaner['local_url'] = self._local_url + "/" + name + + def _cleaners(self): + if self.cleaners is None: + return [] + else: + return self.cleaners + +ENDPOINTS = list(map(lambda x: Endpoint(*x), [ + ("course_list", "/courses"), + ("course_roster", "/courses/{courseId}/students"), + ("course_work", "/courses/{courseId}/assignments"), + ("coursework_submissions", "/courses/{courseId}/assignments/{assignmentId}/submissions"), +])) + +def extract_parameters_from_format_string(format_string): + ''' + Extracts parameters from a format string. E.g. + + >>> ("hello {hi} my {bye}")] + ['hi', 'bye'] + ''' + # The parse returns a lot of context, which we discard. In particular, the + # last item is often about the suffix after the last parameter and may be + # `None` + return [f[1] for f in string.Formatter().parse(format_string) if f[1] is not None] + +class Canvas: + def __init__(self, config_path='./config.ini'): + script_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(script_dir, config_path) + + self.config = configparser.ConfigParser() + self.config.read(config_path) + + # Check if 'SCHOOLOGY_CONFIG' section is present + if 'CANVAS_CONFIG' not in self.config: + raise KeyError("The configuration file does not contain 'CANVAS_CONFIG' section") + + try: + self.defaultServer = self.config['CANVAS_CONFIG']['DEFAULT_SERVER'] + self.access_token = self.config['CANVAS_CONFIG']['ACCESS_TOKEN'] + self.refresh_token = self.config['CANVAS_CONFIG']['REFRESH_TOKEN'] + self.client_id = self.config['CANVAS_CONFIG']['CLIENT_ID'] + self.client_secret = self.config['CANVAS_CONFIG']['CLIENT_SECRET'] + except KeyError as e: + raise KeyError(f"Missing required configuration key: {e}") + + self.default_version = 'v1' + self.defaultPerPage = 10000 + self.base_url = f'https://{self.defaultServer}/api/{self.default_version}' + + def update_access_tokens(self, access_token): + self.config['CANVAS_CONFIG']['ACCESS_TOKEN'] = access_token + self.access_token = access_token + with open('./config.ini', 'w') as configfile: + self.config.write(configfile) + + async def api_call(self, method, endpoint, params=None, data=None, absolute_url=False, retry=True, **kwargs): + if absolute_url: + url = endpoint + else: + url = self.base_url + endpoint + if params: + url += '?' + '&'.join(f"{k}={v}" for k, v in params.items()) + + url = url.format(**kwargs) + + headers = { + 'Authorization': f'Bearer {self.access_token}', + 'Content-Type': 'application/json' + } + + async with aiohttp.ClientSession() as client: + response_func = getattr(client, method.lower()) + async with response_func(url, headers=headers, params=params, json=data) as response: + if response.status == 401 and retry: + new_tokens = self.refresh_tokens() + if 'access_token' in new_tokens: + self.update_access_tokens(new_tokens['access_token']) + return await self.api_call(method, endpoint, params, data, absolute_url, retry=False, **kwargs) + + if response.status != 200: + response.raise_for_status() + + return await response.json() + + async def refresh_tokens(self): + url = f'https://{self.defaultServer}/login/oauth2/token' + params = { + "grant_type": "refresh_token", + "client_id": self.client_id, + "client_secret": self.client_secret, + "refresh_token": self.refresh_token + } + return await self.api_call('POST', url, params=params, absolute_url=True) + +async def raw_canvas_ajax(runtime, target_url, retry=False, **kwargs): + ''' + Make an AJAX call to Canvas, managing auth + auth. + + * runtime is a Runtime class containing request information. + * target_url is typically grabbed from ENDPOINTS + * ... and we pass the named parameters + ''' + canvas = Canvas() + + params = {k: v for k, v in kwargs.items() if v is not None} + try: + response = await canvas.api_call('GET', target_url, params=params, **kwargs) + response["kwargs"] = kwargs + except aiohttp.ClientResponseError as e: + if e.status == 401 and retry: + new_tokens = await canvas.refresh_tokens() + if 'access_token' in new_tokens: + canvas.update_access_tokens(new_tokens['access_token']) + return await raw_canvas_ajax(runtime, target_url, retry=False, **kwargs) + raise + + return response + +def raw_access_partial(remote_url, name=None): + ''' + This is a helper which allows us to create a function which calls specific + Canvas APIs. + ''' + async def caller(request, **kwargs): + ''' + Make an AJAX request to Canvas + ''' + return await raw_canvas_ajax(request, remote_url, **kwargs) + setattr(caller, "__qualname__", name) + + return caller + + +def initialize_and_register_routes(app): + ''' + This is a big 'ol function which might be broken into smaller ones at some + point. We: + + - Created debug routes to pass through AJAX requests to Google + - Created production APIs to have access to cleaned versions of said data + - Create local function calls to call from other pieces of code + within process + + We probably don't need all of this in production, but a lot of this is + very important for debugging. Having APIs is more useful than it looks, since + making use of Google APIs requires a lot of infrastructure (registering + apps, auth/auth, etc.) which we already have in place on dev / debug servers. + ''' + app.add_routes([ + aiohttp.web.get("/canvas", api_docs_handler) + ]) + + def make_ajax_raw_handler(remote_url): + async def ajax_passthrough(request): + runtime = learning_observer.runtime.Runtime(request) + response = await raw_canvas_ajax(runtime, remote_url, retry=True, **request.match_info) + return aiohttp.web.json_response(response) + return ajax_passthrough + + def make_cleaner_handler(raw_function, cleaner_function, name=None): + async def cleaner_handler(request): + response = cleaner_function(await raw_function(request, **request.match_info)) + if isinstance(response, dict) or isinstance(response, list): + return aiohttp.web.json_response(response) + elif isinstance(response, str): + return aiohttp.web.Response(text=response) + else: + raise AttributeError(f"Invalid response type: {type(response)}") + if name is not None: + setattr(cleaner_handler, "__qualname__", name + "_handler") + + return cleaner_handler + + def make_cleaner_function(raw_function, cleaner_function, name=None): + async def cleaner_local(request, **kwargs): + canvas_response = await raw_function(request, **kwargs) + clean = cleaner_function(canvas_response) + return clean + if name is not None: + setattr(cleaner_local, "__qualname__", name) + return cleaner_local + + for e in ENDPOINTS: + function_name = f"raw_{e.name}" + raw_function = raw_access_partial(remote_url=e.remote_url, name=e.name) + globals()[function_name] = raw_function + cleaners = e._cleaners() + for c in cleaners: + app.add_routes([ + aiohttp.web.get( + cleaners[c]['local_url'], + make_cleaner_handler( + raw_function, + cleaners[c]['function'], + name=cleaners[c]['name'] + ) + ) + ]) + globals()[cleaners[c]['name']] = make_cleaner_function( + raw_function, + cleaners[c]['function'], + name=cleaners[c]['name'] + ) + app.add_routes([ + aiohttp.web.get(e._local_url(), make_ajax_raw_handler(e.remote_url)) + ]) + +def api_docs_handler(request): + response = "URL Endpoints:\n\n" + for endpoint in ENDPOINTS: + response += f"{endpoint._local_url()}\n" + cleaners = endpoint._cleaners() + for c in cleaners: + response += f" {cleaners[c]['local_url']}\n" + response += "\n\n Globals:" + return aiohttp.web.Response(text=response) + +def register_cleaner(data_source, cleaner_name): + def decorator(f): + found = False + for endpoint in ENDPOINTS: + if endpoint.name == data_source: + found = True + endpoint._add_cleaner( + cleaner_name, + { + 'function': f, + 'local_url': f'{endpoint._local_url()}/{cleaner_name}', + 'name': cleaner_name + } + ) + if not found: + raise AttributeError(f"Data source {data_source} invalid; not found in endpoints.") + return f + return decorator + +@register_cleaner("course_roster", "roster") +def clean_course_roster(canvas_json): + students = canvas_json + students_updated = [] + #students.sort(key=lambda x: x.get('name', {}).get('fullName', 'ZZ')) + for student_json in students: + canvas_id = student_json['id'] + student = { + "course_id": "65166371789", + "user_id": canvas_id, + "profile": { + "id": canvas_id, + "name": { + "given_name": student_json['name'], + "family_name": student_json['name'], + "full_name": student_json['name'] + } + } + } + #local_id = learning_observer.auth.canvas_id_to_user_id(canvas_id) + #student_json['user_id'] = local_id + if 'external_ids' not in student: + student['external_ids'] = [] + student['external_ids'].append({"source": "canvas", "id": canvas_id}) + students_updated.append(student) + return students_updated + +@register_cleaner("course_list", "courses") +def clean_course_list(canvas_json): + courses = canvas_json + courses.sort(key=lambda x: x.get('name', 'ZZ')) + return courses + +if __name__ == '__main__': + output = clean_course_roster({}) + print(json.dumps(output, indent=2)) diff --git a/learning_observer/learning_observer/learning_tools/README.md b/learning_observer/learning_observer/learning_tools/README.md new file mode 100644 index 000000000..e4af4ca18 --- /dev/null +++ b/learning_observer/learning_observer/learning_tools/README.md @@ -0,0 +1,8 @@ +Get your Consumer Key & Consumer Secret by: +Going to https://(DISTRICT NAME HERE).schoology.com/api + + +Get your user ID by: +Going to your profile. +Look at the search bar. +https://(DISTRICT NAME HERE).schoology.com/user/(THE NUMBER HERE IS YOUR USER ID)/info \ No newline at end of file diff --git a/learning_observer/learning_observer/learning_tools/canvas copy.py b/learning_observer/learning_observer/learning_tools/canvas copy.py new file mode 100644 index 000000000..3fa359bc7 --- /dev/null +++ b/learning_observer/learning_observer/learning_tools/canvas copy.py @@ -0,0 +1,107 @@ +import json +import requests +import configparser + +# import aiohttp +# import aiohttp.web + + +class Canvas: + def __init__(self, config_path='./config.ini'): + self.config = configparser.ConfigParser() + self.config.read(config_path) + self.defaultServer = self.config['CANVAS_CONFIG']['DEFAULT_SERVER'] + self.access_token = self.config['CANVAS_CONFIG']['ACCESS_TOKEN'] + self.refresh_token = self.config['CANVAS_CONFIG']['REFRESH_TOKEN'] + self.client_id = self.config['CANVAS_CONFIG']['CLIENT_ID'] + self.client_secret = self.config['CANVAS_CONFIG']['CLIENT_SECRET'] + self.default_version = 'v1' + self.defaultPerPage = 10000 + self.base_url = f'https://{self.defaultServer}/api/{self.default_version}' + + def update_access_tokens(self, access_token): + self.config['CANVAS_CONFIG']['ACCESS_TOKEN'] = access_token + self.access_token = access_token + with open('./config.ini', 'w') as configfile: + self.config.write(configfile) + + def api_call(self, method, endpoint, params=None, data=None, absolute_url=False, retry=True): + if absolute_url: + url = endpoint + else: + url = self.base_url + endpoint + # Append params key/values if added + if params: + url += '?' + '&'.join(f"{k}={v}" for k, v in params.items()) + + headers = { + 'Authorization': f'Bearer {self.access_token}', + 'Content-Type': 'application/json' + } + + response_func = getattr(requests, method.lower()) + response = response_func(url, headers=headers, params=params, data=json.dumps(data)) + + # Handle invalid/expired token + if response.status_code == 401 and retry: + new_tokens = self.refresh_tokens() + if 'access_token' in new_tokens: + self.update_access_tokens(new_tokens['access_token']) + return self.api_call(method, endpoint, params, data, absolute_url, retry=False) + + if response.status_code != 200: + response.raise_for_status() + + return response.json() + + def refresh_tokens(self): + url = f'https://{self.defaultServer}/login/oauth2/token' + params = { + "grant_type": "refresh_token", + "client_id": self.client_id, + "client_secret": self.client_secret, + "refresh_token": self.refresh_token + } + return self.api_call('POST', url, params=params, absolute_url=True) + + def get_users(self): + return self.api_call('GET', '/accounts/self/users') + + def get_courses(self): + return self.api_call('GET', '/courses', params={'per_page': self.defaultPerPage}) + + def get_course_enrollment(self, course_id): + return self.api_call('GET', f'/courses/{course_id}/students', params={'per_page': self.defaultPerPage}) + + def get_course_teachers(self, course_id): + return self.api_call('GET', f'/courses/{course_id}/users', params={'per_page': self.defaultPerPage, 'enrollment_type[]': 'teacher'}) + + def get_course_assignments(self, course_id): + return self.api_call('GET', f'/courses/{course_id}/assignments', params={'per_page': self.defaultPerPage}) + +# def initialize_and_register_routes(app): +# app.add_routes([ +# aiohttp.web.get("/google", api_docs_handler) +# ]) + +def test(): + canvas = Canvas() + + users = canvas.get_users() + print(f'Users: {users} \n') + + courses = canvas.get_courses() + print(f'Courses: {courses} \n') + + for course in courses: + students = canvas.get_course_enrollment(course.get('id')) + print(f'Students taking {course.get("name")} course are: {students} \n') + + teachers = canvas.get_course_teachers(course.get('id')) + print(f'Teachers managing {course.get("name")} course are: {teachers} \n') + + assignments = canvas.get_course_assignments(course.get('id')) + print(f'Assignments under {course.get("name")} course are: {assignments} \n') + +if __name__ == '__main__': + test() diff --git a/learning_observer/learning_observer/learning_tools/canvas.py b/learning_observer/learning_observer/learning_tools/canvas.py new file mode 100644 index 000000000..f21f8f0f4 --- /dev/null +++ b/learning_observer/learning_observer/learning_tools/canvas.py @@ -0,0 +1,162 @@ +import json +import requests +import configparser + +class Canvas: + def __init__(self, config_path='./config.ini'): + self.config = configparser.ConfigParser() + self.config.read(config_path) + self.defaultServer = self.config['CANVAS_CONFIG']['DEFAULT_SERVER'] + self.access_token = self.config['CANVAS_CONFIG']['ACCESS_TOKEN'] + self.refresh_token = self.config['CANVAS_CONFIG']['REFRESH_TOKEN'] + self.client_id = self.config['CANVAS_CONFIG']['CLIENT_ID'] + self.client_secret = self.config['CANVAS_CONFIG']['CLIENT_SECRET'] + self.default_version = 'v1' + self.defaultPerPage = 10000 + self.base_url = f'https://{self.defaultServer}/api/{self.default_version}' + + def update_access_tokens(self, access_token): + self.config['CANVAS_CONFIG']['ACCESS_TOKEN'] = access_token + self.access_token = access_token + with open('./config.ini', 'w') as configfile: + self.config.write(configfile) + + def api_call(self, method, endpoint, params=None, data=None, absolute_url=False, retry=True): + if absolute_url: + url = endpoint + else: + url = self.base_url + endpoint + # Append params key/values if added + if params: + url += '?' + '&'.join(f"{k}={v}" for k, v in params.items()) + + headers = { + 'Authorization': f'Bearer {self.access_token}', + 'Content-Type': 'application/json' + } + + response_func = getattr(requests, method.lower()) + response = response_func(url, headers=headers, params=params, data=json.dumps(data)) + + # Handle invalid/expired token + if response.status_code == 401 and retry: + new_tokens = self.refresh_tokens() + if 'access_token' in new_tokens: + self.update_access_tokens(new_tokens['access_token']) + return self.api_call(method, endpoint, params, data, absolute_url, retry=False) + + if response.status_code != 200: + response.raise_for_status() + + return response.json() + + def refresh_tokens(self): + url = f'https://{self.defaultServer}/login/oauth2/token' + params = { + "grant_type": "refresh_token", + "client_id": self.client_id, + "client_secret": self.client_secret, + "refresh_token": self.refresh_token + } + return self.api_call('POST', url, params=params, absolute_url=True) + + def get_account(self): + return self.api_call('GET', '/accounts/self') + + def get_users(self): + return self.api_call('GET', '/accounts/self/users') + + def get_courses(self): + return self.api_call('GET', '/courses', params={'per_page': self.defaultPerPage}) + + def get_course_enrollment(self, course_id): + return self.api_call('GET', f'/courses/{course_id}/students', params={'per_page': self.defaultPerPage}) + + def get_course_teachers(self, course_id): + return self.api_call('GET', f'/courses/{course_id}/users', params={'per_page': self.defaultPerPage, 'enrollment_type[]': 'teacher'}) + + def get_course_assignments(self, course_id): + return self.api_call('GET', f'/courses/{course_id}/assignments', params={'per_page': self.defaultPerPage}) + + def create_user(self, account_id, user_data): + endpoint = f'/accounts/{account_id}/users' + data = { + 'user': { + 'name': user_data['name'], + 'sortable_name': user_data.get('sortable_name', user_data['name']), + 'terms_of_use': True + }, + 'pseudonym': { + 'unique_id': user_data['email'], + 'password': user_data['password'], + 'sis_user_id': user_data.get('sis_user_id'), + 'integration_id': user_data.get('integration_id') + }, + 'communication_channel': { + 'type': 'email', + 'address': user_data['email'], + 'skip_confirmation': True + } + } + return self.api_call('POST', endpoint, data=data) + + def enroll_user(self, course_id, user_id, enrollment_type='StudentEnrollment'): + endpoint = f'/courses/{course_id}/enrollments' + data = { + 'enrollment': { + 'user_id': user_id, + 'type': enrollment_type, + 'enrollment_state': 'active' + } + } + return self.api_call('POST', endpoint, data=data) + +def test(): + import pandas + canvas = Canvas() + + account = canvas.get_account() + print(f'Account: {account}, {type(account)} \n') + + # if account: + # roster_data = pandas.read_csv('./roster-file.csv') + + # for index, row in roster_data.iterrows(): + # user_data = { + # 'name': row['FirstName'] + " " + row['LastName'], + # 'email': row['EmailAddress'], + # 'password': "password123" + # } + # print(user_data) + + # print(f"Adding {index}th user...") + + # new_user = canvas.create_user(account_id=account['id'], user_data=user_data) + # print(f'New User: {new_user}') + + # user_id = new_user['id'] + # course_id = 1 + # response = canvas.enroll_user(course_id, user_id) + # print(f'Enrollment response: {response}') + + users = canvas.get_users() + print(f'Users: {users} \n') + + courses = canvas.get_courses() + print(f'Courses: {courses} \n') + + for course in courses: + students = canvas.get_course_enrollment(course.get('id')) + print(f'Students taking {course.get("name")} course are: {students} \n') + + teachers = canvas.get_course_teachers(course.get('id')) + print(f'Teachers managing {course.get("name")} course are: {teachers} \n') + + assignments = canvas.get_course_assignments(course.get('id')) + print(f'Assignments under {course.get("name")} course are: {assignments} \n') + + +if __name__ == '__main__': + import json + + test() \ No newline at end of file diff --git a/learning_observer/learning_observer/learning_tools/schoology.py b/learning_observer/learning_observer/learning_tools/schoology.py new file mode 100644 index 000000000..223b5d700 --- /dev/null +++ b/learning_observer/learning_observer/learning_tools/schoology.py @@ -0,0 +1,152 @@ +import json +import requests +import configparser +from requests_oauthlib import OAuth1 + +class Schoology: + def __init__(self, config_path='./config.ini'): + self.config = configparser.ConfigParser() + self.config.read(config_path) + API_KEY = self.config['SCHOOLOGY_CONFIG']['API_KEY'] + SECRET = self.config['SCHOOLOGY_CONFIG']['SECRET'] + self.auth = OAuth1(API_KEY, SECRET, "", "") + self.base_url = f'https://api.schoology.com/v1/' + + # REST Methods + def _get(self,uri): + return requests.get(uri, auth=self.auth) + + def _post(self,uri,data): + return requests.post(uri, json=data, auth=self.auth) + + def _put(self,uri,data): + return requests.post(uri, json=data, auth=self.auth) + + def _delete(self,uri): + return requests.delete(uri, auth=self.auth) + + def api_call(self, method, endpoint, params=None, data=None, absolute_url=False, retry=True): + if absolute_url: + url = endpoint + else: + url = self.base_url + endpoint + # Append params key/values if added + if params: + url += '?' + '&'.join(f"{k}={v}" for k, v in params.items()) + + response_func = getattr(requests, method.lower()) + response = response_func(url, auth=self.auth, params=params, data=json.dumps(data) if data else None) + + if response.status_code != 200: + response.raise_for_status() + + return response.json() + + + # Schoology API calls + + #can be used to retrieve user information if email of the student is known + def get_user_id(self,email,users=None): + if(not users): users = self.list_users() + for user in users: + if(user['primary_email'] == email): return user['id'] + return "No Schoology ID found for: {}".format(email) + + #returns user information if the user id is avaialable + def view_user(self,id): + uri = 'https://api.schoology.com/v1/users/{0}'.format(id) + return self._get(uri) + + #can be used to delete the user if the user id is known + def delete_user(self,id): + uri = 'https://api.schoology.com/v1/users/{0}?email_notification=0'.format(id) + return self._delete(uri) + + #used to retireve school ID if the email and user id or email is known + def get_school_uid(self,email,users=None): + if(not users): users = self.list_users() + for user in users: + if(user['primary_email'] == email): return user['school_uid'] + return "No Schoology ID found for: {}".format(email) + + def list_users(self,role_id=None): + uri = 'https://api.schoology.com/v1/users?limit=150' + if(role_id): uri += "&role_ids={}".format(role_id) + response = self._get(uri).json() + if('do not exist' not in response): + users = response['user'] + links = response['links'] + try: + while(links['next'] != ''): + uri = links['next'] + response = self._get(uri).json() + links = response['links'] + u = response['user'] + users += u # append paginated results to users json + except KeyError: pass # no next page will throw KeyError + return users + else: + return "Role {} does not exist.".format(role_id) + + #can be used to create a user + def create_user(self, user): + uri = 'https://api.schoology.com/v1/users' + return self._post(uri,user) + + #lists all the users in the system + def list_all_users(self): + return self.api_call('GET', 'users') + + #lists all the sections in the system + def list_all_sections(self): + return self.api_call('GET', 'sections') + + #In progress + def get_section(self,userid): + return self.api_call('GET', 'users/{0}/sections') + + def get_courses_for_students(self, userid): + uri = 'https://api.schoology.com/v1/[realm]/enrollments/'.format(userid) + + #returns all the courses in the system + def get_courses(self): + uri =' https://api.schoology.com/v1/courses' + return self._get(uri) + + def get_events(self): + self.get_sections() + today = date.today().isoformat() + events = self.make_api_request("events", {"start_date": today}).get("event", []) + temp = [] + + for event in events: + try: + temp.append( + { + "name": event["title"], + "description": event["description"], + "course_id": event["section_id"], + "assignment_id": event["assignment_id"], + "date": event["start"], + "all_day": event["all_day"], + } + ) + except KeyError: + continue + + self.events = temp + + +def test(): + schoology = Schoology() + + all_users = schoology.list_all_users() + print(f"All Users: {all_users}") + + all_sections = schoology.list_all_sections() + print(f"All Sections: {all_sections}") + + +if __name__ == '__main__': + test() + \ No newline at end of file diff --git a/learning_observer/learning_observer/learning_tools/schoology/auth.py b/learning_observer/learning_observer/learning_tools/schoology/auth.py new file mode 100644 index 000000000..b3b9fe2c6 --- /dev/null +++ b/learning_observer/learning_observer/learning_tools/schoology/auth.py @@ -0,0 +1,112 @@ +import requests +import configparser +import schoolopy + +try: + from urllib.parse import urlencode +except ImportError: + from urllib import urlencode + +class Auth: + def __init__(self, client_id, client_secret, domain='https://www.schoology.com'): + self.API_ROOT = 'https://api.schoology.com/v1' + self.DOMAIN_ROOT = domain + + self.client_id = client_id + self.client_secret = client_secret + + self.access_token = None + + def _get_authorization_header(self): + return f"Bearer {self.access_token}" + + def _request_header(self): + return { + 'Authorization': self._get_authorization_header(), + 'Accept': 'application/json', + 'Host': 'api.schoology.com', + 'Content-Type': 'application/json' + } + + def request_authorization(self, redirect_uri=None, scope=None): + if redirect_uri is None: + redirect_uri = 'https://gayaan.csc.ncsu.edu/callback' + + if scope is None: + scope = 'user:read' # Add your desired scopes here + + params = { + 'response_type': 'code', + 'client_id': self.client_id, + 'redirect_uri': redirect_uri, + 'scope': scope + } + + authorization_url = f"{self.DOMAIN_ROOT}/oauth/authorize?{urlencode(params)}" + return authorization_url + + def exchange_authorization_code(self, authorization_code, redirect_uri=None): + if redirect_uri is None: + redirect_uri = 'https://gayaan.csc.ncsu.edu/callback' + + data = { + 'grant_type': 'authorization_code', + 'code': authorization_code, + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'redirect_uri': redirect_uri + } + + response = requests.post(f"{self.API_ROOT}/oauth2/token", data=data) + if response.status_code == 200: + token_info = response.json() + self.access_token = token_info['access_token'] + return token_info + else: + raise Exception("Failed to exchange authorization code for access token.") + + # Additional methods for making API requests can be implemented here + +def test(): + # Get client ID and secret from config file + config = configparser.ConfigParser() + config.read('config.ini') + CLIENT_ID = config['SCHOOLOGY_CLIENT']['API_KEY'] + CLIENT_SECRET = config['SCHOOLOGY_CLIENT']['SECRET'] + + auth = Auth(CLIENT_ID, CLIENT_SECRET) + + sc = schoolopy.Schoology(schoolopy.Auth(CLIENT_ID, CLIENT_SECRET)) + sc.get_feed() + + # Request authorization URL + authorization_url = auth.request_authorization(redirect_uri='https://gayaan.csc.ncsu.edu') + print("Authorization URL:", authorization_url) + + # Prompt the user to visit the authorization URL and grant access + # After granting access, the user will be redirected to the callback URL + # Make sure to handle the callback URL in your application to retrieve the authorization code + + # Simulate the retrieval of the authorization code from the callback URL + authorization_code = input("Enter the authorization code from the callback URL: ") + + # # Exchange the authorization code for an access token + # token_info = auth.exchange_authorization_code(authorization_code, redirect_uri='https://gayaan.csc.ncsu.edu') + # print("Access Token:", token_info['access_token']) + + # # Use the access token to make API requests + # # Example API request + # url = auth.API_ROOT + '/users/me' + # headers = auth._request_header() + # response = requests.get(url, headers=headers) + # if response.status_code == 200: + # print("API request successful.") + # data = response.json() + # # Process the API response data + # else: + # print("API request failed with status code:", response.status_code) + + +if __name__ == '__main__': + test() + \ No newline at end of file diff --git a/learning_observer/learning_observer/rosters.py b/learning_observer/learning_observer/rosters.py index 2505f8728..b4251c823 100644 --- a/learning_observer/learning_observer/rosters.py +++ b/learning_observer/learning_observer/rosters.py @@ -70,6 +70,8 @@ import learning_observer.auth as auth import learning_observer.google +import learning_observer.canvas +import learning_observer.schoology import learning_observer.kvs import learning_observer.log_event as log_event from learning_observer.log_event import debug_log @@ -346,6 +348,8 @@ def init(): ajax = google_ajax elif settings.settings['roster_data']['source'] in ["all"]: ajax = all_ajax + elif settings.settings['roster_data']['source'] in ["canvas", "schoology"]: + ajax = google_ajax else: raise learning_observer.prestartup.StartupCheck( "Settings file `roster_data` element should have `source` field\n" @@ -396,6 +400,12 @@ async def courselist(request): if settings.settings['roster_data']['source'] in ["google_api"]: runtime = learning_observer.runtime.Runtime(request) return await learning_observer.google.courses(runtime) + elif settings.settings['roster_data']['source'] in ["canvas"]: + runtime = learning_observer.runtime.Runtime(request) + return await learning_observer.canvas.courses(runtime) + elif settings.settings['roster_data']['source'] in ["schoology"]: + runtime = learning_observer.runtime.Runtime(request) + return await learning_observer.schoology.courses(runtime) # Legacy code course_list = await ajax( @@ -423,6 +433,12 @@ async def courseroster(request, course_id): if settings.settings['roster_data']['source'] in ["google_api"]: runtime = learning_observer.runtime.Runtime(request) return await learning_observer.google.roster(runtime, courseId=course_id) + elif settings.settings['roster_data']['source'] in ["canvas"]: + runtime = learning_observer.runtime.Runtime(request) + return await learning_observer.canvas.roster(runtime, courseId=course_id) + elif settings.settings['roster_data']['source'] in ["schoology"]: + runtime = learning_observer.runtime.Runtime(request) + return await learning_observer.schoology.roster(runtime, courseId=course_id) roster = await ajax( request, diff --git a/learning_observer/learning_observer/routes.py b/learning_observer/learning_observer/routes.py index b94d91713..71ab97b4b 100644 --- a/learning_observer/learning_observer/routes.py +++ b/learning_observer/learning_observer/routes.py @@ -17,6 +17,8 @@ import learning_observer.admin as admin import learning_observer.auth import learning_observer.auth.http_basic +import learning_observer.canvas +import learning_observer.schoology import learning_observer.client_config import learning_observer.incoming_student_event as incoming_student_event import learning_observer.dashboard @@ -65,6 +67,8 @@ def tracemalloc_handler(request): register_incoming_event_views(app) register_debug_routes(app) learning_observer.google.initialize_and_register_routes(app) + learning_observer.canvas.initialize_and_register_routes(app) + learning_observer.schoology.initialize_and_register_routes(app) app.add_routes([ aiohttp.web.get( diff --git a/learning_observer/learning_observer/schoology.py b/learning_observer/learning_observer/schoology.py new file mode 100644 index 000000000..f3e9670db --- /dev/null +++ b/learning_observer/learning_observer/schoology.py @@ -0,0 +1,266 @@ +import os +import json +import recordclass +import string +from requests_oauthlib import OAuth1 +import configparser +import aiohttp +import aiohttp.web +from requests import Request + +import learning_observer.settings as settings +import learning_observer.log_event +import learning_observer.util +import learning_observer.auth +import learning_observer.runtime + +cache = None + +class Endpoint(recordclass.make_dataclass("Endpoint", ["name", "remote_url", "doc", "cleaners"], defaults=["", None])): + def arguments(self): + return extract_parameters_from_format_string(self.remote_url) + + def _local_url(self): + parameters = "}/{".join(self.arguments()) + base_url = f"/schoology/{self.name}" + if len(parameters) == 0: + return base_url + else: + return base_url + "/{" + parameters + "}" + + def _add_cleaner(self, name, cleaner): + if self.cleaners is None: + self.cleaners = dict() + self.cleaners[name] = cleaner + if 'local_url' not in cleaner: + cleaner['local_url'] = self._local_url() + "/" + name + + def _cleaners(self): + if self.cleaners is None: + return [] + else: + return self.cleaners + +ENDPOINTS = list(map(lambda x: Endpoint(*x), [ + ("course_list", "/sections"), + ("course_roster", "/sections/{sectionId}/enrollments"), + ("course_work", "/sections/{sectionId}/assignments"), + ("coursework_submissions", "/sections/{courseId}/assignments/{assignmentId}/submissions/{gradeItemId}"), +])) + +def extract_parameters_from_format_string(format_string): + ''' + Extracts parameters from a format string. E.g. + + >>> extract_parameters_from_format_string("hello {hi} my {bye}") + ['hi', 'bye'] + ''' + return [f[1] for f in string.Formatter().parse(format_string) if f[1] is not None] + +class Schoology: + def __init__(self, config_path='./config.ini'): + # Get the absolute path to the configuration file + script_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(script_dir, config_path) + + self.config = configparser.ConfigParser() + self.config.read(config_path) + + # Check if 'SCHOOLOGY_CONFIG' section is present + if 'SCHOOLOGY_CONFIG' not in self.config: + raise KeyError("The configuration file does not contain 'SCHOOLOGY_CONFIG' section") + + try: + self.api_key = self.config['SCHOOLOGY_CONFIG']['API_KEY'] + self.secret = self.config['SCHOOLOGY_CONFIG']['SECRET'] + except KeyError as e: + raise KeyError(f"Missing required configuration key: {e}") + + self.auth = OAuth1(self.api_key, self.secret, "", "") + self.base_url = f'https://api.schoology.com/v1/' + + async def api_call(self, method, endpoint, params=None, data=None, absolute_url=False, retry=True): + if absolute_url: + url = endpoint + else: + url = self.base_url + endpoint + if params: + url += '?' + '&'.join(f"{k}={v}" for k, v in params.items()) + + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json' + } + + signed_url, signed_headers, signed_data = self._sign_request(url, headers, data, method) + + # Convert headers to str + signed_headers = {str(k): str(v) for k, v in signed_headers.items()} + + async with aiohttp.ClientSession() as client: + response_func = getattr(client, method.lower()) + #url, headers, body = self.auth.sign(url, headers=headers, data=data, http_method=method.upper()) + + #async with response_func(url, headers=headers, params=params, json=data) as response: + async with response_func(signed_url, headers=signed_headers, data=signed_data) as response: + #async with response_func(url, auth=self.auth, params=params, json=data) as response: + if response.status != 200: + response.raise_for_status() + + return await response.json() + + def _sign_request(self, url, headers, data, method): + req = Request(method, url, headers=headers, json=data) + prepared = req.prepare() + + # Sign the request + oauth = OAuth1( + self.api_key, + client_secret=self.secret, + resource_owner_key=None, + resource_owner_secret=None, + signature_type='auth_header' + ) + + prepared = oauth(prepared) + + return prepared.url, prepared.headers, prepared.body + +async def raw_schoology_ajax(runtime, target_url, retry=False, **kwargs): + ''' + Make an AJAX call to Schoology, managing auth + auth. + + * runtime is a Runtime class containing request information. + * target_url is typically grabbed from ENDPOINTS + * ... and we pass the named parameters + ''' + schoology = Schoology() + + params = {k: v for k, v in kwargs.items() if v is not None} + try: + response = await schoology.api_call('GET', target_url, params=params) + except aiohttp.ClientResponseError as e: + raise + + return response + +def raw_access_partial(remote_url, name=None): + ''' + This is a helper which allows us to create a function which calls specific + Schoology APIs. + ''' + async def caller(request, **kwargs): + ''' + Make an AJAX request to Schoology + ''' + return await raw_schoology_ajax(request, remote_url, **kwargs) + setattr(caller, "__qualname__", name) + + return caller + +def initialize_and_register_routes(app): + ''' + Initialize and register routes for the application. + ''' + app.add_routes([ + aiohttp.web.get("/schoology", api_docs_handler) + ]) + + def make_ajax_raw_handler(remote_url): + async def ajax_passthrough(request): + runtime = learning_observer.runtime.Runtime(request) + response = await raw_schoology_ajax(runtime, remote_url, retry=True, **request.match_info) + return aiohttp.web.json_response(response) + return ajax_passthrough + + def make_cleaner_handler(raw_function, cleaner_function, name=None): + async def cleaner_handler(request): + response = cleaner_function(await raw_function(request, **request.match_info)) + if isinstance(response, dict) or isinstance(response, list): + return aiohttp.web.json_response(response) + elif isinstance(response, str): + return aiohttp.web.Response(text=response) + else: + raise AttributeError(f"Invalid response type: {type(response)}") + if name is not None: + setattr(cleaner_handler, "__qualname__", name + "_handler") + + return cleaner_handler + + def make_cleaner_function(raw_function, cleaner_function, name=None): + async def cleaner_local(request, **kwargs): + schoology_response = await raw_function(request, **kwargs) + clean = cleaner_function(schoology_response) + return clean + if name is not None: + setattr(cleaner_local, "__qualname__", name) + return cleaner_local + + for e in ENDPOINTS: + function_name = f"raw_{e.name}" + raw_function = raw_access_partial(remote_url=e.remote_url, name=e.name) + globals()[function_name] = raw_function + cleaners = e._cleaners() + for c in cleaners: + app.add_routes([ + aiohttp.web.get( + cleaners[c]['local_url'], + make_cleaner_handler(raw_function, cleaners[c]['function'], name=cleaners[c]['name']) + ) + ]) + globals()[cleaners[c]['name']] = make_cleaner_function(raw_function, cleaners[c]['function'], name=cleaners[c]['name']) + app.add_routes([ + aiohttp.web.get(e._local_url(), make_ajax_raw_handler(e.remote_url)) + ]) + +def api_docs_handler(request): + response = "URL Endpoints:\n\n" + for endpoint in ENDPOINTS: + response += f"{endpoint._local_url()}\n" + cleaners = endpoint._cleaners() + for c in cleaners: + response += f" {cleaners[c]['local_url']}\n" + response += "\n\n Globals:" + return aiohttp.web.Response(text=response) + +def register_cleaner(data_source, cleaner_name): + def decorator(f): + found = False + for endpoint in ENDPOINTS: + if endpoint.name == data_source: + found = True + endpoint._add_cleaner( + cleaner_name, + { + 'function': f, + 'local_url': f'{endpoint._local_url()}/{cleaner_name}', + 'name': cleaner_name + } + ) + if not found: + raise AttributeError(f"Data source {data_source} invalid; not found in endpoints.") + return f + return decorator + +@register_cleaner("course_roster", "roster") +def clean_course_roster(schoology_json): + students = schoology_json.get('enrollments', []) + students.sort(key=lambda x: x.get('name', 'ZZ')) + for student_json in students: + schoology_id = student_json['id'] + local_id = learning_observer.auth.schoology_id_to_user_id(schoology_id) + student_json['user_id'] = local_id + if 'external_ids' not in student_json: + student_json['external_ids'] = [] + student_json['external_ids'].append({"source": "schoology", "id": schoology_id}) + return students + +@register_cleaner("course_list", "courses") +def clean_course_list(schoology_json): + courses = schoology_json.get('courses', []) + courses.sort(key=lambda x: x.get('title', 'ZZ')) + return courses + +if __name__ == '__main__': + output = clean_course_roster({}) + print(json.dumps(output, indent=2))