diff --git a/src/reait/api.py b/src/reait/api.py index fca82e1..a32684f 100755 --- a/src/reait/api.py +++ b/src/reait/api.py @@ -20,7 +20,7 @@ re_conf = { "apikey": environ.get("REAI_API_KEY", ""), "host": environ.get("REAI_API_HOST", "https://api.reveng.ai"), - "user_agent": environ.get("REAI_USER_AGENT", "RevEng.AI Toolkit") + "user_agent": environ.get("REAI_USER_AGENT", "RevEng.AI Toolkit"), } @@ -33,8 +33,7 @@ def __init__(self, reason: str, end_point: str = None): response.reason = reason response.status_code = 404 - response._content = b'{"success": false, "error": "' + \ - reason.encode() + b'"}' + response._content = b'{"success": false, "error": "' + reason.encode() + b'"}' response.url = ( f"{re_conf['host']}/{end_point if end_point[0] != '/' else end_point[1:]}" if end_point @@ -247,7 +246,7 @@ def RE_analyse( skip_scraping: bool = False, skip_capabilities: bool = False, skip_sbom: bool = False, - advanced_analysis: bool = False + advanced_analysis: bool = False, ) -> Response: """ Start analysis job for binary file @@ -309,7 +308,8 @@ def RE_analyse( "skip_scraping", "skip_capabilities", "skip_sbom", - "advanced_analysis" + "advanced_analysis", + "skip_cves", ): p_value = locals()[p_name] @@ -323,8 +323,7 @@ def RE_analyse( ) elif res.status_code == 400: if "error" in res.json().keys(): - logger.warning("Error analysing %s - %s", - fpath, res.json()["error"]) + logger.warning("Error analysing %s - %s", fpath, res.json()["error"]) res.raise_for_status() return res @@ -338,8 +337,7 @@ def RE_upload(fpath: str) -> Response: bin_id = re_binary_id(fpath) with open(fpath, "rb") as fd: - res: Response = reveng_req( - requests.post, "v1/upload", files={"file": fd}) + res: Response = reveng_req(requests.post, "v1/upload", files={"file": fd}) if res.ok: logger.info( @@ -347,8 +345,7 @@ def RE_upload(fpath: str) -> Response: ) elif res.status_code == 400: if "error" in res.json().keys(): - logger.warning("Error uploading %s - %s", - fpath, res.json()["error"]) + logger.warning("Error uploading %s - %s", fpath, res.json()["error"]) elif res.status_code == 413: logger.warning("File too large. Please upload files under 10MB.") elif res.status_code == 500: @@ -531,8 +528,7 @@ def RE_nearest_symbols_batch( if binaries: params["binaries_search_list"] = binaries - res: Response = reveng_req( - requests.post, "v1/ann/symbol/batch", json_data=params) + res: Response = reveng_req(requests.post, "v1/ann/symbol/batch", json_data=params) res.raise_for_status() return res @@ -653,11 +649,9 @@ def RE_functions_rename(function_id: int, new_name: str) -> Response: ) if res.ok: - logger.info("FunctionId %d has been renamed with '%s'.", - function_id, new_name) + logger.info("FunctionId %d has been renamed with '%s'.", function_id, new_name) else: - logger.warning("Error, cannot rename FunctionId %d. %s", - function_id, res.text) + logger.warning("Error, cannot rename FunctionId %d. %s", function_id, res.text) res.raise_for_status() return res @@ -742,9 +736,7 @@ def RE_functions_list( params["max_v_address"] = max_v_address res: Response = reveng_req( - requests.get, - f"/v2/analyses/{analysis_id}/functions/list", - params=params + requests.get, f"/v2/analyses/{analysis_id}/functions/list", params=params ) res.raise_for_status() @@ -757,8 +749,7 @@ def RE_function_callers_callees(function: int) -> Response: Get the callers and callees of a functions :param function: Function ID """ - res: Response = reveng_req( - requests.get, f"v2/functions/{function}/callees_callers") + res: Response = reveng_req(requests.get, f"v2/functions/{function}/callees_callers") res.raise_for_status() return res @@ -769,8 +760,7 @@ def RE_analysis_info(analysis_id: int) -> Response: Get the analysis information :param analysis_id: Analysis ID """ - res: Response = reveng_req( - requests.get, f"v2/analyses/{analysis_id}/info/basic") + res: Response = reveng_req(requests.get, f"v2/analyses/{analysis_id}/info/basic") res.raise_for_status() return res @@ -851,12 +841,10 @@ def _binary_format(binary: Binary) -> str: return "Mach-O" logger.error( - "Error, could not determine or unsupported" - f" binary format: {binary.format}." + "Error, could not determine or unsupported" f" binary format: {binary.format}." ) raise RuntimeError( - "Error, could not determine or " - f"unsupported binary format: {binary.format}" + "Error, could not determine or " f"unsupported binary format: {binary.format}" ) @@ -952,10 +940,7 @@ def RE_functions_data_types_poll( return res -def RE_generate_data_types( - analysis_id: int, - function_ids: list[int] -) -> Response: +def RE_generate_data_types(analysis_id: int, function_ids: list[int]) -> Response: """ Generate data types for the analysis :param aid: Analysis ID @@ -1015,10 +1000,7 @@ def RE_begin_ai_decompilation(function_id: int) -> Response: return res -def RE_poll_ai_decompilation( - function_id: int, - summarise: bool = False -) -> Response: +def RE_poll_ai_decompilation(function_id: int, summarise: bool = False) -> Response: """ Poll AI decompilation for the function :param function_id: Function ID @@ -1051,9 +1033,9 @@ def RE_analysis_lookup(binary_id: int) -> Response: def RE_collections_search( - page: int = 1, - page_size: int = 10, - query: dict = {}, + page: int = 1, + page_size: int = 10, + query: dict = {}, ) -> Response: """ Search for collections in the database @@ -1093,9 +1075,9 @@ def exist_and_populated(key: str) -> bool: def RE_binaries_search( - page: int = 1, - page_size: int = 10, - query: dict = {}, + page: int = 1, + page_size: int = 10, + query: dict = {}, ) -> Response: """ Search for binaries in the database @@ -1313,35 +1295,47 @@ def RE_binary_ann( res.raise_for_status() return res -def RE_name_score( - functions: list, - is_debug: bool = False -) -> Response: - body = { - "functions": functions, - "is_debug": is_debug - } - res: Response = reveng_req(requests.post, "v2/confidence/functions/name_score", json_data=body) +def RE_name_score(functions: list, is_debug: bool = False) -> Response: + + body = {"functions": functions, "is_debug": is_debug} + res: Response = reveng_req( + requests.post, "v2/confidence/functions/name_score", json_data=body + ) res.raise_for_status() return res -def RE_get_analysis_id_from_binary_id( - binary_id: int -) -> Response: +def RE_get_analysis_id_from_binary_id(binary_id: int) -> Response: res: Response = reveng_req(requests.get, f"v2/analyses/lookup/{binary_id}") res.raise_for_status() return res -def RE_get_functions_from_analysis( - analysis_id: int -) -> Response: - res: Response = reveng_req(requests.get, f"v2/analyses/{analysis_id}/functions/list") +def RE_get_functions_from_analysis(analysis_id: int) -> Response: + + res: Response = reveng_req( + requests.get, f"v2/analyses/{analysis_id}/functions/list" + ) + + res.raise_for_status() + return res + + +def RE_update_collection_description( + collection_id: int, + description: str = "", +): + params = { + "description": description, + } + + res: Response = reveng_req( + requests.patch, f"v2/collections/{collection_id}", json_data=params + ) res.raise_for_status() return res