Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 61 additions & 65 deletions artifactory_cleanup/rules/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,46 @@ def get_docker_tags_list(self, docker_repo, docker_image):
content = r.json()
return content["tags"]

def _get_manifest_content(self, artifact):
    """Fetch a manifest file and return its parsed JSON body.

    Best-effort helper: any failure (HTTP error status, connection
    problem, undecodable JSON) is reported and mapped to ``None`` so
    callers can treat the manifest as simply unavailable.
    """
    url = f"/{artifact['repo']}/{artifact['path']}/{artifact['name']}"
    try:
        response = self.session.get(url)
        response.raise_for_status()
        payload = response.json()
    except Exception as e:  # deliberately broad: this lookup is best-effort
        print(f"DEBUG - Failed to fetch manifest {url}: {e}")
        return None
    return payload

def _manifest_to_docker_images(self, artifacts: ArtifactsList):
    """
    Convert manifest.json or list.manifest.json paths to folder paths.

    Docker rules receive the path to a MANIFEST_FILENAME file (or
    FAT_MANIFEST_FILENAME for multi-arch images); to remove the whole
    image we go "up" one level: the last path segment is the tag, which
    becomes the artifact's name.

    NOTE(review): a previous revision also fetched every fat manifest here
    to collect the digests it references, but the resulting set was never
    used — one wasted HTTP request per multi-arch image. That dead work is
    removed; digest protection belongs in a dedicated rule.

    :param artifacts: list of artifact dicts; mutated in place.
    :return: the same ``artifacts`` list, for chaining.
    """
    manifest_names = (self.MANIFEST_FILENAME, self.FAT_MANIFEST_FILENAME)
    for artifact in artifacts:
        # Skip plain folders and artifacts that were already converted.
        # (dict.get returns None for missing "name"; None is not a manifest name)
        if artifact.get("name") not in manifest_names:
            continue

        # path is ".../<image>/<tag>"; fold the manifest file into its tag folder
        artifact["path"], docker_tag = artifact["path"].rsplit("/", 1)
        artifact["name"] = docker_tag
        # The manifest file's own size is meaningless for reporting;
        # the real docker image size is collected later.
        artifact.pop("size", None)

    return artifacts

def _collect_docker_size(self, artifacts):
Expand Down Expand Up @@ -378,69 +402,41 @@ def filter(self, artifacts):
return super().filter(artifacts)


class KeepMultiArchConstituents(RuleForDocker):
    """
    Ensures that for any multi-arch image (list.manifest.json) we are keeping,
    the constituent single-arch manifests (by their digest/SHA) are also kept.
    This prevents 'orphaning' sub-manifests or deleting shared architecture layers.

    Fix over the draft version: the collected digests are now actually used —
    artifacts whose name matches a referenced digest are excluded from the
    deletion candidates instead of being returned untouched.
    """

    def filter(self, artifacts: ArtifactsList):
        """Drop deletion candidates that a fat manifest still references.

        :param artifacts: deletion candidates; at this stage ``name`` is the
            tag (or digest folder) and ``path`` is the image folder — see
            ``_manifest_to_docker_images``.
        :return: the candidates minus any artifact whose name is a digest
            referenced by a neighbouring ``list.manifest.json``.
        """
        # 1. Collect every digest referenced by a fat manifest sitting next
        #    to any candidate. Best-effort: a missing/unreadable manifest
        #    simply contributes nothing.
        referenced_digests = set()
        for artifact in artifacts:
            manifest_path = f"{artifact['path']}/{self.FAT_MANIFEST_FILENAME}"
            url = f"/{artifact['repo']}/{manifest_path}"
            try:
                r = self.session.get(url)
            except Exception:
                # Network problem for this artifact only — keep scanning.
                continue
            if r.status_code != 200:
                continue
            try:
                content = r.json()
            except ValueError:
                # Not JSON — ignore, it is not a usable fat manifest.
                continue
            for m in content.get("manifests", []):
                if "digest" in m:
                    # digest is usually "sha256:abcd..."; keep the hex part,
                    # which is how digest folders are named in the repo.
                    referenced_digests.add(m["digest"].split(":")[-1])

        if not referenced_digests:
            return artifacts

        print(f"DEBUG - Found {len(referenced_digests)} digests to protect from multi-arch images.")
        # 2. Exclude the protected digests from the deletion candidates.
        return [
            artifact
            for artifact in artifacts
            if artifact.get("name") not in referenced_digests
        ]