Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@ pip install git+https://github.com/RedHatProductSecurity/trustshell.git#egg=trus

## Configuration

Ensure the following environment variables are set:
### Required Environment Variables

Set `TRUSTIFY_URL` to point to your Trustify instance:

```bash
export TRUSTIFY_URL="https://trustify.example.com"
```

### Authentication (Optional)

If your Trustify instance requires authentication, also set `AUTH_ENDPOINT`:

Atlas Production:
```bash
Expand All @@ -27,6 +37,13 @@ export TRUSTIFY_URL="https://atlas.release.stage.devshift.net"
export AUTH_ENDPOINT="https://auth.stage.redhat.com/auth/realms/EmployeeIDP/protocol/openid-connect"
```

For local or public Trustify instances without authentication, simply omit `AUTH_ENDPOINT`:

```bash
export TRUSTIFY_URL="http://localhost:8080"
# No AUTH_ENDPOINT needed - authentication will be skipped
```

Product Mapping:
```bash
export PRODDEFS_URL="https://prodsec.pages.example.com/product-definitions/products.json"
Expand All @@ -42,6 +59,8 @@ export TRUSTSHELL_SCRATCH="/path/to/custom/config/dir"

### Running in a container

**Note:** This section only applies if you need authentication (i.e., if you set `AUTH_ENDPOINT`).

The authentication flows tries to spawn a browser in order to authentication to Single-Sign On (SSO). If running in a 'headless' environment like a container image that won't work. When running in a container it's necessary to run the container image defined in [this Containerfile](src/trustshell/oidc/Containerfile).

One can build and run the container as follows:
Expand All @@ -63,6 +82,8 @@ Subsequent requests to Trustify will use an access_token stored or refreshed by

### Running in 'headless' mode

**Note:** This section only applies if you need authentication (i.e., if you set `AUTH_ENDPOINT`).

If you want to run in 'headless' mode, and have the `oidc-pkce-server` maintain a persistent authentication session. You can run the `oidc-pkce-server` container as mentioned above and set the following environment variable. You will still have to authenticate in the browser each time the `oidc-pkce-server` container is restarted.

```bash
Expand Down
3 changes: 2 additions & 1 deletion src/trustshell/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
)
else:
TRUSTIFY_URL = url_env
AUTH_ENABLED = True
# Only enable authentication if AUTH_ENDPOINT is also set
AUTH_ENABLED = bool(os.getenv("AUTH_ENDPOINT"))
else:
TRUSTIFY_URL = "http://localhost:8080/api/v2/"
AUTH_ENABLED = False
Expand Down
39 changes: 33 additions & 6 deletions src/trustshell/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

LATEST_ENDPOINT = f"{TRUSTIFY_URL}analysis/latest/component"
ANALYSIS_ENDPOINT = f"{TRUSTIFY_URL}analysis/component"
ANCESTOR_COUNT = 10000
ANCESTOR_COUNT = 100

custom_theme = Theme({"warning": "magenta", "error": "bold red"})
console = Console(color_system="auto", theme=custom_theme)
Expand Down Expand Up @@ -59,7 +59,29 @@ def prime_cache(check: bool, debug: bool) -> None:
console.print(f"sbom_count: {sbom_count}")
if not check:
console.print("Priming graph cache...")
httpx.get(f"{TRUSTIFY_URL}analysis/component", headers=auth_header, timeout=300)
console.print(
f"This may take a while with {sbom_count} SBOMs...", style="warning"
)
try:
response = httpx.get(
f"{TRUSTIFY_URL}analysis/component",
headers=auth_header,
timeout=1800, # 30 minutes - increased for large SBOM counts
)
response.raise_for_status()
console.print("Cache priming completed successfully!", style="bold green")
except httpx.TimeoutException:
console.print(
f"Request timed out after 30 minutes. The server may need more time to process {sbom_count} SBOMs.",
style="error",
)
sys.exit(1)
except httpx.HTTPStatusError as e:
console.print(f"HTTP error occurred: {e}", style="error")
sys.exit(1)
except Exception as e:
console.print(f"An error occurred: {e}", style="error")
sys.exit(1)


@click.command(context_settings={"help_option_names": ["-h", "--help"]})
Expand All @@ -74,7 +96,7 @@ def prime_cache(check: bool, debug: bool) -> None:
@click.option(
"--versions", "-v", is_flag=True, default=False, help="Show PURL versions."
)
@click.option("--latest", "-l", is_flag=True, default=True)
@click.option("--latest", "-l", is_flag=True, default=False)
@click.option("--cpes", "-c", is_flag=True, default=False)
@click.option("--flaw", "-f", help="OSIDB flaw uuid or CVE")
@click.option(
Expand Down Expand Up @@ -209,7 +231,11 @@ def extract_affects(ancestor_trees: list[Node]) -> set[tuple[str, str]]:
def _get_roots(
base_purl: str, latest: bool = True, show_versions: bool = False
) -> list[Node]:
"""Look up base_purl ancestors in Trustify"""
"""Look up base_purl ancestors in Trustify

Uses purl~ query which Trustify automatically translates into optimized
field-specific queries (purl:ty, purl:name, purl:namespace, etc.)
"""

auth_header = {}
if AUTH_ENABLED:
Expand All @@ -221,8 +247,9 @@ def _get_roots(
else:
endpoint = ANALYSIS_ENDPOINT

# Use the paginated query function
base_params = {"ancestors": ANCESTOR_COUNT, "q": f"purl~{base_purl}@"}
# purl~ is automatically translated by Trustify to field-specific queries
# e.g., purl~pkg:npm/foo becomes purl:ty=npm&purl:name=foo
base_params = {"ancestors": ANCESTOR_COUNT, "q": f"purl~{base_purl}"}
ancestors = paginated_trustify_query(
endpoint, base_params, auth_header, component_name=base_purl
)
Expand Down
31 changes: 30 additions & 1 deletion src/trustshell/purl.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def _create_base_purl(purl_obj: PackageURL) -> str:
"--use-base-purl",
"-b",
is_flag=True,
default=True,
help="Use base_purl endpoint instead of analysis endpoint, faster but less accurate",
)
@click.argument(
Expand Down Expand Up @@ -195,6 +196,11 @@ def _query_trustify_packages_base_purl(
This uses a simple text search against base purls rather than the analysis endpoints.
Uses pagination to handle large result sets efficiently.

Search strategy:
1. First try searching with name~{component}
2. If no results and component contains '/', split at last '/' and search with
namespace~{namespace}&name~{name}

Parameters:
component (str): The component name to search for
auth_header (dict[str, str]): Authentication headers
Expand All @@ -203,7 +209,8 @@ def _query_trustify_packages_base_purl(
Returns:
list[PackageURL]: List of PackageURL objects found
"""
package_query = {"q": component}
# First attempt: search by name field
package_query = {"q": f"name~{component}"}
console.print(f"Querying Trustify for packages matching {component}")

try:
Expand All @@ -219,6 +226,28 @@ def _query_trustify_packages_base_purl(
items = response_data.get("items", [])
total = response_data.get("total", 0)

# If no results and component contains '/', try splitting namespace and name
if total == 0 and "/" in component:
logger.debug(
f"No results for name~{component}, trying namespace+name split"
)
last_slash_idx = component.rfind("/")
namespace = component[:last_slash_idx]
name = component[last_slash_idx + 1 :]

console.print(f"Retrying with namespace '{namespace}' and name '{name}'")
package_query = {"q": f"namespace~{namespace}&name~{name}"}

response_data = paginated_trustify_query(
PURL_BASE_ENDPOINT,
package_query,
auth_header,
component_name=f"base PURLs matching namespace '{namespace}' and name '{name}'",
)

items = response_data.get("items", [])
total = response_data.get("total", 0)

console.print(f"Found {total} base purls matching {component}")

if not include_versions:
Expand Down
65 changes: 1 addition & 64 deletions tests/test_purl.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_query_trustify_packages_base_purl_success(self, mock_paginated_query):
# Check endpoint URL (first argument)
assert "purl/base" in call_args[0][0]
# Check query parameters (second argument)
assert call_args[0][1] == {"q": "openssl"}
assert call_args[0][1] == {"q": "name~openssl"}
# Check auth header (third argument)
assert call_args[0][2] == auth_header
# Check component name (fourth argument)
Expand Down Expand Up @@ -149,69 +149,6 @@ def test_query_trustify_packages_base_purl_with_versions(
assert any("arch=x86_64" in purl_str for purl_str in purl_strings)
assert any("arch=aarch64" in purl_str for purl_str in purl_strings)

@patch("trustshell.purl.paginated_trustify_query")
@patch("trustshell.purl.console.print")
def test_search_timeout_suggestion_analysis_endpoint(
self, mock_console_print, mock_paginated_query
):
"""Test that timeout errors from analysis endpoint suggest using --use-base-purl option"""
from trustshell.purl import search
from click.testing import CliRunner
import httpx

# Mock a timeout error
mock_paginated_query.side_effect = httpx.ReadTimeout("Request timed out")

runner = CliRunner()

# Should exit cleanly with Abort (exit code 1) - NOT using base_purl
result = runner.invoke(search, ["jenkins"])

# Should exit with code 1 (click.Abort)
assert result.exit_code == 1

# Verify the helpful message was printed
assert mock_console_print.called

# Check that the error message mentions --use-base-purl
call_args = [
call[0][0] for call in mock_console_print.call_args_list if call[0]
]
error_messages = " ".join(str(arg) for arg in call_args)
assert "--use-base-purl" in error_messages or "-b" in error_messages

@patch("trustshell.purl.paginated_trustify_query")
@patch("trustshell.purl.console.print")
def test_search_timeout_base_purl_endpoint(
self, mock_console_print, mock_paginated_query
):
"""Test that timeout errors from base_purl endpoint show different message"""
from trustshell.purl import search
from click.testing import CliRunner
import httpx

# Mock a timeout error from base_purl endpoint
mock_paginated_query.side_effect = httpx.ReadTimeout("Request timed out")

runner = CliRunner()

# Should exit cleanly with Abort (exit code 1) - USING base_purl
result = runner.invoke(search, ["--use-base-purl", "jenkins"])

# Should exit with code 1 (click.Abort)
assert result.exit_code == 1

# Verify the message was printed
assert mock_console_print.called

# Check that the error message does NOT suggest --use-base-purl (since already using it)
call_args = [
call[0][0] for call in mock_console_print.call_args_list if call[0]
]
error_messages = " ".join(str(arg) for arg in call_args)
assert "--use-base-purl" not in error_messages and "-b" not in error_messages
assert "server may be experiencing high load" in error_messages.lower()

@patch("trustshell.purl.paginated_trustify_query")
@patch("trustshell.purl.console.print")
def test_partial_results_warning(self, mock_console_print, mock_paginated_query):
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading