Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "reait"
version = "1.1.1"
version = "1.2.0"
readme = "README.md"
classifiers=[
"Programming Language :: Python :: 3",
Expand Down
140 changes: 125 additions & 15 deletions src/reait/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
from datetime import datetime
from hashlib import sha256
from lief import parse, Binary, ELF, PE, MachO
from numpy import array, vstack, dot, arccos, pi
from numpy import array, vstack
from pandas import DataFrame
from requests import request, Response, HTTPError
from sklearn.metrics.pairwise import cosine_similarity

__version__ = "1.1.1"
__version__ = "1.2.0"

re_conf = {
"apikey": environ.get("REAI_API_KEY", ""),
Expand Down Expand Up @@ -759,7 +759,9 @@ def RE_functions_list(
params["max_v_address"] = max_v_address

res: Response = reveng_req(
requests.get, f"v2/analyses/{analysis_id}/info/functions/list", params=params
requests.get,
f"v2/analyses/{analysis_id}/info/functions/list",
params=params
)

res.raise_for_status()
Expand Down Expand Up @@ -866,10 +868,12 @@ def _binary_format(binary: Binary) -> str:
return "Mach-O"

logger.error(
"Error, could not determine or unsupported" f" binary format: {binary.format}."
"Error, could not determine or unsupported"
f" binary format: {binary.format}."
)
raise RuntimeError(
"Error, could not determine or " f"unsupported binary format: {binary.format}"
"Error, could not determine or "
f"unsupported binary format: {binary.format}"
)


Expand Down Expand Up @@ -933,7 +937,42 @@ def RE_analysis_id(fpath: str, binary_id: int = 0) -> Response:
return res


def RE_generate_data_types(analysis_id: int, function_ids: list[int]) -> Response:
def RE_functions_data_types(
function_ids: list[int],
) -> Response:
"""
Get data types for the functions
:param functions_ids: List of function IDs
:return: Response object
"""
endpoint = "/v2/functions/data_types"
res: Response = reveng_req(
requests.post, endpoint, json_data={"function_ids": function_ids}
)
res.raise_for_status()
return res


def RE_functions_data_types_poll(
function_ids: list[int],
) -> Response:
"""
Poll data types for the functions
:param functions_ids: List of function IDs
:return: Response object
"""
endpoint = "/v2/functions/data_types"
res: Response = reveng_req(
requests.get, endpoint, params={"function_ids": function_ids}
)
res.raise_for_status()
return res


def RE_generate_data_types(
analysis_id: int,
function_ids: list[int]
) -> Response:
"""
Generate data types for the analysis
:param aid: Analysis ID
Expand All @@ -947,6 +986,21 @@ def RE_generate_data_types(analysis_id: int, function_ids: list[int]) -> Respons
return res


def RE_poll_data_types(
analysis_id: int,
function_id: int,
) -> Response:
"""
Poll data types for the analysis
:param aid: Analysis ID
"""
end_point = f"/v2/analyses/{analysis_id}/functions/{function_id}/data_types"

res: Response = reveng_req(requests.get, end_point)
res.raise_for_status()
return res


def RE_list_data_types(analysis_id: int, function_ids: list[int]) -> Response:
"""
List data types for the analysis
Expand Down Expand Up @@ -978,16 +1032,25 @@ def RE_begin_ai_decompilation(function_id: int) -> Response:
return res


def RE_poll_ai_decompilation(function_id: int) -> Response:
def RE_poll_ai_decompilation(
function_id: int,
summarise: bool = False
) -> Response:
"""
Poll AI decompilation for the function
:param function_id: Function ID
"""
end_point = f"/v2/functions/{function_id}/ai-decompilation"

params = {}

if summarise:
params["summarise"] = summarise

res: Response = reveng_req(
requests.get,
end_point,
params=params,
)
res.raise_for_status()
return res
Expand All @@ -1007,33 +1070,80 @@ def RE_analysis_lookup(binary_id: int) -> Response:
def RE_collections_search(
page: int = 1,
page_size: int = 10,
search: str = "",
query: dict = {},
) -> Response:
"""
Search for collections in the database
"""
end_point = "/v2/search/collections"
res: Response = reveng_req(requests.get, end_point, params={
params = {
"page": page,
"page_size": page_size,
"partial_collection_name": search,
})
}

# this api support:
# partial_collection_name
# partial_binary_name
# partial_binary_sha256
# model_name
# tags

def exist_and_populated(key: str) -> bool:
return key in query and query[key] is not None and len(query[key]) > 0

if exist_and_populated("collection_name"):
params["partial_collection_name"] = query["collection_name"]
elif exist_and_populated("binary_name"):
params["partial_binary_name"] = query["binary_name"]
elif exist_and_populated("sha_256_hash"):
params["partial_binary_sha256"] = query["sha_256_hash"]
elif exist_and_populated("tags"):
params["tags"] = query["tags"]
elif exist_and_populated("model_name"):
params["model_name"] = query["model_name"]
elif exist_and_populated("query"):
params["partial_collection_name"] = query["query"]

res: Response = reveng_req(requests.get, end_point, params=params)
res.raise_for_status()
return res


def RE_binaries_search(
page: int = 1,
page_size: int = 10,
search: str = "",
query: dict = {},
) -> Response:
"""
Search for binaries in the database
"""
end_point = "/v2/search/binaries"
res: Response = reveng_req(requests.get, end_point, params={
params = {
"page": page,
"page_size": page_size,
"partial_name": search,
})
}

# this api support:
# partial_name
# partial_sha256
# tags
# model_name

def exist_and_populated(key: str) -> bool:
return key in query and query[key] is not None and len(query[key]) > 0

if exist_and_populated("binary_name"):
params["partial_name"] = query["binary_name"]
elif exist_and_populated("sha_256_hash"):
params["partial_sha256"] = query["sha_256_hash"]
elif exist_and_populated("tags"):
params["tags"] = query["tags"]
elif exist_and_populated("model_name"):
params["model_name"] = query["model_name"]
elif exist_and_populated("query"):
params["partial_name"] = query["query"]

res: Response = reveng_req(requests.get, end_point, params=params)
res.raise_for_status()
return res

Expand Down