Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/azure-cli-core/azure/cli/core/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ class AzCliCommandInvoker(CommandInvoker):

# pylint: disable=too-many-statements,too-many-locals,too-many-branches
def execute(self, args):
args_copy = args[:]
from knack.events import (EVENT_INVOKER_PRE_CMD_TBL_CREATE, EVENT_INVOKER_POST_CMD_TBL_CREATE,
EVENT_INVOKER_CMD_TBL_LOADED, EVENT_INVOKER_PRE_PARSE_ARGS,
EVENT_INVOKER_POST_PARSE_ARGS,
Expand Down Expand Up @@ -586,7 +587,8 @@ def execute(self, args):
args[0] = '--help'

self.parser.enable_autocomplete()

if '--what-if' in (args_copy):
return self._what_if(args_copy)
self.cli_ctx.raise_event(EVENT_INVOKER_PRE_PARSE_ARGS, args=args)
parsed_args = self.parser.parse_args(args)
self.cli_ctx.raise_event(EVENT_INVOKER_POST_PARSE_ARGS, command=parsed_args.command, args=parsed_args)
Expand Down Expand Up @@ -691,6 +693,48 @@ def execute(self, args):
table_transformer=self.commands_loader.command_table[parsed_args.command].table_transformer,
is_query_active=self.data['query_active'])

def _what_if(self, args):
# DEBUG: Add logging to see if this method is called
print(f"DEBUG: _what_if called with command: {args}")
if '--what-if' in args:
print("DEBUG: Entering what-if mode")
from azure.cli.core.what_if import show_what_if
try:
# Get subscription ID with priority: --subscription parameter > current login subscription
if '--subscription' in args:
index = args.index('--subscription')
if index + 1 < len(args):
subscription_value = args[index + 1]
subscription_id = subscription_value
else:
from azure.cli.core.commands.client_factory import get_subscription_id
subscription_id = get_subscription_id(self.cli_ctx)
print(f"DEBUG: Using current login subscription ID: {subscription_id}")

args = ["az"] + args if args[0] != 'az' else args
command = " ".join(args)
what_if_result = show_what_if(self.cli_ctx, command, subscription_id=subscription_id)

# Ensure output format is set for proper formatting
# Default to 'json' if not already set
if 'output' not in self.cli_ctx.invocation.data or self.cli_ctx.invocation.data['output'] is None:
self.cli_ctx.invocation.data['output'] = 'json'

# Return the formatted what-if output as the result
# Similar to the normal flow in execute() method
return CommandResultItem(
what_if_result,
table_transformer=None,
is_query_active=self.data.get('query_active', False),
exit_code=0
)
except (CLIError, ValueError, KeyError) as ex:
# If what-if service fails, still show an informative message
return CommandResultItem(None, exit_code=1,
error=CLIError(f'What-if preview failed: {str(ex)}\n'
f'Note: This was a preview operation. '
f'No actual changes were made.'))

@staticmethod
def _extract_parameter_names(args):
# note: name start with more than 2 '-' will be treated as value e.g. certs in PEM format
Expand Down
10 changes: 10 additions & 0 deletions src/azure-cli-core/azure/cli/core/commands/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ def get_location_type(cli_ctx):
return location_type


def get_what_if_type():
what_if_type = CLIArgumentType(
options_list=['--what-if'],
help="Preview the changes that will be made without actually executing the command. "
"This will call the what-if service to compare the current state with the expected state after execution.",
is_preview=True
)
return what_if_type


deployment_name_type = CLIArgumentType(
help=argparse.SUPPRESS,
required=False,
Expand Down
243 changes: 243 additions & 0 deletions src/azure-cli-core/azure/cli/core/what_if.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import threading
import time
import sys
import json
from requests import Request, Session
from knack.util import CLIError


def read_script_file(script_path):
try:
with open(script_path, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
raise CLIError(f"Script file not found: {script_path}")
except Exception as ex:
raise CLIError(f"Error reading script file: {ex}")


def _get_auth_headers(cli_ctx, subscription_id):
from azure.cli.core._profile import Profile

resource = cli_ctx.cloud.endpoints.active_directory_resource_id
profile = Profile(cli_ctx=cli_ctx)

try:
token_result = profile.get_raw_token(resource, subscription=subscription_id)
token_info, _, _ = token_result
token_type, token, _ = token_info
except Exception as token_ex:
raise CLIError(f"Failed to get authentication token: {token_ex}")

return {
'Authorization': f'{token_type} {token}',
'Content-Type': 'application/json'
}


def _make_what_if_request(payload, headers_dict, cli_ctx=None):
request_completed = threading.Event()

def _rotating_progress():
"""Simulate a rotating progress indicator."""
spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
fallback_chars = ["|", "/", "-", "\\"]

try:
"⠋".encode(sys.stderr.encoding or 'utf-8')
chars = spinner_chars
except (UnicodeEncodeError, UnicodeDecodeError, LookupError):
chars = fallback_chars

use_color = cli_ctx and getattr(cli_ctx, 'enable_color', False)
if use_color:
try:
CYAN = '\033[36m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
RESET = '\033[0m'
BOLD = '\033[1m'
except (UnicodeError, AttributeError):
use_color = False

if not use_color:
CYAN = GREEN = YELLOW = BLUE = RESET = BOLD = ''

idx = 0
start_time = time.time()

# Simulate different stages, can be improved with real stages if available
while not request_completed.is_set():
elapsed = time.time() - start_time
if elapsed < 10:
status = f"{CYAN}Connecting to what-if service{RESET}"
spinner_color = CYAN
elif elapsed < 30:
status = f"{BLUE}Analyzing Azure CLI script{RESET}"
spinner_color = BLUE
elif elapsed < 60:
status = f"{YELLOW}Processing what-if analysis{RESET}"
spinner_color = YELLOW
else:
status = f"{GREEN}Finalizing results{RESET}"
spinner_color = GREEN
elapsed_str = f"{BOLD}({elapsed:.0f}s){RESET}"
spinner = f"{spinner_color}{chars[idx % len(chars)]}{RESET}"
progress_line = f"{spinner} {status}... {elapsed_str}"
sys.stderr.write(f"\033[2K\r{progress_line}")
sys.stderr.flush()
idx += 1
time.sleep(0.12)

sys.stderr.write("\033[2K\r")
sys.stderr.flush()

try:
function_app_url = "https://azcli-script-insight.azurewebsites.net"

progress_thread = threading.Thread(target=_rotating_progress)
progress_thread.daemon = True
progress_thread.start()

session = Session()
req = Request(method="POST", url=f"{function_app_url}/api/what_if_cli_preview",
headers=headers_dict, data=json.dumps(payload))
prepared = session.prepare_request(req)
response = session.send(prepared)
request_completed.set()
progress_thread.join(timeout=0.5)

return response

except Exception as ex:
request_completed.set()
if 'progress_thread' in locals():
progress_thread.join(timeout=0.5)
raise CLIError(f"Failed to connect to the what-if service: {ex}")


def convert_json_to_what_if_result(what_if_json_result):
from azure.cli.command_modules.resource._formatters import _change_type_to_weight, _property_change_type_to_weight
from collections import namedtuple

enum_keys = list(_change_type_to_weight.keys())
enum_mapping = {}
for enum_obj in enum_keys:
str_repr = str(enum_obj).lower()
if 'create' in str_repr:
enum_mapping['Create'] = enum_obj
elif 'delete' in str_repr:
enum_mapping['Delete'] = enum_obj
elif 'modify' in str_repr:
enum_mapping['Modify'] = enum_obj
elif 'deploy' in str_repr:
enum_mapping['Deploy'] = enum_obj
elif 'no_change' in str_repr or 'nochange' in str_repr:
enum_mapping['NoChange'] = enum_obj
elif 'ignore' in str_repr:
enum_mapping['Ignore'] = enum_obj
elif 'unsupported' in str_repr:
enum_mapping['Unsupported'] = enum_obj
elif 'no_effect' in str_repr or 'noeffect' in str_repr:
enum_mapping['NoEffect'] = enum_obj

property_enum_keys = list(_property_change_type_to_weight.keys())
property_enum_mapping = {}
for enum_obj in property_enum_keys:
str_repr = str(enum_obj).lower()
if 'create' in str_repr:
property_enum_mapping['Create'] = enum_obj
elif 'delete' in str_repr:
property_enum_mapping['Delete'] = enum_obj
elif 'modify' in str_repr:
property_enum_mapping['Modify'] = enum_obj
elif 'array' in str_repr:
property_enum_mapping['Array'] = enum_obj
elif 'no_effect' in str_repr or 'noeffect' in str_repr:
property_enum_mapping['NoEffect'] = enum_obj

WhatIfOperationResult = namedtuple('WhatIfOperationResult', ['changes', 'potential_changes', 'diagnostics'])
ResourceChange = namedtuple('ResourceChange', ['change_type', 'resource_id', 'before', 'after', 'delta'])
PropertyChange = namedtuple('PropertyChange', ['property_change_type', 'path', 'before', 'after', 'children'])

def _map_change_type_string(change_type_str):
return enum_mapping.get(change_type_str)

def _map_property_change_type_string(property_change_type_str):
return property_enum_mapping.get(property_change_type_str)

def _create_property_change(change_data):
property_change_type = _map_property_change_type_string(
change_data.get('propertyChangeType', 'NoEffect'))
path = change_data.get('path', '')
before = change_data.get('before')
after = change_data.get('after')

children = []
children_data = change_data.get('children', [])
for child_data in children_data:
children.append(_create_property_change(child_data))

return PropertyChange(property_change_type, path, before, after, children)

def _create_resource_change(change_data):
change_type = _map_change_type_string(change_data.get('changeType', 'Unknown'))
resource_id = change_data.get('resourceId', '')
before = change_data.get('before')
after = change_data.get('after')

delta = []
delta_data = change_data.get('delta', [])
for property_data in delta_data:
delta.append(_create_property_change(property_data))

return ResourceChange(change_type, resource_id, before, after, delta)

changes = []
for change_data in what_if_json_result.get('changes', []):
changes.append(_create_resource_change(change_data))

potential_changes = []
for change_data in what_if_json_result.get('potential_changes', []):
potential_changes.append(_create_resource_change(change_data))

return WhatIfOperationResult(changes, potential_changes, [])


def show_what_if(cli_ctx, azcli_script: str, subscription_id: str = None, no_pretty_print=False):
from azure.cli.core.commands.client_factory import get_subscription_id
from azure.cli.command_modules.resource._formatters import format_what_if_operation_result

if not subscription_id:
subscription_id = get_subscription_id(cli_ctx)

payload = {
"azcli_script": azcli_script,
"subscription_id": subscription_id
}

headers_dict = _get_auth_headers(cli_ctx, subscription_id)
response = _make_what_if_request(payload, headers_dict, cli_ctx)

try:
raw_results = response.json()
except ValueError as ex:
raise CLIError(f"Failed to parse response from what-if service: {ex}, raw response: {response.text}")

success = raw_results.get('success')
if success is False:
raise CLIError(f"Errors from what-if service: {raw_results}")
if success is True:
what_if_result = raw_results.get('what_if_result', {})
what_if_operation_result = convert_json_to_what_if_result(what_if_result)
if no_pretty_print:
return what_if_result
print(format_what_if_operation_result(what_if_operation_result, cli_ctx.enable_color))
return what_if_result
raise CLIError(f"Unexpected response from what-if service, got: {raw_results}")
2 changes: 2 additions & 0 deletions src/azure-cli/azure/cli/command_modules/sql/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
get_enum_type,
get_resource_name_completion_list,
get_location_type,
get_what_if_type,
tags_type,
resource_group_name_type
)
Expand Down Expand Up @@ -1913,6 +1914,7 @@ def _configure_security_policy_storage_params(arg_ctx):
with self.argument_context('sql server create') as c:
c.argument('location',
arg_type=get_location_type_with_default_from_resource_group(self.cli_ctx))
c.argument('what_if', get_what_if_type())

# Create args that will be used to build up the Server object
create_args_for_complex_type(
Expand Down
1 change: 1 addition & 0 deletions src/azure-cli/azure/cli/command_modules/sql/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -4372,6 +4372,7 @@ def server_create(
external_admin_principal_type=None,
external_admin_sid=None,
external_admin_name=None,
what_if=None, # pylint: disable=unused-argument
**kwargs):
'''
Creates a server.
Expand Down
10 changes: 10 additions & 0 deletions src/azure-cli/azure/cli/command_modules/util/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,13 @@
- name: List resource groups by bringing your own access token
text: az demo byo-access-token --access-token "eyJ0eXAiO..." --subscription-id 00000000-0000-0000-0000-000000000000
"""

helps['what-if'] = """
type: command
short-summary: Create a sandboxed what-if simulation of Azure CLI scripts to visualize infrastructure changes before execution.
examples:
- name: Simulate a what-if scenario for a provided script
text: az what-if --script-path "/path/to/your/script.sh"
- name: Simulate a what-if scenario for a specific subscription
text: az what-if --script-path "/path/to/your/script.sh" --subscription 00000000-0000-0000-0000-000000000000
"""
4 changes: 4 additions & 0 deletions src/azure-cli/azure/cli/command_modules/util/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ def load_arguments(self, _):
with self.argument_context('demo byo-access-token') as c:
c.argument('access_token', help="Your own access token")
c.argument('subscription_id', help="Subscription ID under which to list resource groups")

with self.argument_context('what-if') as c:
c.argument('script_path', help="Specify the path to a script file containing Azure CLI commands to be executed.")
c.argument('no_pretty_print', help="Disable pretty-printing of the output.")
3 changes: 3 additions & 0 deletions src/azure-cli/azure/cli/command_modules/util/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ def load_command_table(self, _):
with self.command_group('demo secret-store') as g:
g.custom_command('save', 'secret_store_save')
g.custom_command('load', 'secret_store_load')

with self.command_group('') as g:
g.custom_command('what-if', 'show_what_if', is_preview=True)
9 changes: 9 additions & 0 deletions src/azure-cli/azure/cli/command_modules/util/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,12 @@ def get_token(self, *scopes, **kwargs): # pylint: disable=unused-argument
from azure.cli.core.auth.util import AccessToken
# Assume the access token expires in 1 year / 31536000 seconds
return AccessToken(self.access_token, int(time.time()) + 31536000)


def show_what_if(cmd, script_path, no_pretty_print=False):
from azure.cli.core.what_if import show_what_if as _show_what_if, read_script_file
from azure.cli.core.commands.client_factory import get_subscription_id

subscription_id = get_subscription_id(cmd.cli_ctx)
script_content = read_script_file(script_path)
return _show_what_if(cmd.cli_ctx, script_content, subscription_id=subscription_id, no_pretty_print=no_pretty_print)
Loading
Loading