diff --git a/src/azure-cli-core/azure/cli/core/commands/__init__.py b/src/azure-cli-core/azure/cli/core/commands/__init__.py index fb2a9a3dece..e90ac3d6ff5 100644 --- a/src/azure-cli-core/azure/cli/core/commands/__init__.py +++ b/src/azure-cli-core/azure/cli/core/commands/__init__.py @@ -506,6 +506,7 @@ class AzCliCommandInvoker(CommandInvoker): # pylint: disable=too-many-statements,too-many-locals,too-many-branches def execute(self, args): + args_copy = args[:] from knack.events import (EVENT_INVOKER_PRE_CMD_TBL_CREATE, EVENT_INVOKER_POST_CMD_TBL_CREATE, EVENT_INVOKER_CMD_TBL_LOADED, EVENT_INVOKER_PRE_PARSE_ARGS, EVENT_INVOKER_POST_PARSE_ARGS, @@ -586,7 +587,8 @@ def execute(self, args): args[0] = '--help' self.parser.enable_autocomplete() - + if '--what-if' in (args_copy): + return self._what_if(args_copy) self.cli_ctx.raise_event(EVENT_INVOKER_PRE_PARSE_ARGS, args=args) parsed_args = self.parser.parse_args(args) self.cli_ctx.raise_event(EVENT_INVOKER_POST_PARSE_ARGS, command=parsed_args.command, args=parsed_args) @@ -691,6 +693,48 @@ def execute(self, args): table_transformer=self.commands_loader.command_table[parsed_args.command].table_transformer, is_query_active=self.data['query_active']) + def _what_if(self, args): + # DEBUG: Add logging to see if this method is called + print(f"DEBUG: _what_if called with command: {args}") + if '--what-if' in args: + print("DEBUG: Entering what-if mode") + from azure.cli.core.what_if import show_what_if + try: + # Get subscription ID with priority: --subscription parameter > current login subscription + if '--subscription' in args: + index = args.index('--subscription') + if index + 1 < len(args): + subscription_value = args[index + 1] + subscription_id = subscription_value + else: + from azure.cli.core.commands.client_factory import get_subscription_id + subscription_id = get_subscription_id(self.cli_ctx) + print(f"DEBUG: Using current login subscription ID: {subscription_id}") + + args = ["az"] + args if args[0] != 'az' else args + command = " ".join(args) + what_if_result = show_what_if(self.cli_ctx, command, subscription_id=subscription_id) + + # Ensure output format is set for proper formatting + # Default to 'json' if not already set + if 'output' not in self.cli_ctx.invocation.data or self.cli_ctx.invocation.data['output'] is None: + self.cli_ctx.invocation.data['output'] = 'json' + + # Return the formatted what-if output as the result + # Similar to the normal flow in execute() method + return CommandResultItem( + what_if_result, + table_transformer=None, + is_query_active=self.data.get('query_active', False), + exit_code=0 + ) + except (CLIError, ValueError, KeyError) as ex: + # If what-if service fails, still show an informative message + return CommandResultItem(None, exit_code=1, + error=CLIError(f'What-if preview failed: {str(ex)}\n' + f'Note: This was a preview operation. ' + f'No actual changes were made.')) + @staticmethod def _extract_parameter_names(args): # note: name start with more than 2 '-' will be treated as value e.g. certs in PEM format diff --git a/src/azure-cli-core/azure/cli/core/commands/parameters.py b/src/azure-cli-core/azure/cli/core/commands/parameters.py index c098d1a42a1..c165dac37ba 100644 --- a/src/azure-cli-core/azure/cli/core/commands/parameters.py +++ b/src/azure-cli-core/azure/cli/core/commands/parameters.py @@ -268,6 +268,16 @@ def get_location_type(cli_ctx): return location_type +def get_what_if_type(): + what_if_type = CLIArgumentType( + options_list=['--what-if'], + help="Preview the changes that will be made without actually executing the command. " + "This will call the what-if service to compare the current state with the expected state after execution.", + is_preview=True + ) + return what_if_type + + deployment_name_type = CLIArgumentType( help=argparse.SUPPRESS, required=False, diff --git a/src/azure-cli-core/azure/cli/core/what_if.py b/src/azure-cli-core/azure/cli/core/what_if.py new file mode 100644 index 00000000000..1272fff2bea --- /dev/null +++ b/src/azure-cli-core/azure/cli/core/what_if.py @@ -0,0 +1,243 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import threading +import time +import sys +import json +from requests import Request, Session +from knack.util import CLIError + + +def read_script_file(script_path): + try: + with open(script_path, 'r', encoding='utf-8') as f: + return f.read() + except FileNotFoundError: + raise CLIError(f"Script file not found: {script_path}") + except Exception as ex: + raise CLIError(f"Error reading script file: {ex}") + + +def _get_auth_headers(cli_ctx, subscription_id): + from azure.cli.core._profile import Profile + + resource = cli_ctx.cloud.endpoints.active_directory_resource_id + profile = Profile(cli_ctx=cli_ctx) + + try: + token_result = profile.get_raw_token(resource, subscription=subscription_id) + token_info, _, _ = token_result + token_type, token, _ = token_info + except Exception as token_ex: + raise CLIError(f"Failed to get authentication token: {token_ex}") + + return { + 'Authorization': f'{token_type} {token}', + 'Content-Type': 'application/json' + } + + +def _make_what_if_request(payload, headers_dict, cli_ctx=None): + request_completed = threading.Event() + + def _rotating_progress(): + """Simulate a rotating progress indicator.""" + spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] + fallback_chars = ["|", "/", "-", "\\"] + + try: + "⠋".encode(sys.stderr.encoding or 'utf-8') + chars = spinner_chars + except (UnicodeEncodeError, UnicodeDecodeError, LookupError): + chars = fallback_chars + + use_color = cli_ctx and getattr(cli_ctx, 'enable_color', False) + if use_color: + try: + CYAN = '\033[36m' + GREEN = '\033[32m' + YELLOW = '\033[33m' + BLUE = '\033[34m' + RESET = '\033[0m' + BOLD = '\033[1m' + except (UnicodeError, AttributeError): + use_color = False + + if not use_color: + CYAN = GREEN = YELLOW = BLUE = RESET = BOLD = '' + + idx = 0 + start_time = time.time() + + # Simulate different stages, can be improved with real stages if available + while not request_completed.is_set(): + elapsed = time.time() - start_time + if elapsed < 10: + status = f"{CYAN}Connecting to what-if service{RESET}" + spinner_color = CYAN + elif elapsed < 30: + status = f"{BLUE}Analyzing Azure CLI script{RESET}" + spinner_color = BLUE + elif elapsed < 60: + status = f"{YELLOW}Processing what-if analysis{RESET}" + spinner_color = YELLOW + else: + status = f"{GREEN}Finalizing results{RESET}" + spinner_color = GREEN + elapsed_str = f"{BOLD}({elapsed:.0f}s){RESET}" + spinner = f"{spinner_color}{chars[idx % len(chars)]}{RESET}" + progress_line = f"{spinner} {status}... {elapsed_str}" + sys.stderr.write(f"\033[2K\r{progress_line}") + sys.stderr.flush() + idx += 1 + time.sleep(0.12) + + sys.stderr.write("\033[2K\r") + sys.stderr.flush() + + try: + function_app_url = "https://azcli-script-insight.azurewebsites.net" + + progress_thread = threading.Thread(target=_rotating_progress) + progress_thread.daemon = True + progress_thread.start() + + session = Session() + req = Request(method="POST", url=f"{function_app_url}/api/what_if_cli_preview", + headers=headers_dict, data=json.dumps(payload)) + prepared = session.prepare_request(req) + response = session.send(prepared) + request_completed.set() + progress_thread.join(timeout=0.5) + + return response + + except Exception as ex: + request_completed.set() + if 'progress_thread' in locals(): + progress_thread.join(timeout=0.5) + raise CLIError(f"Failed to connect to the what-if service: {ex}") + + +def convert_json_to_what_if_result(what_if_json_result): + from azure.cli.command_modules.resource._formatters import _change_type_to_weight, _property_change_type_to_weight + from collections import namedtuple + + enum_keys = list(_change_type_to_weight.keys()) + enum_mapping = {} + for enum_obj in enum_keys: + str_repr = str(enum_obj).lower() + if 'create' in str_repr: + enum_mapping['Create'] = enum_obj + elif 'delete' in str_repr: + enum_mapping['Delete'] = enum_obj + elif 'modify' in str_repr: + enum_mapping['Modify'] = enum_obj + elif 'deploy' in str_repr: + enum_mapping['Deploy'] = enum_obj + elif 'no_change' in str_repr or 'nochange' in str_repr: + enum_mapping['NoChange'] = enum_obj + elif 'ignore' in str_repr: + enum_mapping['Ignore'] = enum_obj + elif 'unsupported' in str_repr: + enum_mapping['Unsupported'] = enum_obj + elif 'no_effect' in str_repr or 'noeffect' in str_repr: + enum_mapping['NoEffect'] = enum_obj + + property_enum_keys = list(_property_change_type_to_weight.keys()) + property_enum_mapping = {} + for enum_obj in property_enum_keys: + str_repr = str(enum_obj).lower() + if 'create' in str_repr: + property_enum_mapping['Create'] = enum_obj + elif 'delete' in str_repr: + property_enum_mapping['Delete'] = enum_obj + elif 'modify' in str_repr: + property_enum_mapping['Modify'] = enum_obj + elif 'array' in str_repr: + property_enum_mapping['Array'] = enum_obj + elif 'no_effect' in str_repr or 'noeffect' in str_repr: + property_enum_mapping['NoEffect'] = enum_obj + + WhatIfOperationResult = namedtuple('WhatIfOperationResult', ['changes', 'potential_changes', 'diagnostics']) + ResourceChange = namedtuple('ResourceChange', ['change_type', 'resource_id', 'before', 'after', 'delta']) + PropertyChange = namedtuple('PropertyChange', ['property_change_type', 'path', 'before', 'after', 'children']) + + def _map_change_type_string(change_type_str): + return enum_mapping.get(change_type_str) + + def _map_property_change_type_string(property_change_type_str): + return property_enum_mapping.get(property_change_type_str) + + def _create_property_change(change_data): + property_change_type = _map_property_change_type_string( + change_data.get('propertyChangeType', 'NoEffect')) + path = change_data.get('path', '') + before = change_data.get('before') + after = change_data.get('after') + + children = [] + children_data = change_data.get('children', []) + for child_data in children_data: + children.append(_create_property_change(child_data)) + + return PropertyChange(property_change_type, path, before, after, children) + + def _create_resource_change(change_data): + change_type = _map_change_type_string(change_data.get('changeType', 'Unknown')) + resource_id = change_data.get('resourceId', '') + before = change_data.get('before') + after = change_data.get('after') + + delta = [] + delta_data = change_data.get('delta', []) + for property_data in delta_data: + delta.append(_create_property_change(property_data)) + + return ResourceChange(change_type, resource_id, before, after, delta) + + changes = [] + for change_data in what_if_json_result.get('changes', []): + changes.append(_create_resource_change(change_data)) + + potential_changes = [] + for change_data in what_if_json_result.get('potential_changes', []): + potential_changes.append(_create_resource_change(change_data)) + + return WhatIfOperationResult(changes, potential_changes, []) + + +def show_what_if(cli_ctx, azcli_script: str, subscription_id: str = None, no_pretty_print=False): + from azure.cli.core.commands.client_factory import get_subscription_id + from azure.cli.command_modules.resource._formatters import format_what_if_operation_result + + if not subscription_id: + subscription_id = get_subscription_id(cli_ctx) + + payload = { + "azcli_script": azcli_script, + "subscription_id": subscription_id + } + + headers_dict = _get_auth_headers(cli_ctx, subscription_id) + response = _make_what_if_request(payload, headers_dict, cli_ctx) + + try: + raw_results = response.json() + except ValueError as ex: + raise CLIError(f"Failed to parse response from what-if service: {ex}, raw response: {response.text}") + + success = raw_results.get('success') + if success is False: + raise CLIError(f"Errors from what-if service: {raw_results}") + if success is True: + what_if_result = raw_results.get('what_if_result', {}) + what_if_operation_result = convert_json_to_what_if_result(what_if_result) + if no_pretty_print: + return what_if_result + print(format_what_if_operation_result(what_if_operation_result, cli_ctx.enable_color)) + return what_if_result + raise CLIError(f"Unexpected response from what-if service, got: {raw_results}") diff --git a/src/azure-cli/azure/cli/command_modules/sql/_params.py b/src/azure-cli/azure/cli/command_modules/sql/_params.py index bb6064eac02..ce2c94119fa 100644 --- a/src/azure-cli/azure/cli/command_modules/sql/_params.py +++ b/src/azure-cli/azure/cli/command_modules/sql/_params.py @@ -40,6 +40,7 @@ get_enum_type, get_resource_name_completion_list, get_location_type, + get_what_if_type, tags_type, resource_group_name_type ) @@ -1913,6 +1914,7 @@ def _configure_security_policy_storage_params(arg_ctx): with self.argument_context('sql server create') as c: c.argument('location', arg_type=get_location_type_with_default_from_resource_group(self.cli_ctx)) + c.argument('what_if', get_what_if_type()) # Create args that will be used to build up the Server object create_args_for_complex_type( diff --git a/src/azure-cli/azure/cli/command_modules/sql/custom.py b/src/azure-cli/azure/cli/command_modules/sql/custom.py index 17d2d4162e6..0a4506f19e2 100644 --- a/src/azure-cli/azure/cli/command_modules/sql/custom.py +++ b/src/azure-cli/azure/cli/command_modules/sql/custom.py @@ -4372,6 +4372,7 @@ def server_create( external_admin_principal_type=None, external_admin_sid=None, external_admin_name=None, + what_if=None, # pylint: disable=unused-argument **kwargs): ''' Creates a server. diff --git a/src/azure-cli/azure/cli/command_modules/util/_help.py b/src/azure-cli/azure/cli/command_modules/util/_help.py index 84c2a529b68..241abcef88d 100644 --- a/src/azure-cli/azure/cli/command_modules/util/_help.py +++ b/src/azure-cli/azure/cli/command_modules/util/_help.py @@ -92,3 +92,13 @@ - name: List resource groups by bringing your own access token text: az demo byo-access-token --access-token "eyJ0eXAiO..." --subscription-id 00000000-0000-0000-0000-000000000000 """ + +helps['what-if'] = """ +type: command +short-summary: Create a sandboxed what-if simulation of Azure CLI scripts to visualize infrastructure changes before execution. +examples: +- name: Simulate a what-if scenario for a provided script + text: az what-if --script-path "/path/to/your/script.sh" +- name: Simulate a what-if scenario for a specific subscription + text: az what-if --script-path "/path/to/your/script.sh" --subscription 00000000-0000-0000-0000-000000000000 +""" diff --git a/src/azure-cli/azure/cli/command_modules/util/_params.py b/src/azure-cli/azure/cli/command_modules/util/_params.py index 3e0aae88cd1..8c6930253d4 100644 --- a/src/azure-cli/azure/cli/command_modules/util/_params.py +++ b/src/azure-cli/azure/cli/command_modules/util/_params.py @@ -51,3 +51,7 @@ def load_arguments(self, _): with self.argument_context('demo byo-access-token') as c: c.argument('access_token', help="Your own access token") c.argument('subscription_id', help="Subscription ID under which to list resource groups") + + with self.argument_context('what-if') as c: + c.argument('script_path', help="Specify the path to a script file containing Azure CLI commands to be executed.") + c.argument('no_pretty_print', help="Disable pretty-printing of the output.") diff --git a/src/azure-cli/azure/cli/command_modules/util/commands.py b/src/azure-cli/azure/cli/command_modules/util/commands.py index 6f0c14030fc..396d9e6a390 100644 --- a/src/azure-cli/azure/cli/command_modules/util/commands.py +++ b/src/azure-cli/azure/cli/command_modules/util/commands.py @@ -22,3 +22,6 @@ def load_command_table(self, _): with self.command_group('demo secret-store') as g: g.custom_command('save', 'secret_store_save') g.custom_command('load', 'secret_store_load') + + with self.command_group('') as g: + g.custom_command('what-if', 'show_what_if', is_preview=True) diff --git a/src/azure-cli/azure/cli/command_modules/util/custom.py b/src/azure-cli/azure/cli/command_modules/util/custom.py index ea2f2ea0bd0..95602ff7b23 100644 --- a/src/azure-cli/azure/cli/command_modules/util/custom.py +++ b/src/azure-cli/azure/cli/command_modules/util/custom.py @@ -370,3 +370,12 @@ def get_token(self, *scopes, **kwargs): # pylint: disable=unused-argument from azure.cli.core.auth.util import AccessToken # Assume the access token expires in 1 year / 31536000 seconds return AccessToken(self.access_token, int(time.time()) + 31536000) + + +def show_what_if(cmd, script_path, no_pretty_print=False): + from azure.cli.core.what_if import show_what_if as _show_what_if, read_script_file + from azure.cli.core.commands.client_factory import get_subscription_id + + subscription_id = get_subscription_id(cmd.cli_ctx) + script_content = read_script_file(script_path) + return _show_what_if(cmd.cli_ctx, script_content, subscription_id=subscription_id, no_pretty_print=no_pretty_print) diff --git a/src/azure-cli/azure/cli/command_modules/util/tests/latest/test_whatif.py b/src/azure-cli/azure/cli/command_modules/util/tests/latest/test_whatif.py new file mode 100644 index 00000000000..bc64705a38a --- /dev/null +++ b/src/azure-cli/azure/cli/command_modules/util/tests/latest/test_whatif.py @@ -0,0 +1,63 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import unittest +import os +from unittest.mock import patch, Mock +from azure.cli.testsdk import ScenarioTest + +TEST_DIR = os.path.dirname(os.path.realpath(__file__)) + + +class WhatIfTest(ScenarioTest): + + def setUp(self): + super().setUp() + self.test_script_path = os.path.join(TEST_DIR, 'test_whatif_script.sh') + + @patch('azure.cli.core.commands.client_factory.get_subscription_id') + @patch('azure.cli.core.what_if._get_auth_headers') + @patch('azure.cli.core.what_if._make_what_if_request') + def test_what_if_command(self, mock_make_request, mock_get_headers, mock_get_subscription_id): + mock_get_subscription_id.return_value = 'test-subscription-id' + mock_get_headers.return_value = {'Authorization': 'Bearer test-token'} + mock_response = Mock() + mock_response.json.return_value = { + "success": True, + "what_if_result": { + "changes": [ + { + "changeType": "Create", + "resourceId": "/subscriptions/test/resourceGroups/myrg/providers/Microsoft.Compute/virtualMachines/MyVM_01", + "before": None, + "after": { + "name": "MyVM_01", + "type": "Microsoft.Compute/virtualMachines", + "location": "eastus" + }, + "delta": [] + } + ], + "potential_changes": [], + "diagnostics": [] + } + } + mock_make_request.return_value = mock_response + + result = self.cmd('az what-if --script-path "{}" --no-pretty-print'.format(self.test_script_path)) + output = result.get_output_in_json() + + self.assertIsInstance(output, dict) + self.assertIn("changes", output) + self.assertEqual(len(output["changes"]), 1) + self.assertEqual(output["changes"][0]["changeType"], "Create") + + mock_get_subscription_id.assert_called_once() + mock_get_headers.assert_called_once() + mock_make_request.assert_called_once() + + +if __name__ == '__main__': + unittest.main() diff --git a/src/azure-cli/azure/cli/command_modules/util/tests/latest/test_whatif_script.sh b/src/azure-cli/azure/cli/command_modules/util/tests/latest/test_whatif_script.sh new file mode 100644 index 00000000000..b121a971c94 --- /dev/null +++ b/src/azure-cli/azure/cli/command_modules/util/tests/latest/test_whatif_script.sh @@ -0,0 +1,3 @@ +az vm create --resource-group myrg --name MyVM_01 --image UbuntuLTS --size Standard_D2s_v3 --admin-username azureuser --generate-ssh-keys + +az functionapp update --name myfunctionapp --resource-group myrg --set tags.Environment=Test \ No newline at end of file diff --git a/src/azure-cli/azure/cli/command_modules/vm/_params.py b/src/azure-cli/azure/cli/command_modules/vm/_params.py index 545ed1f9dfa..a22e88bd539 100644 --- a/src/azure-cli/azure/cli/command_modules/vm/_params.py +++ b/src/azure-cli/azure/cli/command_modules/vm/_params.py @@ -13,7 +13,7 @@ from azure.cli.core.commands.validators import ( get_default_location_from_resource_group, validate_file_or_dict) from azure.cli.core.commands.parameters import ( - get_location_type, get_resource_name_completion_list, tags_type, get_three_state_flag, + get_location_type, get_what_if_type, get_resource_name_completion_list, tags_type, get_three_state_flag, file_type, get_enum_type, zone_type, zones_type) from azure.cli.command_modules.vm._actions import _resource_not_exists from azure.cli.command_modules.vm._completers import ( @@ -415,6 +415,7 @@ def load_arguments(self, _): c.argument('workspace', is_preview=True, arg_group='Monitor', help='Name or ID of Log Analytics Workspace. If you specify the workspace through its name, the workspace should be in the same resource group with the vm, otherwise a new workspace will be created.') with self.argument_context('vm update') as c: + c.argument('what_if', get_what_if_type()) c.argument('os_disk', min_api='2017-12-01', help="Managed OS disk ID or name to swap to") c.argument('write_accelerator', nargs='*', min_api='2017-12-01', help="enable/disable disk write accelerator. Use singular value 'true/false' to apply across, or specify individual disks, e.g.'os=true 1=true 2=true' for os disk and data disks with lun of 1 & 2") @@ -1063,6 +1064,7 @@ def load_arguments(self, _): for scope in ['vm create', 'vmss create']: with self.argument_context(scope) as c: c.argument('location', get_location_type(self.cli_ctx), help='Location in which to create VM and related resources. If default location is not configured, will default to the resource group\'s location') + c.argument('what_if', get_what_if_type()) c.argument('tags', tags_type) c.argument('no_wait', help='Do not wait for the long-running operation to finish.') c.argument('validate', options_list=['--validate'], help='Generate and validate the ARM template without creating any resources.', action='store_true') diff --git a/src/azure-cli/azure/cli/command_modules/vm/custom.py b/src/azure-cli/azure/cli/command_modules/vm/custom.py index 53320a0128f..255d7673e54 100644 --- a/src/azure-cli/azure/cli/command_modules/vm/custom.py +++ b/src/azure-cli/azure/cli/command_modules/vm/custom.py @@ -933,7 +933,7 @@ def create_vm(cmd, vm_name, resource_group_name, image=None, size='Standard_DS1_ enable_user_redeploy_scheduled_events=None, zone_placement_policy=None, include_zones=None, exclude_zones=None, align_regional_disks_to_vm_zone=None, wire_server_mode=None, imds_mode=None, wire_server_access_control_profile_reference_id=None, imds_access_control_profile_reference_id=None, - key_incarnation_id=None, add_proxy_agent_extension=None): + key_incarnation_id=None, add_proxy_agent_extension=None, what_if=False): from azure.cli.core.commands.client_factory import get_subscription_id from azure.cli.core.util import random_string, hash_string diff --git a/src/azure-cli/service_name.json b/src/azure-cli/service_name.json index e260e066dc8..e8a059e7a2c 100644 --- a/src/azure-cli/service_name.json +++ b/src/azure-cli/service_name.json @@ -548,5 +548,10 @@ "Command": "az webapp", "AzureServiceName": "App Services", "URL": "https://learn.microsoft.com/rest/api/appservice/webapps" + }, + { + "Command": "az what-if", + "AzureServiceName": "Azure CLI", + "URL": "" } ]