From 0caf271e2096a25fd2999987616ea9286eb64f24 Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Thu, 17 Apr 2025 21:33:58 +0200 Subject: [PATCH 1/4] CM-46731 - Make all flows use scan service --- README.md | 1 - cycode/cli/apps/scan/code_scanner.py | 82 +++------ cycode/cli/apps/scan/scan_command.py | 4 +- cycode/cyclient/models.py | 28 ---- cycode/cyclient/scan_client.py | 132 +++------------ cycode/cyclient/scan_config_base.py | 28 +--- tests/cli/commands/test_main_command.py | 14 +- .../cyclient/mocked_responses/scan_client.py | 88 +--------- .../scan_config/test_default_scan_config.py | 5 +- .../scan_config/test_dev_scan_config.py | 5 +- tests/cyclient/test_scan_client.py | 155 +----------------- tests/test_code_scanner.py | 25 --- 12 files changed, 74 insertions(+), 493 deletions(-) diff --git a/README.md b/README.md index b30bc533..6218cba8 100644 --- a/README.md +++ b/README.md @@ -303,7 +303,6 @@ The Cycode CLI application offers several types of scans so that you can choose | `--monitor` | When specified, the scan results will be recorded in the knowledge graph. Please note that when working in `monitor` mode, the knowledge graph will not be updated as a result of SCM events (Push, Repo creation). (Supported for SCA scan type only). | | `--report` | When specified, a violations report will be generated. A URL link to the report will be printed as an output to the command execution. | | `--no-restore` | When specified, Cycode will not run restore command. Will scan direct dependencies ONLY! | -| `--sync` | Run scan synchronously (the default is asynchronous). | | `--gradle-all-sub-projects` | When specified, Cycode will run gradle restore command for all sub projects. Should run from root project directory ONLY! | | `--help` | Show options for given command. | diff --git a/cycode/cli/apps/scan/code_scanner.py b/cycode/cli/apps/scan/code_scanner.py index 35208d59..fcaec7a5 100644 --- a/cycode/cli/apps/scan/code_scanner.py +++ b/cycode/cli/apps/scan/code_scanner.py @@ -100,24 +100,27 @@ def set_issue_detected_by_scan_results(ctx: typer.Context, scan_results: List[Lo set_issue_detected(ctx, any(scan_result.issue_detected for scan_result in scan_results)) -def _should_use_scan_service(scan_type: str, scan_parameters: dict) -> bool: - return scan_type == consts.SECRET_SCAN_TYPE and scan_parameters.get('report') is True +def _should_use_sync_flow(command_scan_type: str, scan_type: str, sync_option: bool) -> bool: + """Decide whether to use sync flow or async flow for the scan. - -def _should_use_sync_flow( - command_scan_type: str, scan_type: str, sync_option: bool, scan_parameters: Optional[dict] = None -) -> bool: - if not sync_option: + The logic: + - for IAC scan, sync flow is always used + - for SAST scan, sync flow is not supported + - for SCA and Secrets scan, sync flow is supported only for path/repository scan + """ + if not sync_option and scan_type != consts.IAC_SCAN_TYPE: return False if command_scan_type not in {'path', 'repository'}: raise ValueError(f'Sync flow is not available for "{command_scan_type}" command type. Remove --sync option.') - if scan_type is consts.SAST_SCAN_TYPE: - raise ValueError('Sync scan is not available for SAST scan type.') + if scan_type == consts.IAC_SCAN_TYPE: + # sync in the only available flow for IAC scan; we do not use detector directly anymore + return True - if scan_parameters.get('report') is True: - raise ValueError('You can not use sync flow with report option. Either remove "report" or "sync" option.') + if scan_type is consts.SAST_SCAN_TYPE: # noqa: SIM103 + # SAST does not support sync flow + return False return True @@ -169,8 +172,7 @@ def _scan_batch_thread_func(batch: List[Document]) -> Tuple[str, CliError, Local scan_id = str(_generate_unique_id()) scan_completed = False - should_use_scan_service = _should_use_scan_service(scan_type, scan_parameters) - should_use_sync_flow = _should_use_sync_flow(command_scan_type, scan_type, sync_option, scan_parameters) + should_use_sync_flow = _should_use_sync_flow(command_scan_type, scan_type, sync_option) try: logger.debug('Preparing local files, %s', {'batch_files_count': len(batch)}) @@ -180,11 +182,9 @@ def _scan_batch_thread_func(batch: List[Document]) -> Tuple[str, CliError, Local cycode_client, zipped_documents, scan_type, - scan_id, is_git_diff, is_commit_range, scan_parameters, - should_use_scan_service, should_use_sync_flow, ) @@ -224,7 +224,6 @@ def _scan_batch_thread_func(batch: List[Document]) -> Tuple[str, CliError, Local zip_file_size, command_scan_type, error_message, - should_use_scan_service or should_use_sync_flow, # sync flow implies scan service ) return scan_id, error, local_scan_result @@ -456,24 +455,16 @@ def perform_scan( cycode_client: 'ScanClient', zipped_documents: 'InMemoryZip', scan_type: str, - scan_id: str, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict, - should_use_scan_service: bool = False, should_use_sync_flow: bool = False, ) -> ZippedFileScanResult: if should_use_sync_flow: # it does not support commit range scans; should_use_sync_flow handles it return perform_scan_sync(cycode_client, zipped_documents, scan_type, scan_parameters, is_git_diff) - if scan_type in (consts.SCA_SCAN_TYPE, consts.SAST_SCAN_TYPE) or should_use_scan_service: - return perform_scan_async(cycode_client, zipped_documents, scan_type, scan_parameters, is_commit_range) - - if is_commit_range: - return cycode_client.commit_range_zipped_file_scan(scan_type, zipped_documents, scan_id) - - return cycode_client.zipped_file_scan(scan_type, zipped_documents, scan_id, scan_parameters, is_git_diff) + return perform_scan_async(cycode_client, zipped_documents, scan_type, scan_parameters, is_commit_range) def perform_scan_async( @@ -823,7 +814,6 @@ def _report_scan_status( zip_size: int, command_scan_type: str, error_message: Optional[str], - should_use_scan_service: bool = False, ) -> None: try: end_scan_time = time.time() @@ -840,12 +830,15 @@ def _report_scan_status( 'scan_type': scan_type, } - cycode_client.report_scan_status(scan_type, scan_id, scan_status, should_use_scan_service) + cycode_client.report_scan_status(scan_type, scan_id, scan_status) except Exception as e: logger.debug('Failed to report scan status', exc_info=e) def _generate_unique_id() -> UUID: + if 'PYTEST_TEST_UNIQUE_ID' in os.environ: + return UUID(os.environ['PYTEST_TEST_UNIQUE_ID']) + return uuid4() @@ -868,13 +861,13 @@ def _get_scan_result( if not scan_details.detections_count: return init_default_scan_result(scan_id) - scan_raw_detections = cycode_client.get_scan_raw_detections(scan_type, scan_id) + scan_raw_detections = cycode_client.get_scan_raw_detections(scan_id) return ZippedFileScanResult( did_detect=True, detections_per_file=_map_detections_per_file_and_commit_id(scan_type, scan_raw_detections), scan_id=scan_id, - report_url=_try_get_any_report_url_if_needed(cycode_client, scan_id, scan_type, scan_parameters), + report_url=_try_get_aggregation_report_url_if_needed(scan_parameters, cycode_client, scan_type), ) @@ -886,37 +879,6 @@ def init_default_scan_result(scan_id: str) -> ZippedFileScanResult: ) -def _try_get_any_report_url_if_needed( - cycode_client: 'ScanClient', - scan_id: str, - scan_type: str, - scan_parameters: dict, -) -> Optional[str]: - """Tries to get aggregation report URL if needed, otherwise tries to get report URL.""" - aggregation_report_url = None - if scan_parameters: - _try_get_report_url_if_needed(cycode_client, scan_id, scan_type, scan_parameters) - aggregation_report_url = _try_get_aggregation_report_url_if_needed(scan_parameters, cycode_client, scan_type) - - if aggregation_report_url: - return aggregation_report_url - - return _try_get_report_url_if_needed(cycode_client, scan_id, scan_type, scan_parameters) - - -def _try_get_report_url_if_needed( - cycode_client: 'ScanClient', scan_id: str, scan_type: str, scan_parameters: dict -) -> Optional[str]: - if not scan_parameters.get('report', False): - return None - - try: - report_url_response = cycode_client.get_scan_report_url(scan_id, scan_type) - return report_url_response.report_url - except Exception as e: - logger.debug('Failed to get report URL', exc_info=e) - - def _set_aggregation_report_url(ctx: typer.Context, aggregation_report_url: Optional[str] = None) -> None: ctx.obj['aggregation_report_url'] = aggregation_report_url diff --git a/cycode/cli/apps/scan/scan_command.py b/cycode/cli/apps/scan/scan_command.py index 3ba7699b..84485c0b 100644 --- a/cycode/cli/apps/scan/scan_command.py +++ b/cycode/cli/apps/scan/scan_command.py @@ -54,7 +54,9 @@ def scan_command( ] = SeverityOption.INFO, sync: Annotated[ bool, - typer.Option('--sync', help='Run scan synchronously.', show_default='asynchronously'), + typer.Option( + '--sync', help='Run scan synchronously (INTERNAL FOR IDEs).', show_default='asynchronously', hidden=True + ), ] = False, report: Annotated[ bool, diff --git a/cycode/cyclient/models.py b/cycode/cyclient/models.py index 2433ef6c..2c0f53d7 100644 --- a/cycode/cyclient/models.py +++ b/cycode/cyclient/models.py @@ -59,19 +59,6 @@ def __init__(self, file_name: str, detections: List[Detection], commit_id: Optio self.commit_id = commit_id -class DetectionsPerFileSchema(Schema): - class Meta: - unknown = EXCLUDE - - file_name = fields.String() - detections = fields.List(fields.Nested(DetectionSchema)) - commit_id = fields.String(allow_none=True) - - @post_load - def build_dto(self, data: Dict[str, Any], **_) -> 'DetectionsPerFile': - return DetectionsPerFile(**data) - - class ZippedFileScanResult(Schema): def __init__( self, @@ -89,21 +76,6 @@ def __init__( self.err = err -class ZippedFileScanResultSchema(Schema): - class Meta: - unknown = EXCLUDE - - did_detect = fields.Boolean() - scan_id = fields.String() - report_url = fields.String(allow_none=True) - detections_per_file = fields.List(fields.Nested(DetectionsPerFileSchema)) - err = fields.String() - - @post_load - def build_dto(self, data: Dict[str, Any], **_) -> 'ZippedFileScanResult': - return ZippedFileScanResult(**data) - - class ScanResult(Schema): def __init__( self, diff --git a/cycode/cyclient/scan_client.py b/cycode/cyclient/scan_client.py index c6bfc57c..09908943 100644 --- a/cycode/cyclient/scan_client.py +++ b/cycode/cyclient/scan_client.py @@ -1,5 +1,6 @@ import json -from typing import TYPE_CHECKING, List, Optional, Set, Union +from copy import deepcopy +from typing import TYPE_CHECKING, List, Set, Union from uuid import UUID from requests import Response @@ -22,34 +23,12 @@ def __init__( self.scan_cycode_client = scan_cycode_client self.scan_config = scan_config - self._SCAN_SERVICE_CONTROLLER_PATH = 'api/v1/scan' self._SCAN_SERVICE_CLI_CONTROLLER_PATH = 'api/v1/cli-scan' - - self._DETECTIONS_SERVICE_CONTROLLER_PATH = 'api/v1/detections' self._DETECTIONS_SERVICE_CLI_CONTROLLER_PATH = 'api/v1/detections/cli' - self._POLICIES_SERVICE_CONTROLLER_PATH_V3 = 'api/v3/policies' self._hide_response_log = hide_response_log - def get_scan_controller_path(self, scan_type: str, should_use_scan_service: bool = False) -> str: - if not should_use_scan_service and scan_type == consts.IAC_SCAN_TYPE: - # we don't use async flow for IaC scan yet - return self._SCAN_SERVICE_CONTROLLER_PATH - if not should_use_scan_service and scan_type == consts.SECRET_SCAN_TYPE: - # if a secret scan goes to detector directly, we should not use CLI controller. - # CLI controller belongs to the scan service only - return self._SCAN_SERVICE_CONTROLLER_PATH - - return self._SCAN_SERVICE_CLI_CONTROLLER_PATH - - def get_detections_service_controller_path(self, scan_type: str) -> str: - if scan_type == consts.IAC_SCAN_TYPE: - # we don't use async flow for IaC scan yet - return self._DETECTIONS_SERVICE_CONTROLLER_PATH - - return self._DETECTIONS_SERVICE_CLI_CONTROLLER_PATH - @staticmethod def get_scan_flow_type(should_use_sync_flow: bool = False) -> str: if should_use_sync_flow: @@ -57,13 +36,10 @@ def get_scan_flow_type(should_use_sync_flow: bool = False) -> str: return '' - def get_scan_service_url_path( - self, scan_type: str, should_use_scan_service: bool = False, should_use_sync_flow: bool = False - ) -> str: - service_path = self.scan_config.get_service_name(scan_type, should_use_scan_service) - controller_path = self.get_scan_controller_path(scan_type, should_use_scan_service) + def get_scan_service_url_path(self, scan_type: str, should_use_sync_flow: bool = False) -> str: + service_path = self.scan_config.get_service_name(scan_type) flow_type = self.get_scan_flow_type(should_use_sync_flow) - return f'{service_path}/{controller_path}{flow_type}' + return f'{service_path}/{self._SCAN_SERVICE_CLI_CONTROLLER_PATH}{flow_type}' def content_scan(self, scan_type: str, file_name: str, content: str, is_git_diff: bool = True) -> models.ScanResult: path = f'{self.get_scan_service_url_path(scan_type)}/content' @@ -73,27 +49,6 @@ def content_scan(self, scan_type: str, file_name: str, content: str, is_git_diff ) return self.parse_scan_response(response) - def get_zipped_file_scan_url_path(self, scan_type: str) -> str: - return f'{self.get_scan_service_url_path(scan_type)}/zipped-file' - - def zipped_file_scan( - self, scan_type: str, zip_file: InMemoryZip, scan_id: str, scan_parameters: dict, is_git_diff: bool = False - ) -> models.ZippedFileScanResult: - files = {'file': ('multiple_files_scan.zip', zip_file.read())} - - response = self.scan_cycode_client.post( - url_path=self.get_zipped_file_scan_url_path(scan_type), - data={'scan_id': scan_id, 'is_git_diff': is_git_diff, 'scan_parameters': json.dumps(scan_parameters)}, - files=files, - hide_response_content_log=self._hide_response_log, - ) - - return self.parse_zipped_file_scan_response(response) - - def get_scan_report_url(self, scan_id: str, scan_type: str) -> models.ScanReportUrlResponse: - response = self.scan_cycode_client.get(url_path=self.get_scan_report_url_path(scan_id, scan_type)) - return models.ScanReportUrlResponseSchema().build_dto(response.json()) - def get_scan_aggregation_report_url(self, aggregation_id: str, scan_type: str) -> models.ScanReportUrlResponse: response = self.scan_cycode_client.get( url_path=self.get_scan_aggregation_report_url_path(aggregation_id, scan_type) @@ -103,16 +58,12 @@ def get_scan_aggregation_report_url(self, aggregation_id: str, scan_type: str) - def get_zipped_file_scan_async_url_path(self, scan_type: str, should_use_sync_flow: bool = False) -> str: async_scan_type = self.scan_config.get_async_scan_type(scan_type) async_entity_type = self.scan_config.get_async_entity_type(scan_type) - scan_service_url_path = self.get_scan_service_url_path( - scan_type, should_use_scan_service=True, should_use_sync_flow=should_use_sync_flow - ) + scan_service_url_path = self.get_scan_service_url_path(scan_type, should_use_sync_flow=should_use_sync_flow) return f'{scan_service_url_path}/{async_scan_type}/{async_entity_type}' def get_zipped_file_scan_sync_url_path(self, scan_type: str) -> str: server_scan_type = self.scan_config.get_async_scan_type(scan_type) - scan_service_url_path = self.get_scan_service_url_path( - scan_type, should_use_scan_service=True, should_use_sync_flow=True - ) + scan_service_url_path = self.get_scan_service_url_path(scan_type, should_use_sync_flow=True) return f'{scan_service_url_path}/{server_scan_type}/repository' def zipped_file_scan_sync( @@ -124,6 +75,7 @@ def zipped_file_scan_sync( ) -> models.ScanResultsSyncFlow: files = {'file': ('multiple_files_scan.zip', zip_file.read())} + scan_parameters = deepcopy(scan_parameters) # avoid mutating the original dict if 'report' in scan_parameters: del scan_parameters['report'] # BE raises validation error instead of ignoring it @@ -180,16 +132,10 @@ def multiple_zipped_file_scan_async( return models.ScanInitializationResponseSchema().load(response.json()) def get_scan_details_path(self, scan_type: str, scan_id: str) -> str: - return f'{self.get_scan_service_url_path(scan_type, should_use_scan_service=True)}/{scan_id}' - - def get_scan_report_url_path(self, scan_id: str, scan_type: str) -> str: - return f'{self.get_scan_service_url_path(scan_type, should_use_scan_service=True)}/reportUrl/{scan_id}' + return f'{self.get_scan_service_url_path(scan_type)}/{scan_id}' def get_scan_aggregation_report_url_path(self, aggregation_id: str, scan_type: str) -> str: - return ( - f'{self.get_scan_service_url_path(scan_type, should_use_scan_service=True)}' - f'/reportUrlByAggregationId/{aggregation_id}' - ) + return f'{self.get_scan_service_url_path(scan_type)}' f'/reportUrlByAggregationId/{aggregation_id}' def get_scan_details(self, scan_type: str, scan_id: str) -> models.ScanDetailsResponse: path = self.get_scan_details_path(scan_type, scan_id) @@ -256,21 +202,13 @@ def get_detection_rules(self, detection_rules_ids: Union[Set[str], List[str]]) - return self.parse_detection_rules_response(response) - def get_scan_detections_path(self, scan_type: str) -> str: - return f'{self.scan_config.get_detections_prefix()}/{self.get_detections_service_controller_path(scan_type)}' + def get_scan_detections_path(self) -> str: + return f'{self.scan_config.get_detections_prefix()}/{self._DETECTIONS_SERVICE_CLI_CONTROLLER_PATH}' - @staticmethod - def get_scan_detections_list_path_suffix(scan_type: str) -> str: - # we don't use async flow for IaC scan yet - if scan_type == consts.IAC_SCAN_TYPE: - return '' - - return '/detections' + def get_scan_detections_list_path(self) -> str: + return f'{self.get_scan_detections_path()}/detections' - def get_scan_detections_list_path(self, scan_type: str) -> str: - return f'{self.get_scan_detections_path(scan_type)}{self.get_scan_detections_list_path_suffix(scan_type)}' - - def get_scan_raw_detections(self, scan_type: str, scan_id: str) -> List[dict]: + def get_scan_raw_detections(self, scan_id: str) -> List[dict]: params = {'scan_id': scan_id} page_size = 200 @@ -284,7 +222,7 @@ def get_scan_raw_detections(self, scan_type: str, scan_id: str) -> List[dict]: params['page_number'] = page_number response = self.scan_cycode_client.get( - url_path=self.get_scan_detections_list_path(scan_type), + url_path=self.get_scan_detections_list_path(), params=params, hide_response_content_log=self._hide_response_log, ).json() @@ -295,45 +233,15 @@ def get_scan_raw_detections(self, scan_type: str, scan_id: str) -> List[dict]: return raw_detections - def commit_range_zipped_file_scan( - self, scan_type: str, zip_file: InMemoryZip, scan_id: str - ) -> models.ZippedFileScanResult: - url_path = f'{self.get_scan_service_url_path(scan_type)}/commit-range-zipped-file' - files = {'file': ('multiple_files_scan.zip', zip_file.read())} - response = self.scan_cycode_client.post( - url_path=url_path, data={'scan_id': scan_id}, files=files, hide_response_content_log=self._hide_response_log - ) - return self.parse_zipped_file_scan_response(response) + def get_report_scan_status_path(self, scan_type: str, scan_id: str) -> str: + return f'{self.get_scan_service_url_path(scan_type)}/{scan_id}/status' - def get_report_scan_status_path(self, scan_type: str, scan_id: str, should_use_scan_service: bool = False) -> str: - return f'{self.get_scan_service_url_path(scan_type, should_use_scan_service)}/{scan_id}/status' - - def report_scan_status( - self, scan_type: str, scan_id: str, scan_status: dict, should_use_scan_service: bool = False - ) -> None: + def report_scan_status(self, scan_type: str, scan_id: str, scan_status: dict) -> None: self.scan_cycode_client.post( - url_path=self.get_report_scan_status_path( - scan_type, scan_id, should_use_scan_service=should_use_scan_service - ), + url_path=self.get_report_scan_status_path(scan_type, scan_id), body=scan_status, ) @staticmethod def parse_scan_response(response: Response) -> models.ScanResult: return models.ScanResultSchema().load(response.json()) - - @staticmethod - def parse_zipped_file_scan_response(response: Response) -> models.ZippedFileScanResult: - return models.ZippedFileScanResultSchema().load(response.json()) - - @staticmethod - def get_service_name(scan_type: str) -> Optional[str]: - # TODO(MarshalX): get_service_name should be removed from ScanClient? Because it exists in ScanConfig - if scan_type == consts.SECRET_SCAN_TYPE: - return 'secret' - if scan_type == consts.IAC_SCAN_TYPE: - return 'iac' - if scan_type == consts.SCA_SCAN_TYPE or scan_type == consts.SAST_SCAN_TYPE: - return 'scans' - - return None diff --git a/cycode/cyclient/scan_config_base.py b/cycode/cyclient/scan_config_base.py index 6dfa97ef..d60068ce 100644 --- a/cycode/cyclient/scan_config_base.py +++ b/cycode/cyclient/scan_config_base.py @@ -5,7 +5,7 @@ class ScanConfigBase(ABC): @abstractmethod - def get_service_name(self, scan_type: str, should_use_scan_service: bool = False) -> str: ... + def get_service_name(self, scan_type: str) -> str: ... @staticmethod def get_async_scan_type(scan_type: str) -> str: @@ -28,32 +28,16 @@ def get_detections_prefix(self) -> str: ... class DevScanConfig(ScanConfigBase): - def get_service_name(self, scan_type: str, should_use_scan_service: bool = False) -> str: - if should_use_scan_service: - return '5004' - if scan_type == consts.SECRET_SCAN_TYPE: - return '5025' - if scan_type == consts.IAC_SCAN_TYPE: - return '5026' - - # sca and sast - return '5004' + def get_service_name(self, scan_type: str) -> str: + return '5004' # scan service def get_detections_prefix(self) -> str: - return '5016' + return '5016' # detections service class DefaultScanConfig(ScanConfigBase): - def get_service_name(self, scan_type: str, should_use_scan_service: bool = False) -> str: - if should_use_scan_service: - return 'scans' - if scan_type == consts.SECRET_SCAN_TYPE: - return 'secret' - if scan_type == consts.IAC_SCAN_TYPE: - return 'iac' - - # sca and sast - return 'scans' + def get_service_name(self, scan_type: str) -> str: + return 'scans' # scan service def get_detections_prefix(self) -> str: return 'detections' diff --git a/tests/cli/commands/test_main_command.py b/tests/cli/commands/test_main_command.py index ba791f2e..db8fe86b 100644 --- a/tests/cli/commands/test_main_command.py +++ b/tests/cli/commands/test_main_command.py @@ -11,8 +11,7 @@ from cycode.cli.cli_types import OutputTypeOption from cycode.cli.utils.git_proxy import git_proxy from tests.conftest import CLI_ENV_VARS, TEST_FILES_PATH, ZIP_CONTENT_PATH -from tests.cyclient.mocked_responses.scan_client import mock_scan_responses -from tests.cyclient.test_scan_client import get_zipped_file_scan_response, get_zipped_file_scan_url +from tests.cyclient.mocked_responses.scan_client import mock_scan_async_responses _PATH_TO_SCAN = TEST_FILES_PATH.joinpath('zip_content').absolute() @@ -34,12 +33,12 @@ def test_passing_output_option(output: str, scan_client: 'ScanClient', api_token scan_type = consts.SECRET_SCAN_TYPE scan_id = uuid4() - mock_scan_responses(responses, scan_type, scan_client, scan_id, ZIP_CONTENT_PATH) - responses.add(get_zipped_file_scan_response(get_zipped_file_scan_url(scan_type, scan_client), ZIP_CONTENT_PATH)) + mock_scan_async_responses(responses, scan_type, scan_client, scan_id, ZIP_CONTENT_PATH) responses.add(api_token_response) args = ['--output', output, 'scan', '--soft-fail', 'path', str(_PATH_TO_SCAN)] - result = CliRunner().invoke(app, args, env=CLI_ENV_VARS) + env = {'PYTEST_TEST_UNIQUE_ID': str(scan_id), **CLI_ENV_VARS} + result = CliRunner().invoke(app, args, env=env) except_json = output == 'json' @@ -54,10 +53,7 @@ def test_passing_output_option(output: str, scan_client: 'ScanClient', api_token @responses.activate def test_optional_git_with_path_scan(scan_client: 'ScanClient', api_token_response: responses.Response) -> None: - mock_scan_responses(responses, consts.SECRET_SCAN_TYPE, scan_client, uuid4(), ZIP_CONTENT_PATH) - responses.add( - get_zipped_file_scan_response(get_zipped_file_scan_url(consts.SECRET_SCAN_TYPE, scan_client), ZIP_CONTENT_PATH) - ) + mock_scan_async_responses(responses, consts.SECRET_SCAN_TYPE, scan_client, uuid4(), ZIP_CONTENT_PATH) responses.add(api_token_response) # fake env without Git executable diff --git a/tests/cyclient/mocked_responses/scan_client.py b/tests/cyclient/mocked_responses/scan_client.py index 87643001..1726e74c 100644 --- a/tests/cyclient/mocked_responses/scan_client.py +++ b/tests/cyclient/mocked_responses/scan_client.py @@ -9,53 +9,6 @@ from tests.conftest import MOCKED_RESPONSES_PATH -def get_zipped_file_scan_url(scan_type: str, scan_client: ScanClient) -> str: - api_url = scan_client.scan_cycode_client.api_url - service_url = scan_client.get_zipped_file_scan_url_path(scan_type) - return f'{api_url}/{service_url}' - - -def get_zipped_file_scan_response( - url: str, zip_content_path: Path, scan_id: Optional[UUID] = None -) -> responses.Response: - if not scan_id: - scan_id = uuid4() - - json_response = { - 'did_detect': True, - 'scan_id': str(scan_id), # not always as expected due to _get_scan_id and passing scan_id to cxt of CLI - 'detections_per_file': [ - { - 'file_name': str(zip_content_path.joinpath('secrets.py')), - 'commit_id': None, - 'detections': [ - { - 'detection_type_id': '12345678-418f-47ee-abb0-012345678901', - 'detection_rule_id': '12345678-aea1-4304-a6e9-012345678901', - 'message': "Secret of type 'Slack Token' was found in filename 'secrets.py'", - 'type': 'slack-token', - 'is_research': False, - 'detection_details': { - 'sha512': 'sha hash', - 'length': 55, - 'start_position': 19, - 'line': 0, - 'committed_at': '0001-01-01T00:00:00+00:00', - 'file_path': str(zip_content_path), - 'file_name': 'secrets.py', - 'file_extension': '.py', - 'should_resolve_upon_branch_deletion': False, - }, - } - ], - } - ], - 'report_url': None, - } - - return responses.Response(method=responses.POST, url=url, json=json_response, status=200) - - def get_zipped_file_scan_async_url(scan_type: str, scan_client: ScanClient) -> str: api_url = scan_client.scan_cycode_client.api_url service_url = scan_client.get_zipped_file_scan_async_url_path(scan_type) @@ -73,15 +26,9 @@ def get_zipped_file_scan_async_response(url: str, scan_id: Optional[UUID] = None return responses.Response(method=responses.POST, url=url, json=json_response, status=200) -def get_scan_details_url(scan_id: Optional[UUID], scan_client: ScanClient) -> str: - api_url = scan_client.scan_cycode_client.api_url - service_url = scan_client.get_scan_details_path(str(scan_id)) - return f'{api_url}/{service_url}' - - -def get_scan_report_url(scan_id: Optional[UUID], scan_client: ScanClient, scan_type: str) -> str: +def get_scan_details_url(scan_type: str, scan_id: Optional[UUID], scan_client: ScanClient) -> str: api_url = scan_client.scan_cycode_client.api_url - service_url = scan_client.get_scan_report_url_path(str(scan_id), scan_type) + service_url = scan_client.get_scan_details_path(scan_type, str(scan_id)) return f'{api_url}/{service_url}' @@ -91,14 +38,6 @@ def get_scan_aggregation_report_url(aggregation_id: Optional[UUID], scan_client: return f'{api_url}/{service_url}' -def get_scan_report_url_response(url: str, scan_id: Optional[UUID] = None) -> responses.Response: - if not scan_id: - scan_id = uuid4() - json_response = {'report_url': f'https://app.domain/on-demand-scans/{scan_id}'} - - return responses.Response(method=responses.GET, url=url, json=json_response, status=200) - - def get_scan_aggregation_report_url_response(url: str, aggregation_id: Optional[UUID] = None) -> responses.Response: if not aggregation_id: aggregation_id = uuid4() @@ -135,10 +74,10 @@ def get_scan_details_response(url: str, scan_id: Optional[UUID] = None) -> respo return responses.Response(method=responses.GET, url=url, json=json_response, status=200) -def get_scan_detections_url(scan_client: ScanClient, scan_type: str) -> str: +def get_scan_detections_url(scan_client: ScanClient) -> str: api_url = scan_client.scan_cycode_client.api_url - service_url = scan_client.get_scan_detections_path(scan_type) - return f'{api_url}/{service_url}' + path = scan_client.get_scan_detections_list_path() + return f'{api_url}/{path}' def get_scan_detections_response(url: str, scan_id: UUID, zip_content_path: Path) -> responses.Response: @@ -181,20 +120,7 @@ def mock_scan_async_responses( responses_module.add( get_zipped_file_scan_async_response(get_zipped_file_scan_async_url(scan_type, scan_client), scan_id) ) - responses_module.add(get_scan_details_response(get_scan_details_url(scan_id, scan_client), scan_id)) - responses_module.add(get_detection_rules_response(get_detection_rules_url(scan_client))) - responses_module.add( - get_scan_detections_response(get_scan_detections_url(scan_client, scan_type), scan_id, zip_content_path) - ) - responses_module.add(get_report_scan_status_response(get_report_scan_status_url(scan_type, scan_id, scan_client))) - - -def mock_scan_responses( - responses_module: responses, scan_type: str, scan_client: ScanClient, scan_id: UUID, zip_content_path: Path -) -> None: - responses_module.add( - get_zipped_file_scan_response(get_zipped_file_scan_url(scan_type, scan_client), zip_content_path) - ) + responses_module.add(get_scan_details_response(get_scan_details_url(scan_type, scan_id, scan_client), scan_id)) responses_module.add(get_detection_rules_response(get_detection_rules_url(scan_client))) + responses_module.add(get_scan_detections_response(get_scan_detections_url(scan_client), scan_id, zip_content_path)) responses_module.add(get_report_scan_status_response(get_report_scan_status_url(scan_type, scan_id, scan_client))) - responses_module.add(get_scan_report_url_response(get_scan_report_url(scan_id, scan_client, scan_type))) diff --git a/tests/cyclient/scan_config/test_default_scan_config.py b/tests/cyclient/scan_config/test_default_scan_config.py index 7371250c..987c6c78 100644 --- a/tests/cyclient/scan_config/test_default_scan_config.py +++ b/tests/cyclient/scan_config/test_default_scan_config.py @@ -5,11 +5,10 @@ def test_get_service_name() -> None: default_scan_config = DefaultScanConfig() - assert default_scan_config.get_service_name(consts.SECRET_SCAN_TYPE) == 'secret' - assert default_scan_config.get_service_name(consts.IAC_SCAN_TYPE) == 'iac' + assert default_scan_config.get_service_name(consts.SECRET_SCAN_TYPE) == 'scans' + assert default_scan_config.get_service_name(consts.IAC_SCAN_TYPE) == 'scans' assert default_scan_config.get_service_name(consts.SCA_SCAN_TYPE) == 'scans' assert default_scan_config.get_service_name(consts.SAST_SCAN_TYPE) == 'scans' - assert default_scan_config.get_service_name(consts.SECRET_SCAN_TYPE, True) == 'scans' def test_get_detections_prefix() -> None: diff --git a/tests/cyclient/scan_config/test_dev_scan_config.py b/tests/cyclient/scan_config/test_dev_scan_config.py index 6ebb368b..f1cd484c 100644 --- a/tests/cyclient/scan_config/test_dev_scan_config.py +++ b/tests/cyclient/scan_config/test_dev_scan_config.py @@ -5,11 +5,10 @@ def test_get_service_name() -> None: dev_scan_config = DevScanConfig() - assert dev_scan_config.get_service_name(consts.SECRET_SCAN_TYPE) == '5025' - assert dev_scan_config.get_service_name(consts.IAC_SCAN_TYPE) == '5026' + assert dev_scan_config.get_service_name(consts.SECRET_SCAN_TYPE) == '5004' + assert dev_scan_config.get_service_name(consts.IAC_SCAN_TYPE) == '5004' assert dev_scan_config.get_service_name(consts.SCA_SCAN_TYPE) == '5004' assert dev_scan_config.get_service_name(consts.SAST_SCAN_TYPE) == '5004' - assert dev_scan_config.get_service_name(consts.SECRET_SCAN_TYPE, should_use_scan_service=True) == '5004' def test_get_detections_prefix() -> None: diff --git a/tests/cyclient/test_scan_client.py b/tests/cyclient/test_scan_client.py index a1c0d151..b3afbc18 100644 --- a/tests/cyclient/test_scan_client.py +++ b/tests/cyclient/test_scan_client.py @@ -1,167 +1,26 @@ -import os -from typing import List, Tuple from uuid import uuid4 import pytest -import requests import responses -from requests import Timeout -from requests.exceptions import ProxyError -from cycode.cli import consts from cycode.cli.cli_types import ScanTypeOption -from cycode.cli.exceptions.custom_exceptions import ( - CycodeError, - HttpUnauthorizedError, - RequestConnectionError, - RequestTimeout, -) -from cycode.cli.files_collector.models.in_memory_zip import InMemoryZip -from cycode.cli.files_collector.zip_documents import zip_documents -from cycode.cli.models import Document from cycode.cyclient.scan_client import ScanClient -from tests.conftest import ZIP_CONTENT_PATH from tests.cyclient.mocked_responses.scan_client import ( - get_scan_report_url, - get_scan_report_url_response, - get_zipped_file_scan_response, - get_zipped_file_scan_url, + get_scan_aggregation_report_url, + get_scan_aggregation_report_url_response, ) -def zip_scan_resources(scan_type: ScanTypeOption, scan_client: ScanClient) -> Tuple[str, InMemoryZip]: - url = get_zipped_file_scan_url(scan_type, scan_client) - zip_file = get_test_zip_file(scan_type) - - return url, zip_file - - -def get_test_zip_file(scan_type: ScanTypeOption) -> InMemoryZip: - # TODO(MarshalX): refactor scan_disk_files in code_scanner.py to reuse method here instead of this - test_documents: List[Document] = [] - for root, _, files in os.walk(ZIP_CONTENT_PATH): - for name in files: - path = os.path.join(root, name) - with open(path, 'r', encoding='UTF-8') as f: - test_documents.append(Document(path, f.read(), is_git_diff_format=False)) - - return zip_documents(scan_type, test_documents) - - -def test_get_service_name(scan_client: ScanClient) -> None: - # TODO(MarshalX): get_service_name should be removed from ScanClient? Because it exists in ScanConfig - assert scan_client.get_service_name(consts.SECRET_SCAN_TYPE) == 'secret' - assert scan_client.get_service_name(consts.IAC_SCAN_TYPE) == 'iac' - assert scan_client.get_service_name(consts.SCA_SCAN_TYPE) == 'scans' - assert scan_client.get_service_name(consts.SAST_SCAN_TYPE) == 'scans' - - -@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) -@responses.activate -def test_zipped_file_scan( - scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response -) -> None: - url, zip_file = zip_scan_resources(scan_type, scan_client) - expected_scan_id = uuid4() - - responses.add(api_token_response) # mock token based client - responses.add(get_zipped_file_scan_response(url, ZIP_CONTENT_PATH, expected_scan_id)) - - zipped_file_scan_response = scan_client.zipped_file_scan( - scan_type, zip_file, scan_id=str(expected_scan_id), scan_parameters={} - ) - assert zipped_file_scan_response.scan_id == str(expected_scan_id) - - @pytest.mark.parametrize('scan_type', list(ScanTypeOption)) @responses.activate def test_get_scan_report_url( scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response ) -> None: - scan_id = uuid4() - url = get_scan_report_url(scan_id, scan_client, scan_type) - - responses.add(api_token_response) # mock token based client - responses.add(get_scan_report_url_response(url, scan_id)) - - scan_report_url_response = scan_client.get_scan_report_url(str(scan_id), scan_type) - assert scan_report_url_response.report_url == 'https://app.domain/on-demand-scans/{scan_id}'.format(scan_id=scan_id) - - -@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) -@responses.activate -def test_zipped_file_scan_unauthorized_error( - scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response -) -> None: - url, zip_file = zip_scan_resources(scan_type, scan_client) - expected_scan_id = uuid4().hex - - responses.add(api_token_response) # mock token based client - responses.add(method=responses.POST, url=url, status=401) - - with pytest.raises(HttpUnauthorizedError) as e_info: - scan_client.zipped_file_scan(scan_type, zip_file, scan_id=expected_scan_id, scan_parameters={}) - - assert e_info.value.status_code == 401 - - -@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) -@responses.activate -def test_zipped_file_scan_bad_request_error( - scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response -) -> None: - url, zip_file = zip_scan_resources(scan_type, scan_client) - expected_scan_id = uuid4().hex - - expected_status_code = 400 - expected_response_text = 'Bad Request' - - responses.add(api_token_response) # mock token based client - responses.add(method=responses.POST, url=url, status=expected_status_code, body=expected_response_text) - - with pytest.raises(CycodeError) as e_info: - scan_client.zipped_file_scan(scan_type, zip_file, scan_id=expected_scan_id, scan_parameters={}) - - assert e_info.value.status_code == expected_status_code - assert e_info.value.error_message == expected_response_text - - -@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) -@responses.activate -def test_zipped_file_scan_timeout_error( - scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response -) -> None: - scan_url, zip_file = zip_scan_resources(scan_type, scan_client) - expected_scan_id = uuid4().hex - - responses.add(responses.POST, scan_url, status=504) - - timeout_response = requests.post(scan_url, timeout=5) - if timeout_response.status_code == 504: - """bypass SAST""" - - responses.reset() - - timeout_error = Timeout() - timeout_error.response = timeout_response - - responses.add(api_token_response) # mock token based client - responses.add(method=responses.POST, url=scan_url, body=timeout_error, status=504) - - with pytest.raises(RequestTimeout): - scan_client.zipped_file_scan(scan_type, zip_file, scan_id=expected_scan_id, scan_parameters={}) - - -@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) -@responses.activate -def test_zipped_file_scan_connection_error( - scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response -) -> None: - url, zip_file = zip_scan_resources(scan_type, scan_client) - expected_scan_id = uuid4().hex + aggregation_id = uuid4() + url = get_scan_aggregation_report_url(aggregation_id, scan_client, scan_type) responses.add(api_token_response) # mock token based client - responses.add(method=responses.POST, url=url, body=ProxyError()) + responses.add(get_scan_aggregation_report_url_response(url, aggregation_id)) - with pytest.raises(RequestConnectionError): - scan_client.zipped_file_scan(scan_type, zip_file, scan_id=expected_scan_id, scan_parameters={}) + scan_report_url_response = scan_client.get_scan_aggregation_report_url(str(aggregation_id), scan_type) + assert scan_report_url_response.report_url == f'https://app.domain/cli-logs-aggregation/{aggregation_id}' diff --git a/tests/test_code_scanner.py b/tests/test_code_scanner.py index d16aad82..9ef09123 100644 --- a/tests/test_code_scanner.py +++ b/tests/test_code_scanner.py @@ -7,7 +7,6 @@ from cycode.cli import consts from cycode.cli.apps.scan.code_scanner import ( _try_get_aggregation_report_url_if_needed, - _try_get_report_url_if_needed, ) from cycode.cli.cli_types import ScanTypeOption from cycode.cli.files_collector.excluder import _is_relevant_file_to_scan @@ -16,8 +15,6 @@ from tests.cyclient.mocked_responses.scan_client import ( get_scan_aggregation_report_url, get_scan_aggregation_report_url_response, - get_scan_report_url, - get_scan_report_url_response, ) @@ -26,28 +23,6 @@ def test_is_relevant_file_to_scan_sca() -> None: assert _is_relevant_file_to_scan(consts.SCA_SCAN_TYPE, path) is True -@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) -def test_try_get_report_url_if_needed_return_none(scan_type: ScanTypeOption, scan_client: ScanClient) -> None: - scan_id = uuid4().hex - result = _try_get_report_url_if_needed(scan_client, scan_id, consts.SECRET_SCAN_TYPE, scan_parameters={}) - assert result is None - - -@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) -@responses.activate -def test_try_get_report_url_if_needed_return_result( - scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response -) -> None: - scan_id = uuid4() - url = get_scan_report_url(scan_id, scan_client, scan_type) - responses.add(api_token_response) # mock token based client - responses.add(get_scan_report_url_response(url, scan_id)) - - scan_report_url_response = scan_client.get_scan_report_url(str(scan_id), scan_type) - result = _try_get_report_url_if_needed(scan_client, str(scan_id), scan_type, scan_parameters={'report': True}) - assert result == scan_report_url_response.report_url - - @pytest.mark.parametrize('scan_type', list(ScanTypeOption)) def test_try_get_aggregation_report_url_if_no_report_command_needed_return_none( scan_type: ScanTypeOption, scan_client: ScanClient From 3f0ecfc80f0d25bd0c20947def8a9b64bbad4746 Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Thu, 17 Apr 2025 21:52:11 +0200 Subject: [PATCH 2/4] add tests of the scan client using async flow --- tests/cyclient/test_scan_client.py | 159 +++++++++++++++++++++++++++-- 1 file changed, 152 insertions(+), 7 deletions(-) diff --git a/tests/cyclient/test_scan_client.py b/tests/cyclient/test_scan_client.py index b3afbc18..d58344fe 100644 --- a/tests/cyclient/test_scan_client.py +++ b/tests/cyclient/test_scan_client.py @@ -1,26 +1,171 @@ +import os +from typing import List, Tuple from uuid import uuid4 import pytest +import requests import responses +from requests.exceptions import ConnectionError as RequestsConnectionError from cycode.cli.cli_types import ScanTypeOption +from cycode.cli.exceptions.custom_exceptions import ( + HttpUnauthorizedError, + RequestConnectionError, + RequestHttpError, + RequestTimeout, +) +from cycode.cli.files_collector.models.in_memory_zip import InMemoryZip +from cycode.cli.models import Document from cycode.cyclient.scan_client import ScanClient +from tests.conftest import ZIP_CONTENT_PATH from tests.cyclient.mocked_responses.scan_client import ( get_scan_aggregation_report_url, get_scan_aggregation_report_url_response, + get_scan_details_response, + get_scan_details_url, + get_zipped_file_scan_async_response, + get_zipped_file_scan_async_url, ) +def zip_scan_resources(scan_type: str, scan_client: ScanClient) -> Tuple[str, InMemoryZip]: + url = get_zipped_file_scan_async_url(scan_type, scan_client) + zip_file = get_test_zip_file(scan_type) + + return url, zip_file + + +def get_test_zip_file(scan_type: str) -> InMemoryZip: + # TODO(MarshalX): refactor scan_disk_files in code_scanner.py to reuse method here instead of this + test_documents: List[Document] = [] + for root, _, files in os.walk(ZIP_CONTENT_PATH): + for name in files: + path = os.path.join(root, name) + with open(path, 'r', encoding='UTF-8') as f: + test_documents.append(Document(path, f.read(), is_git_diff_format=False)) + + from cycode.cli.files_collector.zip_documents import zip_documents + + return zip_documents(scan_type, test_documents) + + +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) +@responses.activate +def test_zipped_file_scan_async( + scan_type: str, scan_client: ScanClient, api_token_response: responses.Response +) -> None: + """Test the zipped_file_scan_async method for the async flow.""" + url, zip_file = zip_scan_resources(scan_type, scan_client) + expected_scan_id = uuid4() + + responses.add(api_token_response) # mock token based client + responses.add(get_zipped_file_scan_async_response(url, expected_scan_id)) + + # Call the method with the correct parameter order + scan_initialization_response = scan_client.zipped_file_scan_async( + zip_file=zip_file, scan_type=scan_type, scan_parameters={} + ) + assert scan_initialization_response.scan_id == str(expected_scan_id) + + +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) +@responses.activate +def test_get_scan_report_url(scan_type: str, scan_client: ScanClient, api_token_response: responses.Response) -> None: + """Test getting the scan report URL for the async flow.""" + scan_id = uuid4() + url = get_scan_aggregation_report_url(scan_id, scan_client, scan_type) + + responses.add(api_token_response) # mock token based client + responses.add(get_scan_aggregation_report_url_response(url, scan_id)) + + scan_report_url_response = scan_client.get_scan_aggregation_report_url(str(scan_id), scan_type) + assert scan_report_url_response.report_url == f'https://app.domain/cli-logs-aggregation/{scan_id}' + + @pytest.mark.parametrize('scan_type', list(ScanTypeOption)) @responses.activate -def test_get_scan_report_url( - scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response +def test_zipped_file_scan_async_unauthorized_error( + scan_type: str, scan_client: ScanClient, api_token_response: responses.Response ) -> None: - aggregation_id = uuid4() - url = get_scan_aggregation_report_url(aggregation_id, scan_client, scan_type) + """Test handling of unauthorized errors in the async flow.""" + url, zip_file = zip_scan_resources(scan_type, scan_client) + + responses.add(api_token_response) # mock token based client + responses.add(method=responses.POST, url=url, status=401, body='Unauthorized') + + with pytest.raises(HttpUnauthorizedError) as e_info: + scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={}) + + assert e_info.value.status_code == 401 + + +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) +@responses.activate +def test_zipped_file_scan_async_bad_request_error( + scan_type: str, scan_client: ScanClient, api_token_response: responses.Response +) -> None: + """Test handling of bad request errors in the async flow.""" + url, zip_file = zip_scan_resources(scan_type, scan_client) + + expected_status_code = 400 + expected_response_text = 'Bad Request' + + responses.add(api_token_response) # mock token based client + responses.add(method=responses.POST, url=url, status=expected_status_code, body=expected_response_text) + + with pytest.raises(RequestHttpError) as e_info: + scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={}) + + assert e_info.value.status_code == expected_status_code + assert e_info.value.error_message == expected_response_text + + +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) +@responses.activate +def test_zipped_file_scan_async_timeout_error( + scan_type: str, scan_client: ScanClient, api_token_response: responses.Response +) -> None: + """Test handling of timeout errors in the async flow.""" + url, zip_file = zip_scan_resources(scan_type, scan_client) + + # Create a timeout response + timeout_error = requests.exceptions.Timeout('Connection timed out') + + responses.add(api_token_response) # mock token based client + responses.add(method=responses.POST, url=url, body=timeout_error) + + with pytest.raises(RequestTimeout): + scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={}) + + +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) +@responses.activate +def test_zipped_file_scan_async_connection_error( + scan_type: str, scan_client: ScanClient, api_token_response: responses.Response +) -> None: + """Test handling of connection errors in the async flow.""" + url, zip_file = zip_scan_resources(scan_type, scan_client) + + # Create a connection error response + connection_error = RequestsConnectionError('Connection refused') + + responses.add(api_token_response) # mock token based client + responses.add(method=responses.POST, url=url, body=connection_error) + + with pytest.raises(RequestConnectionError): + scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={}) + + +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) +@responses.activate +def test_get_scan_details(scan_type: str, scan_client: ScanClient, api_token_response: responses.Response) -> None: + """Test getting scan details in the async flow.""" + scan_id = uuid4() + url = get_scan_details_url(scan_type, scan_id, scan_client) responses.add(api_token_response) # mock token based client - responses.add(get_scan_aggregation_report_url_response(url, aggregation_id)) + responses.add(get_scan_details_response(url, scan_id)) - scan_report_url_response = scan_client.get_scan_aggregation_report_url(str(aggregation_id), scan_type) - assert scan_report_url_response.report_url == f'https://app.domain/cli-logs-aggregation/{aggregation_id}' + scan_details_response = scan_client.get_scan_details(scan_type, str(scan_id)) + assert scan_details_response.id == str(scan_id) + assert scan_details_response.scan_status == 'Completed' From e39e449640e2b9802db499f6a46f57987ae12b49 Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Thu, 17 Apr 2025 21:57:29 +0200 Subject: [PATCH 3/4] align tests with prev code base --- tests/cyclient/test_scan_client.py | 32 +++++++++++++++--------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/cyclient/test_scan_client.py b/tests/cyclient/test_scan_client.py index d58344fe..d81116fb 100644 --- a/tests/cyclient/test_scan_client.py +++ b/tests/cyclient/test_scan_client.py @@ -9,9 +9,9 @@ from cycode.cli.cli_types import ScanTypeOption from cycode.cli.exceptions.custom_exceptions import ( + CycodeError, HttpUnauthorizedError, RequestConnectionError, - RequestHttpError, RequestTimeout, ) from cycode.cli.files_collector.models.in_memory_zip import InMemoryZip @@ -28,14 +28,14 @@ ) -def zip_scan_resources(scan_type: str, scan_client: ScanClient) -> Tuple[str, InMemoryZip]: +def zip_scan_resources(scan_type: ScanTypeOption, scan_client: ScanClient) -> Tuple[str, InMemoryZip]: url = get_zipped_file_scan_async_url(scan_type, scan_client) zip_file = get_test_zip_file(scan_type) return url, zip_file -def get_test_zip_file(scan_type: str) -> InMemoryZip: +def get_test_zip_file(scan_type: ScanTypeOption) -> InMemoryZip: # TODO(MarshalX): refactor scan_disk_files in code_scanner.py to reuse method here instead of this test_documents: List[Document] = [] for root, _, files in os.walk(ZIP_CONTENT_PATH): @@ -52,7 +52,7 @@ def get_test_zip_file(scan_type: str) -> InMemoryZip: @pytest.mark.parametrize('scan_type', list(ScanTypeOption)) @responses.activate def test_zipped_file_scan_async( - scan_type: str, scan_client: ScanClient, api_token_response: responses.Response + scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response ) -> None: """Test the zipped_file_scan_async method for the async flow.""" url, zip_file = zip_scan_resources(scan_type, scan_client) @@ -61,16 +61,15 @@ def test_zipped_file_scan_async( responses.add(api_token_response) # mock token based client responses.add(get_zipped_file_scan_async_response(url, expected_scan_id)) - # Call the method with the correct parameter order - scan_initialization_response = scan_client.zipped_file_scan_async( - zip_file=zip_file, scan_type=scan_type, scan_parameters={} - ) + scan_initialization_response = scan_client.zipped_file_scan_async(zip_file, scan_type, scan_parameters={}) assert scan_initialization_response.scan_id == str(expected_scan_id) @pytest.mark.parametrize('scan_type', list(ScanTypeOption)) @responses.activate -def test_get_scan_report_url(scan_type: str, scan_client: ScanClient, api_token_response: responses.Response) -> None: +def test_get_scan_report_url( + scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response +) -> None: """Test getting the scan report URL for the async flow.""" scan_id = uuid4() url = get_scan_aggregation_report_url(scan_id, scan_client, scan_type) @@ -85,7 +84,7 @@ def test_get_scan_report_url(scan_type: str, scan_client: ScanClient, api_token_ @pytest.mark.parametrize('scan_type', list(ScanTypeOption)) @responses.activate def test_zipped_file_scan_async_unauthorized_error( - scan_type: str, scan_client: ScanClient, api_token_response: responses.Response + scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response ) -> None: """Test handling of unauthorized errors in the async flow.""" url, zip_file = zip_scan_resources(scan_type, scan_client) @@ -102,7 +101,7 @@ def test_zipped_file_scan_async_unauthorized_error( @pytest.mark.parametrize('scan_type', list(ScanTypeOption)) @responses.activate def test_zipped_file_scan_async_bad_request_error( - scan_type: str, scan_client: ScanClient, api_token_response: responses.Response + scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response ) -> None: """Test handling of bad request errors in the async flow.""" url, zip_file = zip_scan_resources(scan_type, scan_client) @@ -113,7 +112,7 @@ def test_zipped_file_scan_async_bad_request_error( responses.add(api_token_response) # mock token based client responses.add(method=responses.POST, url=url, status=expected_status_code, body=expected_response_text) - with pytest.raises(RequestHttpError) as e_info: + with pytest.raises(CycodeError) as e_info: scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={}) assert e_info.value.status_code == expected_status_code @@ -123,12 +122,11 @@ def test_zipped_file_scan_async_bad_request_error( @pytest.mark.parametrize('scan_type', list(ScanTypeOption)) @responses.activate def test_zipped_file_scan_async_timeout_error( - scan_type: str, scan_client: ScanClient, api_token_response: responses.Response + scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response ) -> None: """Test handling of timeout errors in the async flow.""" url, zip_file = zip_scan_resources(scan_type, scan_client) - # Create a timeout response timeout_error = requests.exceptions.Timeout('Connection timed out') responses.add(api_token_response) # mock token based client @@ -141,7 +139,7 @@ def test_zipped_file_scan_async_timeout_error( @pytest.mark.parametrize('scan_type', list(ScanTypeOption)) @responses.activate def test_zipped_file_scan_async_connection_error( - scan_type: str, scan_client: ScanClient, api_token_response: responses.Response + scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response ) -> None: """Test handling of connection errors in the async flow.""" url, zip_file = zip_scan_resources(scan_type, scan_client) @@ -158,7 +156,9 @@ def test_zipped_file_scan_async_connection_error( @pytest.mark.parametrize('scan_type', list(ScanTypeOption)) @responses.activate -def test_get_scan_details(scan_type: str, scan_client: ScanClient, api_token_response: responses.Response) -> None: +def test_get_scan_details( + scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response +) -> None: """Test getting scan details in the async flow.""" scan_id = uuid4() url = get_scan_details_url(scan_type, scan_id, scan_client) From 72407b02e0cc0516a2218cadac357233a5d73fc0 Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Thu, 17 Apr 2025 22:02:09 +0200 Subject: [PATCH 4/4] make _should_use_sync_flow silent --- cycode/cli/apps/scan/code_scanner.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cycode/cli/apps/scan/code_scanner.py b/cycode/cli/apps/scan/code_scanner.py index fcaec7a5..67185dce 100644 --- a/cycode/cli/apps/scan/code_scanner.py +++ b/cycode/cli/apps/scan/code_scanner.py @@ -103,6 +103,9 @@ def set_issue_detected_by_scan_results(ctx: typer.Context, scan_results: List[Lo def _should_use_sync_flow(command_scan_type: str, scan_type: str, sync_option: bool) -> bool: """Decide whether to use sync flow or async flow for the scan. + Note: + Passing `--sync` option does not mean that sync flow will be used in all cases. + The logic: - for IAC scan, sync flow is always used - for SAST scan, sync flow is not supported @@ -112,7 +115,7 @@ def _should_use_sync_flow(command_scan_type: str, scan_type: str, sync_option: b return False if command_scan_type not in {'path', 'repository'}: - raise ValueError(f'Sync flow is not available for "{command_scan_type}" command type. Remove --sync option.') + return False if scan_type == consts.IAC_SCAN_TYPE: # sync in the only available flow for IAC scan; we do not use detector directly anymore