Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 52 additions & 58 deletions src/reait/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
re_conf = {
"apikey": environ.get("REAI_API_KEY", ""),
"host": environ.get("REAI_API_HOST", "https://api.reveng.ai"),
"user_agent": environ.get("REAI_USER_AGENT", "RevEng.AI Toolkit")
"user_agent": environ.get("REAI_USER_AGENT", "RevEng.AI Toolkit"),
}


Expand All @@ -33,8 +33,7 @@ def __init__(self, reason: str, end_point: str = None):

response.reason = reason
response.status_code = 404
response._content = b'{"success": false, "error": "' + \
reason.encode() + b'"}'
response._content = b'{"success": false, "error": "' + reason.encode() + b'"}'
response.url = (
f"{re_conf['host']}/{end_point if end_point[0] != '/' else end_point[1:]}"
if end_point
Expand Down Expand Up @@ -247,7 +246,7 @@ def RE_analyse(
skip_scraping: bool = False,
skip_capabilities: bool = False,
skip_sbom: bool = False,
advanced_analysis: bool = False
advanced_analysis: bool = False,
) -> Response:
"""
Start analysis job for binary file
Expand Down Expand Up @@ -309,7 +308,8 @@ def RE_analyse(
"skip_scraping",
"skip_capabilities",
"skip_sbom",
"advanced_analysis"
"advanced_analysis",
"skip_cves",
):
p_value = locals()[p_name]

Expand All @@ -323,8 +323,7 @@ def RE_analyse(
)
elif res.status_code == 400:
if "error" in res.json().keys():
logger.warning("Error analysing %s - %s",
fpath, res.json()["error"])
logger.warning("Error analysing %s - %s", fpath, res.json()["error"])

res.raise_for_status()
return res
Expand All @@ -338,17 +337,15 @@ def RE_upload(fpath: str) -> Response:
bin_id = re_binary_id(fpath)

with open(fpath, "rb") as fd:
res: Response = reveng_req(
requests.post, "v1/upload", files={"file": fd})
res: Response = reveng_req(requests.post, "v1/upload", files={"file": fd})

if res.ok:
logger.info(
"Successfully uploaded binary to your account. %s - %s", fpath, bin_id
)
elif res.status_code == 400:
if "error" in res.json().keys():
logger.warning("Error uploading %s - %s",
fpath, res.json()["error"])
logger.warning("Error uploading %s - %s", fpath, res.json()["error"])
elif res.status_code == 413:
logger.warning("File too large. Please upload files under 10MB.")
elif res.status_code == 500:
Expand Down Expand Up @@ -531,8 +528,7 @@ def RE_nearest_symbols_batch(
if binaries:
params["binaries_search_list"] = binaries

res: Response = reveng_req(
requests.post, "v1/ann/symbol/batch", json_data=params)
res: Response = reveng_req(requests.post, "v1/ann/symbol/batch", json_data=params)

res.raise_for_status()
return res
Expand Down Expand Up @@ -653,11 +649,9 @@ def RE_functions_rename(function_id: int, new_name: str) -> Response:
)

if res.ok:
logger.info("FunctionId %d has been renamed with '%s'.",
function_id, new_name)
logger.info("FunctionId %d has been renamed with '%s'.", function_id, new_name)
else:
logger.warning("Error, cannot rename FunctionId %d. %s",
function_id, res.text)
logger.warning("Error, cannot rename FunctionId %d. %s", function_id, res.text)

res.raise_for_status()
return res
Expand Down Expand Up @@ -742,9 +736,7 @@ def RE_functions_list(
params["max_v_address"] = max_v_address

res: Response = reveng_req(
requests.get,
f"/v2/analyses/{analysis_id}/functions/list",
params=params
requests.get, f"/v2/analyses/{analysis_id}/functions/list", params=params
)

res.raise_for_status()
Expand All @@ -757,8 +749,7 @@ def RE_function_callers_callees(function: int) -> Response:
Get the callers and callees of a functions
:param function: Function ID
"""
res: Response = reveng_req(
requests.get, f"v2/functions/{function}/callees_callers")
res: Response = reveng_req(requests.get, f"v2/functions/{function}/callees_callers")

res.raise_for_status()
return res
Expand All @@ -769,8 +760,7 @@ def RE_analysis_info(analysis_id: int) -> Response:
Get the analysis information
:param analysis_id: Analysis ID
"""
res: Response = reveng_req(
requests.get, f"v2/analyses/{analysis_id}/info/basic")
res: Response = reveng_req(requests.get, f"v2/analyses/{analysis_id}/info/basic")

res.raise_for_status()
return res
Expand Down Expand Up @@ -851,12 +841,10 @@ def _binary_format(binary: Binary) -> str:
return "Mach-O"

logger.error(
"Error, could not determine or unsupported"
f" binary format: {binary.format}."
"Error, could not determine or unsupported" f" binary format: {binary.format}."
)
raise RuntimeError(
"Error, could not determine or "
f"unsupported binary format: {binary.format}"
"Error, could not determine or " f"unsupported binary format: {binary.format}"
)


Expand Down Expand Up @@ -952,10 +940,7 @@ def RE_functions_data_types_poll(
return res


def RE_generate_data_types(
analysis_id: int,
function_ids: list[int]
) -> Response:
def RE_generate_data_types(analysis_id: int, function_ids: list[int]) -> Response:
"""
Generate data types for the analysis
:param aid: Analysis ID
Expand Down Expand Up @@ -1015,10 +1000,7 @@ def RE_begin_ai_decompilation(function_id: int) -> Response:
return res


def RE_poll_ai_decompilation(
function_id: int,
summarise: bool = False
) -> Response:
def RE_poll_ai_decompilation(function_id: int, summarise: bool = False) -> Response:
"""
Poll AI decompilation for the function
:param function_id: Function ID
Expand Down Expand Up @@ -1051,9 +1033,9 @@ def RE_analysis_lookup(binary_id: int) -> Response:


def RE_collections_search(
page: int = 1,
page_size: int = 10,
query: dict = {},
page: int = 1,
page_size: int = 10,
query: dict = {},
) -> Response:
"""
Search for collections in the database
Expand Down Expand Up @@ -1093,9 +1075,9 @@ def exist_and_populated(key: str) -> bool:


def RE_binaries_search(
page: int = 1,
page_size: int = 10,
query: dict = {},
page: int = 1,
page_size: int = 10,
query: dict = {},
) -> Response:
"""
Search for binaries in the database
Expand Down Expand Up @@ -1313,35 +1295,47 @@ def RE_binary_ann(
res.raise_for_status()
return res

def RE_name_score(
functions: list,
is_debug: bool = False
) -> Response:

body = {
"functions": functions,
"is_debug": is_debug
}
res: Response = reveng_req(requests.post, "v2/confidence/functions/name_score", json_data=body)
def RE_name_score(functions: list, is_debug: bool = False) -> Response:

body = {"functions": functions, "is_debug": is_debug}
res: Response = reveng_req(
requests.post, "v2/confidence/functions/name_score", json_data=body
)

res.raise_for_status()
return res


def RE_get_analysis_id_from_binary_id(
binary_id: int
) -> Response:
def RE_get_analysis_id_from_binary_id(binary_id: int) -> Response:

res: Response = reveng_req(requests.get, f"v2/analyses/lookup/{binary_id}")

res.raise_for_status()
return res

def RE_get_functions_from_analysis(
analysis_id: int
) -> Response:

res: Response = reveng_req(requests.get, f"v2/analyses/{analysis_id}/functions/list")
def RE_get_functions_from_analysis(analysis_id: int) -> Response:

res: Response = reveng_req(
requests.get, f"v2/analyses/{analysis_id}/functions/list"
)

res.raise_for_status()
return res


def RE_update_collection_description(
collection_id: int,
description: str = "",
):
params = {
"description": description,
}

res: Response = reveng_req(
requests.patch, f"v2/collections/{collection_id}", json_data=params
)

res.raise_for_status()
return res
Loading