diff --git a/pyproject.toml b/pyproject.toml index 39e5b95..81e3641 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "reait" -version = "1.1.1" +version = "1.2.0" readme = "README.md" classifiers=[ "Programming Language :: Python :: 3", diff --git a/src/reait/api.py b/src/reait/api.py index c3a3d5c..29f17f5 100755 --- a/src/reait/api.py +++ b/src/reait/api.py @@ -10,12 +10,12 @@ from datetime import datetime from hashlib import sha256 from lief import parse, Binary, ELF, PE, MachO -from numpy import array, vstack, dot, arccos, pi +from numpy import array, vstack from pandas import DataFrame from requests import request, Response, HTTPError from sklearn.metrics.pairwise import cosine_similarity -__version__ = "1.1.1" +__version__ = "1.2.0" re_conf = { "apikey": environ.get("REAI_API_KEY", ""), @@ -759,7 +759,9 @@ def RE_functions_list( params["max_v_address"] = max_v_address res: Response = reveng_req( - requests.get, f"v2/analyses/{analysis_id}/info/functions/list", params=params + requests.get, + f"v2/analyses/{analysis_id}/info/functions/list", + params=params ) res.raise_for_status() @@ -866,10 +868,12 @@ def _binary_format(binary: Binary) -> str: return "Mach-O" logger.error( - "Error, could not determine or unsupported" f" binary format: {binary.format}." + "Error, could not determine or unsupported" + f" binary format: {binary.format}." ) raise RuntimeError( - "Error, could not determine or " f"unsupported binary format: {binary.format}" + "Error, could not determine or " + f"unsupported binary format: {binary.format}" ) @@ -933,7 +937,42 @@ def RE_analysis_id(fpath: str, binary_id: int = 0) -> Response: return res -def RE_generate_data_types(analysis_id: int, function_ids: list[int]) -> Response: +def RE_functions_data_types( + function_ids: list[int], +) -> Response: + """ + Get data types for the functions + :param functions_ids: List of function IDs + :return: Response object + """ + endpoint = "/v2/functions/data_types" + res: Response = reveng_req( + requests.post, endpoint, json_data={"function_ids": function_ids} + ) + res.raise_for_status() + return res + + +def RE_functions_data_types_poll( + function_ids: list[int], +) -> Response: + """ + Poll data types for the functions + :param functions_ids: List of function IDs + :return: Response object + """ + endpoint = "/v2/functions/data_types" + res: Response = reveng_req( + requests.get, endpoint, params={"function_ids": function_ids} + ) + res.raise_for_status() + return res + + +def RE_generate_data_types( + analysis_id: int, + function_ids: list[int] +) -> Response: """ Generate data types for the analysis :param aid: Analysis ID @@ -947,6 +986,21 @@ def RE_generate_data_types(analysis_id: int, function_ids: list[int]) -> Respons return res +def RE_poll_data_types( + analysis_id: int, + function_id: int, +) -> Response: + """ + Poll data types for the analysis + :param aid: Analysis ID + """ + end_point = f"/v2/analyses/{analysis_id}/functions/{function_id}/data_types" + + res: Response = reveng_req(requests.get, end_point) + res.raise_for_status() + return res + + def RE_list_data_types(analysis_id: int, function_ids: list[int]) -> Response: """ List data types for the analysis @@ -978,16 +1032,25 @@ def RE_begin_ai_decompilation(function_id: int) -> Response: return res -def RE_poll_ai_decompilation(function_id: int) -> Response: +def RE_poll_ai_decompilation( + function_id: int, + summarise: bool = False +) -> Response: """ Poll AI decompilation for the function :param function_id: Function ID """ end_point = f"/v2/functions/{function_id}/ai-decompilation" + params = {} + + if summarise: + params["summarise"] = summarise + res: Response = reveng_req( requests.get, end_point, + params=params, ) res.raise_for_status() return res @@ -1007,16 +1070,41 @@ def RE_analysis_lookup(binary_id: int) -> Response: def RE_collections_search( page: int = 1, page_size: int = 10, - search: str = "", + query: dict = {}, ) -> Response: """ + Search for collections in the database """ end_point = "/v2/search/collections" - res: Response = reveng_req(requests.get, end_point, params={ + params = { "page": page, "page_size": page_size, - "partial_collection_name": search, - }) + } + + # this api support: + # partial_collection_name + # partial_binary_name + # partial_binary_sha256 + # model_name + # tags + + def exist_and_populated(key: str) -> bool: + return key in query and query[key] is not None and len(query[key]) > 0 + + if exist_and_populated("collection_name"): + params["partial_collection_name"] = query["collection_name"] + elif exist_and_populated("binary_name"): + params["partial_binary_name"] = query["binary_name"] + elif exist_and_populated("sha_256_hash"): + params["partial_binary_sha256"] = query["sha_256_hash"] + elif exist_and_populated("tags"): + params["tags"] = query["tags"] + elif exist_and_populated("model_name"): + params["model_name"] = query["model_name"] + elif exist_and_populated("query"): + params["partial_collection_name"] = query["query"] + + res: Response = reveng_req(requests.get, end_point, params=params) res.raise_for_status() return res @@ -1024,16 +1112,38 @@ def RE_collections_search( def RE_binaries_search( page: int = 1, page_size: int = 10, - search: str = "", + query: dict = {}, ) -> Response: """ + Search for binaries in the database """ end_point = "/v2/search/binaries" - res: Response = reveng_req(requests.get, end_point, params={ + params = { "page": page, "page_size": page_size, - "partial_name": search, - }) + } + + # this api support: + # partial_name + # partial_sha256 + # tags + # model_name + + def exist_and_populated(key: str) -> bool: + return key in query and query[key] is not None and len(query[key]) > 0 + + if exist_and_populated("binary_name"): + params["partial_name"] = query["binary_name"] + elif exist_and_populated("sha_256_hash"): + params["partial_sha256"] = query["sha_256_hash"] + elif exist_and_populated("tags"): + params["tags"] = query["tags"] + elif exist_and_populated("model_name"): + params["model_name"] = query["model_name"] + elif exist_and_populated("query"): + params["partial_name"] = query["query"] + + res: Response = reveng_req(requests.get, end_point, params=params) res.raise_for_status() return res