Skip to content
This repository was archived by the owner on Jul 10, 2024. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 39 additions & 30 deletions anchorecli/cli/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import anchorecli.clients.apiexternal
import anchorecli.cli.utils

from collections import OrderedDict

config = {}
_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -354,62 +356,69 @@ def query_vuln(input_image, vuln_type, vendor_only):
anchorecli.cli.utils.doexit(ecode)


@image.command(name='del', short_help="Delete an image")
@click.argument('input_image', required=False)
@image.command(name='del', short_help="Delete one or more images")
@click.argument('input_images', required=False, nargs=-1)
@click.option('--force', is_flag=True, help="Force deletion of image by cancelling any subscription/notification settings prior to image delete")
@click.option('--all', is_flag=True, help="Delete all images")
def delete(input_image, force, all):
def delete(input_images, force, all):
"""
INPUT_IMAGE: Input image can be in the following formats: Image Digest, ImageID or registry/repo:tag
"""
ecode = 0

if all:
try:
try:
image_digests = set() # gathering image digests to be bundled into delete request
input_list = list() # for preserving same order as input in the output

if all:
ret = anchorecli.clients.apiexternal.get_images(config)
ecode = anchorecli.cli.utils.get_ecode(ret)
if not ret['success']:
raise Exception(json.dumps(ret['error'], indent=4))

for image in ret['payload']:
if image['imageDigest']:
ret = anchorecli.clients.apiexternal.delete_image(config, imageDigest=image['imageDigest'], force=force)
if ret['success']:
for image_detail in image['image_detail']:
fulltag = image_detail.pop('registry', "None") + "/" + image_detail.pop('repo', "None") + ":" + image_detail.pop('tag', "None")
print(fulltag)
else:
raise Exception(json.dumps(ret['error'], indent=4))
image_digests.add(image['imageDigest'])
for image_detail in image['image_detail']:
fulltag = image_detail.pop('registry', "None") + "/" + image_detail.pop('repo', "None") + ":" + image_detail.pop('tag', "None")
input_list.append((fulltag, image['imageDigest']))

except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'image_delete_all', {}, err))
if not ecode:
ecode = 2
else:
try:
if input_image is None:
else:
if not input_images:
raise Exception("Missing argument INPUT_IMAGE")

itype, image, imageDigest = anchorecli.cli.utils.discover_inputimage(config, input_image)
for input_image in OrderedDict.fromkeys(input_images).keys():
itype, image, imageDigest = anchorecli.cli.utils.discover_inputimage(config, input_image)

if imageDigest:
ret = anchorecli.clients.apiexternal.delete_image(config, imageDigest=imageDigest, force=force)
ecode = anchorecli.cli.utils.get_ecode(ret)
else:
ecode = 1
raise Exception("cannot use input image string: no discovered imageDigest")
if imageDigest:
# ret = anchorecli.clients.apiexternal.delete_image(config, imageDigest=imageDigest, force=force)
input_list.append((image, imageDigest))
image_digests.add(imageDigest)
else:
input_list.append((image, imageDigest))

if image_digests:
# TODO batch them into groups of 100 or so?
ret = anchorecli.clients.apiexternal.delete_images(config, imageDigests=list(image_digests), force=force)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret:
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'image_delete', {}, ret['payload']))
print(anchorecli.cli.utils.format_output(config, 'image_delete', input_list, ret['payload']))
else:
raise Exception(json.dumps(ret['error'], indent=4))
else:
raise Exception("operation failed with empty response")
elif input_list:
print(anchorecli.cli.utils.format_output(config, 'image_delete', input_list, []))
else:
raise Exception("operation failed")

except Exception as err:
except Exception as err:
if all:
print(anchorecli.cli.utils.format_error_output(config, 'image_delete_all', {}, err))
else:
print(anchorecli.cli.utils.format_error_output(config, 'image_delete', {}, err))
if not ecode:
ecode = 2
if not ecode:
ecode = 2

anchorecli.cli.utils.doexit(ecode)
17 changes: 17 additions & 0 deletions anchorecli/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,23 @@ def format_output(config, op, params, payload):
ret = t.get_string(sortby='Created')+"\n"
elif op in ['user_setpassword']:
ret = "Password (re)set success"
elif op == 'image_delete':
header = ['Input', 'Image Digest', 'Delete Status', 'Details']
t = plain_column_table(header)
if isinstance(payload, list):
for input_image, image_digest in params:
if image_digest:
record = next((item for item in payload if item['digest'] == image_digest), None)
if record:
row = [str(input_image), str(image_digest), str(record['status']), str(record['detail'])]
else:
row = [str(input_image), str(image_digest), 'delete_failed', 'cannot delete input image, no response from server']
else:
row = [str(input_image), str(image_digest), 'delete_failed', 'cannot use input image string, no discovered imageDigest']
t.add_row(row)
ret = t.get_string() + "\n"
else:
ret = 'Success'
elif op in ['delete_system_service'] or re.match(".*_delete$", op) or re.match(".*_activate$", op) or re.match(".*_deactivate$", op) or re.match(".*_enable$", op) or re.match(".*_disable$", op):
# NOTE this should always be the last in the if/elif conditional
ret = 'Success'
Expand Down
26 changes: 26 additions & 0 deletions anchorecli/clients/apiexternal.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,32 @@ def delete_image(config, imageDigest=None, force=False):

return(ret)

def delete_images(config, imageDigests=None, force=False):
userId = config['user']
password = config['pass']
base_url = config['url']

if not imageDigests:
raise Exception("must specify a valid imageDigest to delete")

base_url = re.sub("/$", "", base_url)
url = '/'.join([base_url, "images"])
url = url + "?imageDigests={}".format(','.join(imageDigests))

if force:
url = url+"&force=True"

set_account_header(config)

try:
_logger.debug("DELETE url=%s", str(url))
r = requests.delete(url, auth=(userId, password), verify=config['ssl_verify'], headers=header_overrides)
ret = anchorecli.clients.common.make_client_result(r, raw=False)
except Exception as err:
raise err

return(ret)

# policy clients

def add_policy(config, policybundle={}, detail=False):
Expand Down