From 3fcf944e91bf507fe3a745f3a7496dbffdebbf81 Mon Sep 17 00:00:00 2001 From: Ash Date: Wed, 25 Sep 2024 20:06:11 -0400 Subject: [PATCH] s/parms/params --- src/spaceandtime/sxtbaseapi.py | 88 +++++++++++++++++----------------- tests/test_sxtbaseapi.py | 8 ++-- 2 files changed, 48 insertions(+), 48 deletions(-) diff --git a/src/spaceandtime/sxtbaseapi.py b/src/spaceandtime/sxtbaseapi.py index 0e688fd..3010b86 100644 --- a/src/spaceandtime/sxtbaseapi.py +++ b/src/spaceandtime/sxtbaseapi.py @@ -118,10 +118,10 @@ def prep_sql(self, sql_text:str) -> str: def call_api(self, endpoint: str, auth_header:bool = True, request_type:str = SXTApiCallTypes.POST, - header_parms: dict = {}, - data_parms: dict = {}, - query_parms: dict = {}, - path_parms: dict = {} ): + header_params: dict = {}, + data_params: dict = {}, + query_params: dict = {}, + path_params: dict = {} ): """-------------------- Generic function to call and return SxT API. @@ -133,10 +133,10 @@ def call_api(self, endpoint: str, endpoint (str): URL endpoint, after the version. Final structure is: [api_url/version/endpoint] request_type (SXTApiCallTypes): Type of request. [POST, GET, PUT, DELETE] auth_header (bool): flag indicator whether to append the Bearer token to the header. - header_parms: (dict): Name/Value pair to add to request header, except for bearer token. {Name: Value} - query_parms: (dict): Name/value pairs to be added to the query string. {Name: Value} - data_parms (dict): Dictionary to be used holistically for --data json object. - path_parms (dict): Pattern to replace placeholders in URL. {Placeholder_in_URL: Replace_Value} + header_params: (dict): Name/Value pair to add to request header, except for bearer token. {Name: Value} + query_params: (dict): Name/value pairs to be added to the query string. {Name: Value} + data_params (dict): Dictionary to be used holistically for --data json object. + path_params (dict): Pattern to replace placeholders in URL. {Placeholder_in_URL: Replace_Value} Results: bool: Indicating request success @@ -170,18 +170,18 @@ def __handle_errors__(txt, ex, statuscode, responseobject, loggerobject): msg = f'request_type must be of type SXTApiCallTypes, not { type(request_type) }' raise SxTArgumentError(msg, logger=self.logger) - # Path parms - for name, value in path_parms.items(): + # Path params + for name, value in path_params.items(): endpoint = endpoint.replace(f'{{{name}}}', value) - # Query parms - if query_parms !={}: - endpoint = f'{endpoint}?' + '&'.join([f'{n}={v}' for n,v in query_parms.items()]) + # Query params + if query_params !={}: + endpoint = f'{endpoint}?' + '&'.join([f'{n}={v}' for n,v in query_params.items()]) - # Header parms + # Header params headers = {k:v for k,v in self.standard_headers.items()} # get new object if auth_header: headers['authorization'] = f'Bearer {self.access_token}' - headers.update(header_parms) + headers.update(header_params) # final URL url = f'{self.api_url}/{version}/{endpoint}' @@ -194,7 +194,7 @@ def __handle_errors__(txt, ex, statuscode, responseobject, loggerobject): case _: raise SxTArgumentError('Call type must be SXTApiCallTypes enum.', logger=self.logger) # Call API function as defined above - response = callfunc(url=url, data=json.dumps(data_parms), headers=headers) + response = callfunc(url=url, data=json.dumps(data_params), headers=headers) txt = response.text statuscode = response.status_code response.raise_for_status() @@ -260,10 +260,10 @@ def auth_code(self, user_id:str, prefix:str = None, joincode:str = None): bool: Success flag (True/False) indicating the api call worked as expected. object: Response information from the Space and Time network, as list or dict(json). """ - dataparms = {"userId": user_id} - if prefix: dataparms["prefix"] = prefix - if joincode: dataparms[joincode] = joincode - success, rtn = self.call_api(endpoint = 'auth/code', auth_header = False, data_parms = dataparms) + dataparams = {"userId": user_id} + if prefix: dataparams["prefix"] = prefix + if joincode: dataparams[joincode] = joincode + success, rtn = self.call_api(endpoint = 'auth/code', auth_header = False, data_params = dataparams) return success, rtn if success else [rtn] @@ -295,12 +295,12 @@ def auth_token(self, user_id:str, challange_token:str, signed_challange_token:st except Exception as ex: return False, {'error':'keymanager object must be of type SXTKeyManager, if supplied.'} - dataparms = { "userId": user_id + dataparams = { "userId": user_id ,"signature": signed_challange_token ,"authCode": challange_token ,"key": public_key ,"scheme": scheme} - success, rtn = self.call_api(endpoint='auth/token', auth_header=False, data_parms=dataparms) + success, rtn = self.call_api(endpoint='auth/token', auth_header=False, data_params=dataparams) return success, rtn if success else [rtn] @@ -313,7 +313,7 @@ def token_refresh(self, refresh_token:str): object: Response information from the Space and Time network, as list or dict(json). """ headers = { 'authorization': f'Bearer {refresh_token}' } - success, rtn = self.call_api('auth/refresh', False, header_parms=headers) + success, rtn = self.call_api('auth/refresh', False, header_params=headers) return success, rtn if success else [rtn] @@ -373,11 +373,11 @@ def auth_addkey(self, user_id:str, public_key:str, challange_token:str, signed_c bool: Success flag (True/False) indicating the api call worked as expected. object: Response information from the Space and Time network, as list or dict(json). """ - dataparms = { "authCode": challange_token + dataparams = { "authCode": challange_token ,"signature": signed_challange_token ,"key": public_key ,"scheme": scheme } - success, rtn = self.call_api('auth/keys', True, SXTApiCallTypes.POST, data_parms=dataparms) + success, rtn = self.call_api('auth/keys', True, SXTApiCallTypes.POST, data_params=dataparams) return success, rtn if success else [rtn] @@ -430,10 +430,10 @@ def sql_exec(self, sql_text:str, biscuits:list = None, app_name:str = None, vali sql_text = self.prep_sql(sql_text=sql_text) biscuit_tokens = self.prep_biscuits(biscuits) if type(biscuit_tokens) != list: raise SxTArgumentError("sql_all requires parameter 'biscuits' to be a list of biscuit_tokens or SXTBiscuit objects.", logger = self.logger) - dataparms = {"sqlText": sql_text + dataparams = {"sqlText": sql_text ,"biscuits": biscuit_tokens ,"validate": str(validate).lower() } - success, rtn = self.call_api('sql', True, header_parms=headers, data_parms=dataparms) + success, rtn = self.call_api('sql', True, header_params=headers, data_params=dataparams) return success, rtn if success else [rtn] @@ -458,10 +458,10 @@ def sql_ddl(self, sql_text:str, biscuits:list = None, app_name:str = None): sql_text = self.prep_sql(sql_text=sql_text) biscuit_tokens = self.prep_biscuits(biscuits) if biscuit_tokens==[]: raise SxTArgumentError("sql_ddl requires 'biscuits', none were provided.", logger = self.logger) - dataparms = {"sqlText": sql_text + dataparams = {"sqlText": sql_text ,"biscuits": biscuit_tokens } # ,"resources": [r for r in resources] } - success, rtn = self.call_api('sql/ddl', True, header_parms=headers, data_parms=dataparms) + success, rtn = self.call_api('sql/ddl', True, header_params=headers, data_params=dataparams) return success, rtn if success else [rtn] @@ -489,10 +489,10 @@ def sql_dml(self, sql_text:str, resources:list, biscuits:list = None, app_name:s biscuit_tokens = self.prep_biscuits(biscuits) if type(biscuit_tokens) != list: raise SxTArgumentError("sql_all requires parameter 'biscuits' to be a list of biscuit_tokens or SXTBiscuit objects.", logger = self.logger) headers = { 'originApp': app_name } if app_name else {} - dataparms = {"sqlText": sql_text + dataparams = {"sqlText": sql_text ,"biscuits": biscuit_tokens ,"resources": [r for r in resources] } - success, rtn = self.call_api('sql/dml', True, header_parms=headers, data_parms=dataparms) + success, rtn = self.call_api('sql/dml', True, header_params=headers, data_params=dataparams) return success, rtn if success else [rtn] @@ -519,10 +519,10 @@ def sql_dql(self, sql_text:str, resources:list, biscuits:list = None, app_name:s sql_text = self.prep_sql(sql_text=sql_text) biscuit_tokens = self.prep_biscuits(biscuits) if type(biscuit_tokens) != list: raise SxTArgumentError("sql_all requires parameter 'biscuits' to be a list of biscuit_tokens or SXTBiscuit objects.", logger = self.logger) - dataparms = {"sqlText": sql_text + dataparams = {"sqlText": sql_text ,"biscuits": biscuit_tokens ,"resources": [r for r in resources] } - success, rtn = self.call_api('sql/dql', True, header_parms=headers, data_parms=dataparms) + success, rtn = self.call_api('sql/dql', True, header_params=headers, data_params=dataparams) return success, rtn if success else [rtn] @@ -539,7 +539,7 @@ def discovery_get_schemas(self, scope:str = 'ALL'): bool: Success flag (True/False) indicating the api call worked as expected. object: Response information from the Space and Time network, as list of dict. """ - success, rtn = self.call_api('discover/schema',True, SXTApiCallTypes.GET, query_parms={'scope':scope}) + success, rtn = self.call_api('discover/schema',True, SXTApiCallTypes.GET, query_params={'scope':scope}) return success, (rtn if success else [rtn]) @@ -560,9 +560,9 @@ def discovery_get_tables(self, schema:str = 'ETHEREUM', scope:str = 'ALL', searc """ version = 'v2' if 'discover/table' not in list(self.versions.keys()) else self.versions['discover/table'] schema_or_namespace = 'namespace' if version=='v1' else 'schema' - query_parms = {'scope':scope.upper(), schema_or_namespace:schema.upper()} - if version != 'v1' and search_pattern: query_parms['searchPattern'] = search_pattern - success, rtn = self.call_api('discover/table',True, SXTApiCallTypes.GET, query_parms=query_parms) + query_params = {'scope':scope.upper(), schema_or_namespace:schema.upper()} + if version != 'v1' and search_pattern: query_params['searchPattern'] = search_pattern + success, rtn = self.call_api('discover/table',True, SXTApiCallTypes.GET, query_params=query_params) return success, (rtn if success else [rtn]) @@ -582,9 +582,9 @@ def discovery_get_views(self, schema:str = 'ETHEREUM', scope:str = 'ALL', search object: Response information from the Space and Time network, as list of dict. """ version = 'v2' if 'discover/view' not in list(self.versions.keys()) else self.versions['discover/view'] - query_parms = {'scope':scope.upper(), 'schema':schema.upper()} - if version != 'v1' and search_pattern: query_parms['searchPattern'] = search_pattern - success, rtn = self.call_api('discover/view',True, SXTApiCallTypes.GET, query_parms=query_parms) + query_params = {'scope':scope.upper(), 'schema':schema.upper()} + if version != 'v1' and search_pattern: query_params['searchPattern'] = search_pattern + success, rtn = self.call_api('discover/view',True, SXTApiCallTypes.GET, query_params=query_params) return success, (rtn if success else [rtn]) @@ -604,8 +604,8 @@ def discovery_get_columns(self, schema:str, table:str): """ version = 'v2' if 'discover/table/column' not in list(self.versions.keys()) else self.versions['discover/table/column'] schema_or_namespace = 'namespace' if version=='v1' else 'schema' - query_parms = {schema_or_namespace:schema.upper(), 'table':table} - success, rtn = self.call_api('discover/table/column',True, SXTApiCallTypes.GET, query_parms=query_parms) + query_params = {schema_or_namespace:schema.upper(), 'table':table} + success, rtn = self.call_api('discover/table/column',True, SXTApiCallTypes.GET, query_params=query_params) return success, (rtn if success else [rtn]) @@ -671,7 +671,7 @@ def subscription_invite_user(self, role:str = 'member'): return False, {'error':'Invites must be either member, admin, or owner. Permissions cannot exceed the invitor.'} version = 'v2' if endpoint not in list(self.versions.keys()) else self.versions[endpoint] success, rtn = self.call_api(endpoint=endpoint, auth_header=True, request_type=SXTApiCallTypes.POST, - query_parms={'role':role} ) + query_params={'role':role} ) return success, (rtn if success else [rtn]) @@ -692,7 +692,7 @@ def subscription_join(self, joincode:str): endpoint = 'subscription/invite/{joinCode}' version = 'v2' if endpoint not in list(self.versions.keys()) else self.versions[endpoint] success, rtn = self.call_api(endpoint=endpoint, auth_header=True, request_type=SXTApiCallTypes.POST, - path_parms= {'{joinCode}': joincode} ) + path_params= {'{joinCode}': joincode} ) return success, (rtn if success else [rtn]) diff --git a/tests/test_sxtbaseapi.py b/tests/test_sxtbaseapi.py index a0485ba..fd8a6de 100644 --- a/tests/test_sxtbaseapi.py +++ b/tests/test_sxtbaseapi.py @@ -84,18 +84,18 @@ def test_call_api(): sxtb = SXTBaseAPI(access_token) success, user_exists = sxtb.call_api('auth/idexists/{id}', auth_header=False, request_type=sxtb.APICALLTYPE.GET, - path_parms={'id':userid}) + path_params={'id':userid}) assert success assert user_exists success, user_exists = sxtb.call_api('auth/idexists/{id}', auth_header=False, request_type=sxtb.APICALLTYPE.GET, - path_parms={'id':'this_user_should_not_exist_please_dont_create'}) + path_params={'id':'this_user_should_not_exist_please_dont_create'}) assert success assert not user_exists success, data = sxtb.call_api('sql', auth_header=True, request_type=sxtb.APICALLTYPE.POST, - data_parms={'sqlText':'select * from sxtlabs.singularity'} ) + data_params={'sqlText':'select * from sxtlabs.singularity'} ) assert success assert data[0]['NAME'] == 'Singularity' # hopefully this doesn't change... @@ -107,7 +107,7 @@ def test_call_api(): # good API, not validated success, data = sxtb.call_api('sql', auth_header=False, request_type=sxtb.APICALLTYPE.POST, - data_parms={'sqlText':'select * from sxtlabs.singularity'} ) + data_params={'sqlText':'select * from sxtlabs.singularity'} ) assert not success assert data['status_code'] == 401 assert 'Unauthorized' in data['error']