Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Click on *Finish*
6/ You need to repeat 1/ to 5/ above for each of the transforms contained in this set:
- To Tags (project.py local totags)
- To Cluster (project.py local tocluster)
- To Input Addresses (project.py local toinputaddresses)

7/ Import the GraphSense Entities:
For this, go to *Entities* tab, click on *Import Entities*
Expand Down
29 changes: 29 additions & 0 deletions api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import re as regex

from typing import Literal

from graphsense.api import addresses_api, entities_api
from graphsense.api_client import ApiClient, ApiException

Expand Down Expand Up @@ -108,6 +110,23 @@ def get_address_details(currency, address):
return address_obj, tags, error


def get_address_neighbors(currency, address, direction: Literal["in", "out"]):
error = ""
configuration = open_config()[0]

with ApiClient(configuration) as api_client:
api_instance = addresses_api.AddressesApi(api_client)
neighbors = []

try:
neighbors = api_instance.list_address_neighbors(currency, str(address), direction)["neighbors"]

except Exception as e:
error = "Exception when calling List_address_neighbors: " + str(e)

return neighbors, error


def get_entity_details(currency, entity):
error = ""
configuration = open_config()[0]
Expand Down Expand Up @@ -575,6 +594,16 @@ def create_entity_with_details(
entity = ""
error = "No attribution tags found for this cluster in " + currency

if query_type == "input_addresses":
neighbor = json_result[0]["address"]
address = neighbor["address"]
amount_received = json_result[0]["value"]["value"] / set_factor
entity = response.addEntity(set_type, address)

# Override direction to indicate an input address
entity.addProperty('link#maltego.link.direction','link#maltego.link.direction','loose','output-to-input')
entity.setLinkLabel(f"{amount_received} {currency.upper()}")

entity = ""

return entity, error
9 changes: 5 additions & 4 deletions transforms.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Owner,Author,Disclaimer,Description,Version,Name,UIName,URL,entityName,oAuthSettingId,transformSettingIDs,seedIDs
INTERPOL Innovation Centre,Vincent Danjean <v.danjean@interpol.int>,,Returns the cluster details to which the address belongs.,0.1,tocluster,To Cluster [GRAPHSENSE],http://0.0.0.0:8080/run/tocluster,maltego.Cryptocurrency,,global#api_key;global#api_url,interpol.graphsense
INTERPOL Innovation Centre,Vincent Danjean <v.danjean@interpol.int>,,Returns details of a cryptocurerncy address.,0.1,todetails,To Details [GRAPHSENSE],http://0.0.0.0:8080/run/todetails,maltego.Cryptocurrency,,global#api_key;global#api_url,interpol.graphsense
INTERPOL Innovation Centre,Vincent Danjean <v.danjean@interpol.int>,,Returns known attribution tags,0.1,totags,To Tags [GRAPHSENSE],http://0.0.0.0:8080/run/totags,maltego.Cryptocurrency,,global#api_key;global#api_url,interpol.graphsense
Owner,Author,Disclaimer,Description,Version,Name,UIName,URL,entityName,oAuthSettingId,transformSettingIDs,seedIDs,outputEntities
INTERPOL Innovation Centre,Vincent Danjean <v.danjean@interpol.int>,,Returns the cluster details to which the address belongs.,0.1,tocluster,To Cluster [GRAPHSENSE],http://0.0.0.0:8080/run/tocluster,maltego.Cryptocurrency,,global#api_key;global#api_url,interpol.graphsense,maltego.Cryptocurrency
INTERPOL Innovation Centre,Vincent Danjean <v.danjean@interpol.int>,,Returns details of a cryptocurerncy address.,0.1,todetails,To Details [GRAPHSENSE],http://0.0.0.0:8080/run/todetails,maltego.Cryptocurrency,,global#api_key;global#api_url,interpol.graphsense,maltego.Cryptocurrency
INTERPOL Innovation Centre,Vincent Danjean <v.danjean@interpol.int>,,Returns input addresses of a cryptocurerncy address.,0.1,toinputaddresses,To Input Addresses [GRAPHSENSE],http://0.0.0.0:8080/run/toinputaddresses,maltego.Cryptocurrency,,global#api_key;global#api_url,interpol.graphsense,maltego.Cryptocurrency
INTERPOL Innovation Centre,Vincent Danjean <v.danjean@interpol.int>,,Returns known attribution tags,0.1,totags,To Tags [GRAPHSENSE],http://0.0.0.0:8080/run/totags,maltego.Cryptocurrency,,global#api_key;global#api_url,interpol.graphsense,maltego.Cryptocurrency
89 changes: 89 additions & 0 deletions transforms/ToInputAddresses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import sys
from maltego_trx.maltego import MaltegoMsg, MaltegoTransform, UIM_INFORM
from maltego_trx.transform import DiscoverableTransform

from extensions import registry

from api.utils import (
create_entity_with_details,
get_currency_from_entity_details,
get_address_neighbors,
)

from .utils import set_maltego_transformation_error


@registry.register_transform(
display_name="To Input Addresses",
input_entity="maltego.Cryptocurrency",
description="Returns input addresses of a cryptocurerncy address.",
output_entities=["maltego.Cryptocurrency"],
)
class ToInputAddresses(DiscoverableTransform):
"""
Lookup for all input addresses associated with a Virtual Asset (balance, total in and out, date last and first Tx...)
"""

@classmethod
def create_entities(
cls, request: MaltegoMsg, responseMaltego: MaltegoTransform = None
):
query_type = "input_addresses"

entity_details = request.Properties
if (
"cryptocurrency.wallet.name" in entity_details
): # this means the source entity is a cluster (Wallet) and not a cryptocurrency address
address = int(entity_details["cryptocurrency.wallet.name"])
currencies = [
str(entity_details["currency"])
] # if it is a cluster the currency is known
# if 'properties.cryptocurrencyaddress' in entity_details: #Else, we are looking for tags on a specific address
# address = entity_details['properties.cryptocurrencyaddress']
else:
if (
"currency" in entity_details
): # if a currency is already specified in the details, we use that currency and only that one. Else, we try to figure out what currency this could be and check all.
currencies = [entity_details["currency"]]
else:
currencies = get_currency_from_entity_details(
request.Properties
) # We use regex to check what Cryptocurrency this is. This could return BTC and BCH since they are similar.
if currencies[1]:
responseMaltego.addUIMessage(
f"\n{currencies[1]}\n", UIM_INFORM
)
return
currencies = currencies[0]
address = entity_details["properties.cryptocurrencyaddress"]

for currency in currencies:
(neighbors, error) = get_address_neighbors(
currency, address, "in"
)

if error:
set_maltego_transformation_error(
responseMaltego, currency, query_type, str(address), error
)
else:
for neighbor in neighbors:
neighbor, error = create_entity_with_details(
(neighbor, [], error),
currency,
query_type,
responseMaltego,
)
if error:
set_maltego_transformation_error(
responseMaltego,
currency,
query_type,
str(address),
error,
)
return


if __name__ == "__main__":
ToInputAddresses.create_entities(sys.argv[1])