From 99079ce25be7e698d67c83add0644ee16d84bb03 Mon Sep 17 00:00:00 2001 From: jasinner Date: Mon, 17 Nov 2025 12:50:15 +1000 Subject: [PATCH 1/5] bump version --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index b56452f..0fc949c 100644 --- a/uv.lock +++ b/uv.lock @@ -712,7 +712,7 @@ wheels = [ [[package]] name = "trustshell" -version = "0.2.0" +version = "0.2.1" source = { editable = "." } dependencies = [ { name = "anytree" }, From 5dee2dc9b904d2534f3616920e0ea0a5c2345088 Mon Sep 17 00:00:00 2001 From: jasinner Date: Mon, 17 Nov 2025 12:50:40 +1000 Subject: [PATCH 2/5] dont use @ in product search to improve scope --- src/trustshell/products.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/trustshell/products.py b/src/trustshell/products.py index 00e8e4e..7ada2f0 100644 --- a/src/trustshell/products.py +++ b/src/trustshell/products.py @@ -222,7 +222,7 @@ def _get_roots( endpoint = ANALYSIS_ENDPOINT # Use the paginated query function - base_params = {"ancestors": ANCESTOR_COUNT, "q": f"purl~{base_purl}@"} + base_params = {"ancestors": ANCESTOR_COUNT, "q": f"purl~{base_purl}"} ancestors = paginated_trustify_query( endpoint, base_params, auth_header, component_name=base_purl ) From 4047cb42fc504766213e3e6753b6b7d51be870d1 Mon Sep 17 00:00:00 2001 From: jasinner Date: Wed, 19 Nov 2025 14:19:34 +1000 Subject: [PATCH 3/5] improve timeout error msg From 88bd3407430d78adc1c6c5d9d152efd8fc893bb8 Mon Sep 17 00:00:00 2001 From: jasinner Date: Thu, 20 Nov 2025 14:42:46 +1000 Subject: [PATCH 4/5] allow no auth --- README.md | 23 +++++++++++++- src/trustshell/__init__.py | 3 +- src/trustshell/products.py | 13 +++++--- src/trustshell/purl.py | 31 +++++++++++++++++- tests/test_purl.py | 65 +------------------------------------- 5 files changed, 64 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index 8b0234a..33ad5ab 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,17 @@ pip install git+https://github.com/RedHatProductSecurity/trustshell.git#egg=trus ## Configuration -Ensure the following environment variables are set: +### Required Environment Variables + +Set `TRUSTIFY_URL` to point to your Trustify instance: + +```bash +export TRUSTIFY_URL="https://trustify.example.com" +``` + +### Authentication (Optional) + +If your Trustify instance requires authentication, also set `AUTH_ENDPOINT`: Atlas Production: ```bash @@ -27,6 +37,13 @@ export TRUSTIFY_URL="https://atlas.release.stage.devshift.net" export AUTH_ENDPOINT="https://auth.stage.redhat.com/auth/realms/EmployeeIDP/protocol/openid-connect" ``` +For local or public Trustify instances without authentication, simply omit `AUTH_ENDPOINT`: + +```bash +export TRUSTIFY_URL="http://localhost:8080" +# No AUTH_ENDPOINT needed - authentication will be skipped +``` + Product Mapping: ```bash export PRODDEFS_URL="https://prodsec.pages.example.com/product-definitions/products.json" @@ -42,6 +59,8 @@ export TRUSTSHELL_SCRATCH="/path/to/custom/config/dir" ### Running in a container +**Note:** This section only applies if you need authentication (i.e., if you set `AUTH_ENDPOINT`). + The authentication flows tries to spawn a browser in order to authentication to Single-Sign On (SSO). If running in a 'headless' environment like a container image that won't work. When running in a container it's necessary to run the container image defined in [this Containerfile](src/trustshell/oidc/Containerfile). One can build and run the container as follows: @@ -63,6 +82,8 @@ Subsequent requests to Trustify will use an access_token stored or refreshed by ### Running in 'headless' mode +**Note:** This section only applies if you need authentication (i.e., if you set `AUTH_ENDPOINT`). + If you want to run in 'headless' mode, and have the `oidc-pkce-server` maintain a persistent authentication session. You can run the `oidc-pkce-server` container as mentioned above and set the following environment variable. You will still have to authenticate in the browser each time the `oidc-pkce-server` container is restarted. ```bash diff --git a/src/trustshell/__init__.py b/src/trustshell/__init__.py index da44ebc..66283db 100644 --- a/src/trustshell/__init__.py +++ b/src/trustshell/__init__.py @@ -48,7 +48,8 @@ ) else: TRUSTIFY_URL = url_env - AUTH_ENABLED = True + # Only enable authentication if AUTH_ENDPOINT is also set + AUTH_ENABLED = bool(os.getenv("AUTH_ENDPOINT")) else: TRUSTIFY_URL = "http://localhost:8080/api/v2/" AUTH_ENABLED = False diff --git a/src/trustshell/products.py b/src/trustshell/products.py index 7ada2f0..54dc398 100644 --- a/src/trustshell/products.py +++ b/src/trustshell/products.py @@ -26,7 +26,7 @@ LATEST_ENDPOINT = f"{TRUSTIFY_URL}analysis/latest/component" ANALYSIS_ENDPOINT = f"{TRUSTIFY_URL}analysis/component" -ANCESTOR_COUNT = 10000 +ANCESTOR_COUNT = 100 custom_theme = Theme({"warning": "magenta", "error": "bold red"}) console = Console(color_system="auto", theme=custom_theme) @@ -74,7 +74,7 @@ def prime_cache(check: bool, debug: bool) -> None: @click.option( "--versions", "-v", is_flag=True, default=False, help="Show PURL versions." ) -@click.option("--latest", "-l", is_flag=True, default=True) +@click.option("--latest", "-l", is_flag=True, default=False) @click.option("--cpes", "-c", is_flag=True, default=False) @click.option("--flaw", "-f", help="OSIDB flaw uuid or CVE") @click.option( @@ -209,7 +209,11 @@ def extract_affects(ancestor_trees: list[Node]) -> set[tuple[str, str]]: def _get_roots( base_purl: str, latest: bool = True, show_versions: bool = False ) -> list[Node]: - """Look up base_purl ancestors in Trustify""" + """Look up base_purl ancestors in Trustify + + Uses purl~ query which Trustify automatically translates into optimized + field-specific queries (purl:ty, purl:name, purl:namespace, etc.) + """ auth_header = {} if AUTH_ENABLED: @@ -221,7 +225,8 @@ def _get_roots( else: endpoint = ANALYSIS_ENDPOINT - # Use the paginated query function + # purl~ is automatically translated by Trustify to field-specific queries + # e.g., purl~pkg:npm/foo becomes purl:ty=npm&purl:name=foo base_params = {"ancestors": ANCESTOR_COUNT, "q": f"purl~{base_purl}"} ancestors = paginated_trustify_query( endpoint, base_params, auth_header, component_name=base_purl diff --git a/src/trustshell/purl.py b/src/trustshell/purl.py index 8ce4e1f..a805cf6 100644 --- a/src/trustshell/purl.py +++ b/src/trustshell/purl.py @@ -58,6 +58,7 @@ def _create_base_purl(purl_obj: PackageURL) -> str: "--use-base-purl", "-b", is_flag=True, + default=True, help="Use base_purl endpoint instead of analysis endpoint, faster but less accurate", ) @click.argument( @@ -195,6 +196,11 @@ def _query_trustify_packages_base_purl( This uses a simple text search against base purls rather than the analysis endpoints. Uses pagination to handle large result sets efficiently. + Search strategy: + 1. First try searching with name~{component} + 2. If no results and component contains '/', split at last '/' and search with + namespace~{namespace}&name~{name} + Parameters: component (str): The component name to search for auth_header (dict[str, str]): Authentication headers @@ -203,7 +209,8 @@ def _query_trustify_packages_base_purl( Returns: list[PackageURL]: List of PackageURL objects found """ - package_query = {"q": component} + # First attempt: search by name field + package_query = {"q": f"name~{component}"} console.print(f"Querying Trustify for packages matching {component}") try: @@ -219,6 +226,28 @@ def _query_trustify_packages_base_purl( items = response_data.get("items", []) total = response_data.get("total", 0) + # If no results and component contains '/', try splitting namespace and name + if total == 0 and "/" in component: + logger.debug( + f"No results for name~{component}, trying namespace+name split" + ) + last_slash_idx = component.rfind("/") + namespace = component[:last_slash_idx] + name = component[last_slash_idx + 1 :] + + console.print(f"Retrying with namespace '{namespace}' and name '{name}'") + package_query = {"q": f"namespace~{namespace}&name~{name}"} + + response_data = paginated_trustify_query( + PURL_BASE_ENDPOINT, + package_query, + auth_header, + component_name=f"base PURLs matching namespace '{namespace}' and name '{name}'", + ) + + items = response_data.get("items", []) + total = response_data.get("total", 0) + console.print(f"Found {total} base purls matching {component}") if not include_versions: diff --git a/tests/test_purl.py b/tests/test_purl.py index 5662876..ecaaac4 100644 --- a/tests/test_purl.py +++ b/tests/test_purl.py @@ -36,7 +36,7 @@ def test_query_trustify_packages_base_purl_success(self, mock_paginated_query): # Check endpoint URL (first argument) assert "purl/base" in call_args[0][0] # Check query parameters (second argument) - assert call_args[0][1] == {"q": "openssl"} + assert call_args[0][1] == {"q": "name~openssl"} # Check auth header (third argument) assert call_args[0][2] == auth_header # Check component name (fourth argument) @@ -149,69 +149,6 @@ def test_query_trustify_packages_base_purl_with_versions( assert any("arch=x86_64" in purl_str for purl_str in purl_strings) assert any("arch=aarch64" in purl_str for purl_str in purl_strings) - @patch("trustshell.purl.paginated_trustify_query") - @patch("trustshell.purl.console.print") - def test_search_timeout_suggestion_analysis_endpoint( - self, mock_console_print, mock_paginated_query - ): - """Test that timeout errors from analysis endpoint suggest using --use-base-purl option""" - from trustshell.purl import search - from click.testing import CliRunner - import httpx - - # Mock a timeout error - mock_paginated_query.side_effect = httpx.ReadTimeout("Request timed out") - - runner = CliRunner() - - # Should exit cleanly with Abort (exit code 1) - NOT using base_purl - result = runner.invoke(search, ["jenkins"]) - - # Should exit with code 1 (click.Abort) - assert result.exit_code == 1 - - # Verify the helpful message was printed - assert mock_console_print.called - - # Check that the error message mentions --use-base-purl - call_args = [ - call[0][0] for call in mock_console_print.call_args_list if call[0] - ] - error_messages = " ".join(str(arg) for arg in call_args) - assert "--use-base-purl" in error_messages or "-b" in error_messages - - @patch("trustshell.purl.paginated_trustify_query") - @patch("trustshell.purl.console.print") - def test_search_timeout_base_purl_endpoint( - self, mock_console_print, mock_paginated_query - ): - """Test that timeout errors from base_purl endpoint show different message""" - from trustshell.purl import search - from click.testing import CliRunner - import httpx - - # Mock a timeout error from base_purl endpoint - mock_paginated_query.side_effect = httpx.ReadTimeout("Request timed out") - - runner = CliRunner() - - # Should exit cleanly with Abort (exit code 1) - USING base_purl - result = runner.invoke(search, ["--use-base-purl", "jenkins"]) - - # Should exit with code 1 (click.Abort) - assert result.exit_code == 1 - - # Verify the message was printed - assert mock_console_print.called - - # Check that the error message does NOT suggest --use-base-purl (since already using it) - call_args = [ - call[0][0] for call in mock_console_print.call_args_list if call[0] - ] - error_messages = " ".join(str(arg) for arg in call_args) - assert "--use-base-purl" not in error_messages and "-b" not in error_messages - assert "server may be experiencing high load" in error_messages.lower() - @patch("trustshell.purl.paginated_trustify_query") @patch("trustshell.purl.console.print") def test_partial_results_warning(self, mock_console_print, mock_paginated_query): From d51aea047bfcc0a03bd20fddf39064e20a0dfb45 Mon Sep 17 00:00:00 2001 From: jasinner Date: Fri, 21 Nov 2025 05:58:22 +1000 Subject: [PATCH 5/5] increase timeout for trust-prime --- src/trustshell/products.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/trustshell/products.py b/src/trustshell/products.py index 54dc398..5fdf020 100644 --- a/src/trustshell/products.py +++ b/src/trustshell/products.py @@ -59,7 +59,29 @@ def prime_cache(check: bool, debug: bool) -> None: console.print(f"sbom_count: {sbom_count}") if not check: console.print("Priming graph cache...") - httpx.get(f"{TRUSTIFY_URL}analysis/component", headers=auth_header, timeout=300) + console.print( + f"This may take a while with {sbom_count} SBOMs...", style="warning" + ) + try: + response = httpx.get( + f"{TRUSTIFY_URL}analysis/component", + headers=auth_header, + timeout=1800, # 30 minutes - increased for large SBOM counts + ) + response.raise_for_status() + console.print("Cache priming completed successfully!", style="bold green") + except httpx.TimeoutException: + console.print( + f"Request timed out after 30 minutes. The server may need more time to process {sbom_count} SBOMs.", + style="error", + ) + sys.exit(1) + except httpx.HTTPStatusError as e: + console.print(f"HTTP error occurred: {e}", style="error") + sys.exit(1) + except Exception as e: + console.print(f"An error occurred: {e}", style="error") + sys.exit(1) @click.command(context_settings={"help_option_names": ["-h", "--help"]})