From 5ef4f4387fff8969bb4a322c58550322295c6a60 Mon Sep 17 00:00:00 2001 From: Palash Tyagi Date: Fri, 17 Feb 2023 22:36:22 +0000 Subject: [PATCH 1/6] moved python files to a separate folder --- LICENSE => python/LICENSE | 0 README.md => python/README.md | 0 dataquery_api.py => python/dataquery_api.py | 0 example.py => python/example.py | 0 requirements.txt => python/requirements.txt | 0 5 files changed, 0 insertions(+), 0 deletions(-) rename LICENSE => python/LICENSE (100%) rename README.md => python/README.md (100%) rename dataquery_api.py => python/dataquery_api.py (100%) rename example.py => python/example.py (100%) rename requirements.txt => python/requirements.txt (100%) diff --git a/LICENSE b/python/LICENSE similarity index 100% rename from LICENSE rename to python/LICENSE diff --git a/README.md b/python/README.md similarity index 100% rename from README.md rename to python/README.md diff --git a/dataquery_api.py b/python/dataquery_api.py similarity index 100% rename from dataquery_api.py rename to python/dataquery_api.py diff --git a/example.py b/python/example.py similarity index 100% rename from example.py rename to python/example.py diff --git a/requirements.txt b/python/requirements.txt similarity index 100% rename from requirements.txt rename to python/requirements.txt From c75021f06c934421068933273785396cf442ce91 Mon Sep 17 00:00:00 2001 From: Palash Tyagi Date: Fri, 17 Feb 2023 22:44:35 +0000 Subject: [PATCH 2/6] added basic readme --- README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..304f908 --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +# DataQuery API Client + +This repository contains example programs in various languages that show how to make calls to the JPMorgan DataQuery API. + +## Running the examples + +Each subdirectory contains a README.md file with instructions specific to each language on how to run the example. + +## API Documentation + +The API documentation is available at [JPMorgan Developer/DataQuery API](https://developer.jpmorgan.com/products/dataquery_api) + +## Issues + +These examples are provided as-is and are not meant to handle all possible error conditions. The [Macrosynergy Package](https://github.com/macrosynergy/macrosynergy) contains a more robust implementation. + +If you find any issues with the examples, please report them in the [Issues](https://github.com/macrosynergy/dataquery-api/issues) section of this repository. + +## License + +The code in this repository is licensed under the Apache License, Version 2.0. See [LICENSE](LICENSE) for the full license text. \ No newline at end of file From b032e3ec2dbd661d50eb5772136ce12fe2d1650a Mon Sep 17 00:00:00 2001 From: Palash Tyagi Date: Sat, 18 Feb 2023 10:47:07 +0000 Subject: [PATCH 3/6] init attempt --- python/dataquery_api.py | 206 +++++----------------------------------- 1 file changed, 24 insertions(+), 182 deletions(-) diff --git a/python/dataquery_api.py b/python/dataquery_api.py index 1d5b143..a7cbbf1 100644 --- a/python/dataquery_api.py +++ b/python/dataquery_api.py @@ -26,30 +26,11 @@ OAUTH_DQ_RESOURCE_ID: str = "JPMC:URI:RS-06785-DataQueryExternalApi-PROD" API_DELAY_PARAM: float = 0.3 # 300ms delay between requests. EXPR_LIMIT: int = 20 # Maximum number of expressions per request (not per "download"). - - def request_wrapper( - url: str, - headers: Optional[Dict] = None, - params: Optional[Dict] = None, - method: str = "get", - **kwargs, -) -> requests.Response: - """ - Wrapper function for requests.request() used to make a request - to the JPMorgan DataQuery API. - Parameters - :param url : URL to make request to - :param params : Parameters to pass to request - Returns - :return : Response object - """ - # this function wraps the requests.request() method in a try/except block + url: str,headers: Optional[Dict] = None,params: Optional[Dict] = None, + method: str = "get",**kwargs,) -> requests.Response: try: - response: requests.Response = requests.request( - method=method, url=url, params=params, headers=headers, **kwargs - ) - # Check response + response: requests.Response = requests.request(method=method, url=url, params=params, headers=headers, **kwargs) if response.status_code == 200: return response else: @@ -58,112 +39,46 @@ def request_wrapper( if isinstance(e, requests.exceptions.ProxyError): raise Exception("Proxy error. Check your proxy settings. Exception : ", e) elif isinstance(e, requests.exceptions.ConnectionError): - raise Exception( - "Connection error. Check your internet connection. Exception : ", e - ) + raise Exception("Connection error. Check your internet connection. Exception : ", e) else: raise e - - class DQInterface: def __init__( - self, - client_id: str, - client_secret: str, - proxy: Optional[Dict] = None, - dq_resource_id: Optional[str] = OAUTH_DQ_RESOURCE_ID, - ): + self,client_id: str,client_secret: str,proxy: Optional[Dict] = None, + dq_resource_id: Optional[str] = OAUTH_DQ_RESOURCE_ID,): self.client_id: str = client_id self.client_secret: str = client_secret self.proxy: str = proxy self.dq_resource_id: str = dq_resource_id self.current_token: Optional[Dict] = None - self.token_data: Dict = { - "grant_type": "client_credentials", - "client_id": self.client_id, - "client_secret": self.client_secret, - "aud": self.dq_resource_id, - } + self.token_data: Dict = {"grant_type": "client_credentials","client_id": self.client_id, + "client_secret": self.client_secret,"aud": self.dq_resource_id,} def get_access_token(self) -> str: - """ - Helper function to verify if the current token is active and valid, - and request a new one if it is not. - Returns - :return : Access token - """ - def is_active(token: Optional[dict] = None) -> bool: - """ - Helper function to check if a token is active. - Parameters - :param token : Token to check. Can be None, which will return False. - Returns - :return : True if token is active, False otherwise - """ - # return (token is None) or (datetime.now() - \ - # token["created_at"]).total_seconds() / 60 >= (token["expires_in"] - 1) if token is None: return False else: created: datetime = token["created_at"] expires: int = token["expires_in"] - return ((datetime.now() - created).total_seconds() / 60) >= ( - expires - 1 - ) - - # if the token is active (and valid), return it; else, make a request for a new token + return ((datetime.now() - created).total_seconds() / 60) >= (expires - 1) if is_active(self.current_token): return self.current_token["access_token"] else: - r_json = request_wrapper( - url=OAUTH_TOKEN_URL, - data=self.token_data, - method="post", - proxies=self.proxy, - ).json() - self.current_token = { - "access_token": r_json["access_token"], - "created_at": datetime.now(), - "expires_in": r_json["expires_in"], - } + r_json = request_wrapper(url=OAUTH_TOKEN_URL,data=self.token_data, + method="post",proxies=self.proxy,).json() + self.current_token = {"access_token": r_json["access_token"], + "created_at": datetime.now(),"expires_in": r_json["expires_in"],} return self.current_token["access_token"] def _request(self, url: str, params: dict, **kwargs) -> requests.Response: - """ - Helper function to make a request to the DataQuery API. - Parameters - :param url : URL to make request to - :param params : Parameters to pass to request - Returns - :return : Response object - """ - # Make request using wrapper function - # this funciton wraps the request wrapper to add the access token - # and add the proxy to all requests from this class - return request_wrapper( - url=url, - params=params, - headers={"Authorization": f"Bearer {self.get_access_token()}"}, - method="get", - proxies=self.proxy, - **kwargs, - ).json() + return request_wrapper(url=url,method="get",proxies=self.proxy,**kwargs,params=params, + headers={"Authorization": f"Bearer {self.get_access_token()}"},).json() def heartbeat(self) -> bool: - """ - Check if the DataQuery API is up. - Returns - :return : True if up, False otherwise - """ - response: requests.Response = self._request( - url=OAUTH_BASE_URL + HEARTBEAT_ENDPOINT, - params={"data": "NO_REFERENCE_DATA"}, - ) - # no need for response.ok because - # response.status_code==200 is checked in the wrapper + response: requests.Response = self._request(url=OAUTH_BASE_URL + HEARTBEAT_ENDPOINT, + params={"data": "NO_REFERENCE_DATA"},) return "info" in response - def download( self, expressions: List[str], @@ -175,61 +90,25 @@ def download( conversion: str = "CONV_LASTBUS_ABS", nan_treatment: str = "NA_NOTHING", ) -> Union[List[Dict], pd.DataFrame]: - """ - Download data from the DataQuery API. - Parameters - :param expressions : List of expressions to download - :param start_date : Start date of data to download - :param end_date : End date of data to download - :param as_dataframe : Whether to return the data as a Pandas DataFrame, - or as a list of dictionaries. Defaults to True, returning a DataFrame. - :param calender : Calendar setting from DataQuery's specifications - :param frequency : Frequency setting from DataQuery's specifications - :param conversion : Conversion setting from DataQuery's specifications - :param nan_treatment : NaN treatment setting from DataQuery's specifications - :param run_sequential : Whether to run the download - sequentially or as multithreaded requests. - Defaults to False (multithreaded recommended). - Returns - :return : List of dictionaries containing data - """ - params_dict: Dict = { - "format": "JSON", - "start-date": start_date, - "end-date": end_date, - "calendar": calender, - "frequency": frequency, - "conversion": conversion, - "nan_treatment": nan_treatment, - "data": "NO_REFERENCE_DATA", - } - - expr_batches: List[List[str]] = [ - [expressions[i : min(i + EXPR_LIMIT, len(expressions))]] - for i in range(0, len(expressions), EXPR_LIMIT) - ] + "format": "JSON","start-date": start_date,"end-date": end_date,"calendar": calender, + "frequency": frequency,"conversion": conversion,"nan_treatment": nan_treatment,"data": "NO_REFERENCE_DATA",} + expr_batches: List[List[str]] = [[expressions[i : min(i + EXPR_LIMIT, len(expressions))]]for i in range(0, len(expressions), EXPR_LIMIT)] invalid_response_msg: str = "Invalid response from DataQuery API." heartbeat_failed_msg: str = "DataQuery API Heartbeat failed." - downloaded_data: List[Dict] = [] assert self.heartbeat(), heartbeat_failed_msg - print("Heartbeat Successful.") - for expr_batch in expr_batches: current_params: Dict = params_dict.copy() current_params["expressions"]: List = expr_batch curr_url: str = OAUTH_BASE_URL + TIMESERIES_ENDPOINT downloaded_data: List[Dict] = [] curr_response: Dict = {} - # loop to get next page from the response if any get_pagination: bool = True while get_pagination: sleep(API_DELAY_PARAM) curr_response: Dict = self._request(url=curr_url, params=current_params) - if (curr_response is None) or ( - "instruments" not in curr_response.keys() - ): + if (curr_response is None) or ("instruments" not in curr_response.keys()): raise Exception(invalid_response_msg) else: downloaded_data.extend(curr_response["instruments"]) @@ -238,60 +117,23 @@ def download( get_pagination = False break else: - curr_url = ( - OAUTH_BASE_URL + curr_response["links"][1]["next"] - ) + curr_url = (OAUTH_BASE_URL + curr_response["links"][1]["next"]) current_params = {} - if as_dataframe: - downloaded_data: pd.DataFrame = time_series_to_df(downloaded_data) - return downloaded_data -def time_series_to_df(dicts_list: List[Dict]) -> pd.DataFrame: - """ - Convert the downloaded data to a pandas DataFrame. - Parameters - :param dicts_list : List of dictionaries containing time series - data from the DataQuery API - Returns - :return : DataFrame containing the data - """ - dfs: List = [] - for d in dicts_list: - df = pd.DataFrame( - d["attributes"][0]["time-series"], columns=["real_date", "value"] - ) - df["expression"] = d["attributes"][0]["expression"] - dfs += [df] - - return_df = pd.concat(dfs, axis=0).reset_index(drop=True)[ - ["real_date", "expression", "value"] - ] - return_df["real_date"] = pd.to_datetime(return_df["real_date"]) - return return_df - - if __name__ == "__main__": import os - client_id: str = os.environ["JPMAQS_API_CLIENT_ID"] client_secret: str = os.environ["JPMAQS_API_CLIENT_SECRET"] - dq: DQInterface = DQInterface(client_id, client_secret) assert dq.heartbeat(), "DataQuery API Heartbeat failed." - - expressions = [ - "DB(JPMAQS,USD_EQXR_VT10,value)", - "DB(JPMAQS,AUD_EXALLOPENNESS_NSA_1YMA,value)", - ] + expressions = ["DB(JPMAQS,USD_EQXR_VT10,value)","DB(JPMAQS,AUD_EXALLOPENNESS_NSA_1YMA,value)",] start_date: str = "2020-01-25" end_date: str = "2023-02-05" - data: Union[List[Dict], pd.DataFrame] = dq.download( - expressions=expressions, start_date=start_date, end_date=end_date - ) + data: Union[List[Dict], pd.DataFrame] = dq.download(expressions=expressions, start_date=start_date, end_date=end_date) if isinstance(data, pd.DataFrame): print(data.head()) else: From bec09fe4f8a7fc6dd61851abe782c4575809392b Mon Sep 17 00:00:00 2001 From: Palash Tyagi Date: Sat, 18 Feb 2023 10:47:18 +0000 Subject: [PATCH 4/6] initial commit --- javascript/dataquery_api.js | 238 ++++++++++++++++++++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 javascript/dataquery_api.js diff --git a/javascript/dataquery_api.js b/javascript/dataquery_api.js new file mode 100644 index 0000000..d76d693 --- /dev/null +++ b/javascript/dataquery_api.js @@ -0,0 +1,238 @@ +const OAUTH_BASE_URL = "https://api-developer.jpmorgan.com/research/dataquery-authe/api/v2"; +const TIMESERIES_ENDPOINT = "/expressions/time-series"; +const HEARTBEAT_ENDPOINT = "/services/heartbeat"; +const OAUTH_TOKEN_URL = "https://authe.jpmchase.com/as/token.oauth2"; +const OAUTH_DQ_RESOURCE_ID = "JPMC:URI:RS-06785-DataQueryExternalApi-PROD"; +const API_DELAY_PARAM = 0.3; +const EXPR_LIMIT = 20; + +function requestWrapper(url, headers = null, params = null, method = "GET") { + return new Promise((resolve, reject) => { + const xhr = new XMLHttpRequest(); + xhr.open(method, url); + xhr.setRequestHeader("Content-Type", "application/json"); + xhr.setRequestHeader("Accept", "application/json"); + xhr.setRequestHeader("Authorization", `Bearer ${headers.Authorization}`); + + xhr.onload = function () { + if (xhr.status === 200) { + resolve(xhr.response); + } else { + reject(xhr.response); + } + }; + xhr.onerror = function () { + reject("Request failed."); + }; + + if (params) { + xhr.send(JSON.stringify(params)); + } else { + xhr.send(); + } + }); +} + +class DQInterface { + constructor(client_id, client_secret, proxy = null, dq_resource_id = OAUTH_DQ_RESOURCE_ID) { + this.client_id = client_id; + this.client_secret = client_secret; + this.proxy = proxy; + this.dq_resource_id = dq_resource_id; + this.current_token = null; + this.token_data = { + grant_type: "client_credentials", + client_id: this.client_id, + client_secret: this.client_secret, + aud: this.dq_resource_id, + }; + } + + getAccessToken() { + const isActive = (token) => { + if (!token) { + return false; + } else { + const created = new Date(token.created_at); + const expires = token.expires_in; + return (new Date() - created) / 60000 >= expires - 1; + } + }; + + if (isActive(this.current_token)) { + return this.current_token.access_token; + } else { + return new Promise((resolve, reject) => { + requestWrapper(OAUTH_TOKEN_URL, null, this.token_data, "POST") + .then((response) => { + const r_json = JSON.parse(response); + this.current_token = { + access_token: r_json.access_token, + created_at: new Date(), + expires_in: r_json.expires_in, + }; + resolve(this.current_token.access_token); + }) + .catch((error) => { + reject(error); + }); + }); + } + } + + request(url, params, method, headers = null) { + return new Promise((resolve, reject) => { + this.getAccessToken() + .then((access_token) => { + const full_url = OAUTH_BASE_URL + url; + requestWrapper(full_url, { Authorization: access_token }, params, method) + .then((response) => { + resolve(JSON.parse(response)); + }) + .catch((error) => { + reject(error); + }); + }) + .catch((error) => { + reject(error); + }); + }); + } + + heartbeat() { + // use the request function to make a heartbeat request + response = new Promise((resolve, reject) => { + this.request(HEARTBEAT_ENDPOINT, null, "GET") + .then((response) => { + resolve(response); + }) + .catch((error) => { + reject(error); + }); + }); + // if the "info" in response dict, return true else return false + return response["info"] == "OK"; + } + + + + download( + expressions, + start_date, + end_date, + calender = "CAL_ALLDAYS", + frequency = "FREQ_DAY", + conversion = "CONV_LASTBUS_ABS", + nan_treatment = "NA_NOTHING", + ) { + + // declare a dictionary to store predefined parameters + let params_dict = { + "format": "JSON", + "start-date": start_date, + "end-date": end_date, + "calendar": calender, + "frequency": frequency, + "conversion": conversion, + "nan_treatment": nan_treatment, + "data": "NO_REFERENCE_DATA", + } + + // create a list of lists to store the expressions of batches = expr_limit. + let expr_list = [] + let expr_batch = [] + for (let i = 0; i < expressions.length; i++) { + expr_batch.push(expressions[i]) + if (expr_batch.length == EXPR_LIMIT) { + expr_list.push(expr_batch) + expr_batch = [] + } + } + if (expr_batch.length > 0) { + expr_list.push(expr_batch) + } + + // assert that heartbeat is working + if (!this.heartbeat()) { + throw new Error("Heartbeat failed.") + } + + // create a list to store the downloaded data + let downloaded_data = [] + + // loop through the batches of expressions + for (let i = 0; i < expr_list.length; i++) { + // create a copy of the params_dict + let current_params = Object.assign({}, params_dict); + // add the expressions to the copy of params_dict + current_params["expressions"] = expr_list[i]; + // create a url + let curr_url = OAUTH_BASE_URL + TIMESERIES_ENDPOINT; + // create a list to store the current response + let curr_response = {}; + // loop to get next page from the response if any + let get_pagination = true; + while (get_pagination) { + // sleep(API_DELAY_PARAM) + curr_response = this.request(curr_url, current_params, "GET"); + if (curr_response === null || !("instruments" in curr_response)) { + throw new Error("Invalid response."); + } else { + downloaded_data = downloaded_data.concat(curr_response["instruments"]); + if ("links" in curr_response) { + if (curr_response["links"][1]["next"] === null) { + get_pagination = false; + break; + } else { + curr_url = OAUTH_BASE_URL + curr_response["links"][1]["next"]; + current_params = {}; + } + } + } + } + } + return downloaded_data; + } + + to_array(downloaded_data) { + /* for d in dict list + d["attributes"][0]["time-series"] has 2 values - first is datetime64[ns] and second is value of the expression + + create an output list of expression, date and value + */ + let output = [] + for (let i = 0; i < downloaded_data.length; i++) { + let d = downloaded_data[i]; + let expr = d["attributes"][0]["expression"]; + let date = d["attributes"][0]["time-series"][0]; + let value = d["attributes"][0]["time-series"][1]; + output.push([expr, date, value]); + } + return output; + } +} + +// create a main function to run the code +async function main() { + let client_id = ""; + let client_secret = ""; + + // create an instance of the class + let dqClient = new DQInterface(client_id, client_secret); + + // check heartbeat + let heartbeat = await dqClient.heartbeat(); + console.log(heartbeat); + + // download data + let expressions = [ "DB(JPMAQS,USD_EQXR_VT10,value)", + "DB(JPMAQS,AUD_EXALLOPENNESS_NSA_1YMA,value)"]; + let start_date = "2020-01-01"; + let end_date = "2020-12-31"; + let downloaded_data = await dqClient.download(expressions, start_date, end_date); + + // convert the downloaded data to an array + let output = dqClient.to_array(downloaded_data); + + console.log(output.slice(0, 10)); +} \ No newline at end of file From 1623a88a568de17a5a30d28db96c9eb04e930ca6 Mon Sep 17 00:00:00 2001 From: Palash Tyagi Date: Sat, 18 Feb 2023 12:18:05 +0000 Subject: [PATCH 5/6] fixed incorrect commit --- python/dataquery_api.py | 206 +++++++++++++++++++++++++++++++++++----- 1 file changed, 182 insertions(+), 24 deletions(-) diff --git a/python/dataquery_api.py b/python/dataquery_api.py index a7cbbf1..1d5b143 100644 --- a/python/dataquery_api.py +++ b/python/dataquery_api.py @@ -26,11 +26,30 @@ OAUTH_DQ_RESOURCE_ID: str = "JPMC:URI:RS-06785-DataQueryExternalApi-PROD" API_DELAY_PARAM: float = 0.3 # 300ms delay between requests. EXPR_LIMIT: int = 20 # Maximum number of expressions per request (not per "download"). + + def request_wrapper( - url: str,headers: Optional[Dict] = None,params: Optional[Dict] = None, - method: str = "get",**kwargs,) -> requests.Response: + url: str, + headers: Optional[Dict] = None, + params: Optional[Dict] = None, + method: str = "get", + **kwargs, +) -> requests.Response: + """ + Wrapper function for requests.request() used to make a request + to the JPMorgan DataQuery API. + Parameters + :param url : URL to make request to + :param params : Parameters to pass to request + Returns + :return : Response object + """ + # this function wraps the requests.request() method in a try/except block try: - response: requests.Response = requests.request(method=method, url=url, params=params, headers=headers, **kwargs) + response: requests.Response = requests.request( + method=method, url=url, params=params, headers=headers, **kwargs + ) + # Check response if response.status_code == 200: return response else: @@ -39,46 +58,112 @@ def request_wrapper( if isinstance(e, requests.exceptions.ProxyError): raise Exception("Proxy error. Check your proxy settings. Exception : ", e) elif isinstance(e, requests.exceptions.ConnectionError): - raise Exception("Connection error. Check your internet connection. Exception : ", e) + raise Exception( + "Connection error. Check your internet connection. Exception : ", e + ) else: raise e + + class DQInterface: def __init__( - self,client_id: str,client_secret: str,proxy: Optional[Dict] = None, - dq_resource_id: Optional[str] = OAUTH_DQ_RESOURCE_ID,): + self, + client_id: str, + client_secret: str, + proxy: Optional[Dict] = None, + dq_resource_id: Optional[str] = OAUTH_DQ_RESOURCE_ID, + ): self.client_id: str = client_id self.client_secret: str = client_secret self.proxy: str = proxy self.dq_resource_id: str = dq_resource_id self.current_token: Optional[Dict] = None - self.token_data: Dict = {"grant_type": "client_credentials","client_id": self.client_id, - "client_secret": self.client_secret,"aud": self.dq_resource_id,} + self.token_data: Dict = { + "grant_type": "client_credentials", + "client_id": self.client_id, + "client_secret": self.client_secret, + "aud": self.dq_resource_id, + } def get_access_token(self) -> str: + """ + Helper function to verify if the current token is active and valid, + and request a new one if it is not. + Returns + :return : Access token + """ + def is_active(token: Optional[dict] = None) -> bool: + """ + Helper function to check if a token is active. + Parameters + :param token : Token to check. Can be None, which will return False. + Returns + :return : True if token is active, False otherwise + """ + # return (token is None) or (datetime.now() - \ + # token["created_at"]).total_seconds() / 60 >= (token["expires_in"] - 1) if token is None: return False else: created: datetime = token["created_at"] expires: int = token["expires_in"] - return ((datetime.now() - created).total_seconds() / 60) >= (expires - 1) + return ((datetime.now() - created).total_seconds() / 60) >= ( + expires - 1 + ) + + # if the token is active (and valid), return it; else, make a request for a new token if is_active(self.current_token): return self.current_token["access_token"] else: - r_json = request_wrapper(url=OAUTH_TOKEN_URL,data=self.token_data, - method="post",proxies=self.proxy,).json() - self.current_token = {"access_token": r_json["access_token"], - "created_at": datetime.now(),"expires_in": r_json["expires_in"],} + r_json = request_wrapper( + url=OAUTH_TOKEN_URL, + data=self.token_data, + method="post", + proxies=self.proxy, + ).json() + self.current_token = { + "access_token": r_json["access_token"], + "created_at": datetime.now(), + "expires_in": r_json["expires_in"], + } return self.current_token["access_token"] def _request(self, url: str, params: dict, **kwargs) -> requests.Response: - return request_wrapper(url=url,method="get",proxies=self.proxy,**kwargs,params=params, - headers={"Authorization": f"Bearer {self.get_access_token()}"},).json() + """ + Helper function to make a request to the DataQuery API. + Parameters + :param url : URL to make request to + :param params : Parameters to pass to request + Returns + :return : Response object + """ + # Make request using wrapper function + # this funciton wraps the request wrapper to add the access token + # and add the proxy to all requests from this class + return request_wrapper( + url=url, + params=params, + headers={"Authorization": f"Bearer {self.get_access_token()}"}, + method="get", + proxies=self.proxy, + **kwargs, + ).json() def heartbeat(self) -> bool: - response: requests.Response = self._request(url=OAUTH_BASE_URL + HEARTBEAT_ENDPOINT, - params={"data": "NO_REFERENCE_DATA"},) + """ + Check if the DataQuery API is up. + Returns + :return : True if up, False otherwise + """ + response: requests.Response = self._request( + url=OAUTH_BASE_URL + HEARTBEAT_ENDPOINT, + params={"data": "NO_REFERENCE_DATA"}, + ) + # no need for response.ok because + # response.status_code==200 is checked in the wrapper return "info" in response + def download( self, expressions: List[str], @@ -90,25 +175,61 @@ def download( conversion: str = "CONV_LASTBUS_ABS", nan_treatment: str = "NA_NOTHING", ) -> Union[List[Dict], pd.DataFrame]: + """ + Download data from the DataQuery API. + Parameters + :param expressions : List of expressions to download + :param start_date : Start date of data to download + :param end_date : End date of data to download + :param as_dataframe : Whether to return the data as a Pandas DataFrame, + or as a list of dictionaries. Defaults to True, returning a DataFrame. + :param calender : Calendar setting from DataQuery's specifications + :param frequency : Frequency setting from DataQuery's specifications + :param conversion : Conversion setting from DataQuery's specifications + :param nan_treatment : NaN treatment setting from DataQuery's specifications + :param run_sequential : Whether to run the download + sequentially or as multithreaded requests. + Defaults to False (multithreaded recommended). + Returns + :return : List of dictionaries containing data + """ + params_dict: Dict = { - "format": "JSON","start-date": start_date,"end-date": end_date,"calendar": calender, - "frequency": frequency,"conversion": conversion,"nan_treatment": nan_treatment,"data": "NO_REFERENCE_DATA",} - expr_batches: List[List[str]] = [[expressions[i : min(i + EXPR_LIMIT, len(expressions))]]for i in range(0, len(expressions), EXPR_LIMIT)] + "format": "JSON", + "start-date": start_date, + "end-date": end_date, + "calendar": calender, + "frequency": frequency, + "conversion": conversion, + "nan_treatment": nan_treatment, + "data": "NO_REFERENCE_DATA", + } + + expr_batches: List[List[str]] = [ + [expressions[i : min(i + EXPR_LIMIT, len(expressions))]] + for i in range(0, len(expressions), EXPR_LIMIT) + ] invalid_response_msg: str = "Invalid response from DataQuery API." heartbeat_failed_msg: str = "DataQuery API Heartbeat failed." + downloaded_data: List[Dict] = [] assert self.heartbeat(), heartbeat_failed_msg + print("Heartbeat Successful.") + for expr_batch in expr_batches: current_params: Dict = params_dict.copy() current_params["expressions"]: List = expr_batch curr_url: str = OAUTH_BASE_URL + TIMESERIES_ENDPOINT downloaded_data: List[Dict] = [] curr_response: Dict = {} + # loop to get next page from the response if any get_pagination: bool = True while get_pagination: sleep(API_DELAY_PARAM) curr_response: Dict = self._request(url=curr_url, params=current_params) - if (curr_response is None) or ("instruments" not in curr_response.keys()): + if (curr_response is None) or ( + "instruments" not in curr_response.keys() + ): raise Exception(invalid_response_msg) else: downloaded_data.extend(curr_response["instruments"]) @@ -117,23 +238,60 @@ def download( get_pagination = False break else: - curr_url = (OAUTH_BASE_URL + curr_response["links"][1]["next"]) + curr_url = ( + OAUTH_BASE_URL + curr_response["links"][1]["next"] + ) current_params = {} + if as_dataframe: + downloaded_data: pd.DataFrame = time_series_to_df(downloaded_data) + return downloaded_data +def time_series_to_df(dicts_list: List[Dict]) -> pd.DataFrame: + """ + Convert the downloaded data to a pandas DataFrame. + Parameters + :param dicts_list : List of dictionaries containing time series + data from the DataQuery API + Returns + :return : DataFrame containing the data + """ + dfs: List = [] + for d in dicts_list: + df = pd.DataFrame( + d["attributes"][0]["time-series"], columns=["real_date", "value"] + ) + df["expression"] = d["attributes"][0]["expression"] + dfs += [df] + + return_df = pd.concat(dfs, axis=0).reset_index(drop=True)[ + ["real_date", "expression", "value"] + ] + return_df["real_date"] = pd.to_datetime(return_df["real_date"]) + return return_df + + if __name__ == "__main__": import os + client_id: str = os.environ["JPMAQS_API_CLIENT_ID"] client_secret: str = os.environ["JPMAQS_API_CLIENT_SECRET"] + dq: DQInterface = DQInterface(client_id, client_secret) assert dq.heartbeat(), "DataQuery API Heartbeat failed." - expressions = ["DB(JPMAQS,USD_EQXR_VT10,value)","DB(JPMAQS,AUD_EXALLOPENNESS_NSA_1YMA,value)",] + + expressions = [ + "DB(JPMAQS,USD_EQXR_VT10,value)", + "DB(JPMAQS,AUD_EXALLOPENNESS_NSA_1YMA,value)", + ] start_date: str = "2020-01-25" end_date: str = "2023-02-05" - data: Union[List[Dict], pd.DataFrame] = dq.download(expressions=expressions, start_date=start_date, end_date=end_date) + data: Union[List[Dict], pd.DataFrame] = dq.download( + expressions=expressions, start_date=start_date, end_date=end_date + ) if isinstance(data, pd.DataFrame): print(data.head()) else: From 7553eb2e79c14b840ca16224db698a0e3e948a85 Mon Sep 17 00:00:00 2001 From: Palash Tyagi Date: Sat, 18 Feb 2023 12:18:51 +0000 Subject: [PATCH 6/6] Moved license to root dir --- python/LICENSE => LICENSE | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename python/LICENSE => LICENSE (100%) diff --git a/python/LICENSE b/LICENSE similarity index 100% rename from python/LICENSE rename to LICENSE