+
+ Comment thread
+ Comment
+ deleted from
+
+
+ Content title
+
+
+
+ content that no longer exists
+
+
+
+
+
+
+
+
+
+
+ Item details
+
+
+
+
+
+
+
Original ID
+
ID
+
+
+
Original path
+
+ Path
+
+
+
+
Parent path
+
+ Parent
+
+
+
+
Deletion date
+
+ Date
+
+
+
+
+
+
+
Comment text
+
+
+
Comment text
+
+
+
+
+
Comment author
+
+ Author
+
+
+
+
+
+
+
Number of items
+
+ Count
+ contained items
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Restore item
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Folder contents
+
+ 0
+
+
+
+
+
+ These items were contained in this folder when it was deleted.
+ You can restore them individually to any location.
+
+
+
+
+
+
+
+
Title
+
Type
+
Original path
+
Size
+
Actions
+
+
+
+
+
+ Title
+
+
+ Type
+
+
+ Path
+
+
+ Size
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Comment thread contents
+
+ 0
+
+
+
+
+
+ This comment thread contains the following comments.
+ All comments will be restored together with their reply relationships.
+
+
+
+
+
+
+
+
Author
+
Comment text
+
In reply to
+
Created
+
+
+
+
+
+ Author
+
+
+
Comment text
+
+
+
+ 1234
+
+
+ Top level comment
+
+
+
+ Date
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Products/CMFPlone/configure.zcml b/Products/CMFPlone/configure.zcml
index ce2dfc190b..2b3091a842 100644
--- a/Products/CMFPlone/configure.zcml
+++ b/Products/CMFPlone/configure.zcml
@@ -159,4 +159,17 @@
for="zope.pagetemplate.engine.ZopeBaseEngine"
/>
+
+
+
+
+
+
diff --git a/Products/CMFPlone/controlpanel/browser/configure.zcml b/Products/CMFPlone/controlpanel/browser/configure.zcml
index 7cca9f0b3a..1a7dc5a9c8 100644
--- a/Products/CMFPlone/controlpanel/browser/configure.zcml
+++ b/Products/CMFPlone/controlpanel/browser/configure.zcml
@@ -348,4 +348,13 @@
permission="cmf.ManagePortal"
/>
+
+
+
+
diff --git a/Products/CMFPlone/controlpanel/browser/recyclebin.py b/Products/CMFPlone/controlpanel/browser/recyclebin.py
new file mode 100644
index 0000000000..ea24ea0ef0
--- /dev/null
+++ b/Products/CMFPlone/controlpanel/browser/recyclebin.py
@@ -0,0 +1,17 @@
+from plone.app.registry.browser.controlpanel import ControlPanelFormWrapper
+from plone.app.registry.browser.controlpanel import RegistryEditForm
+from plone.base import PloneMessageFactory as _
+from plone.base.interfaces.recyclebin import IRecycleBinControlPanelSettings
+from plone.z3cform import layout
+
+
class RecyclebinControlPanelForm(RegistryEditForm):
    """Registry-backed edit form for the recycle bin control panel."""

    # Schema and prefix must match the registry records shipped in
    # profiles/dependencies/registry.xml (plone-recyclebin.*).
    schema = IRecycleBinControlPanelSettings
    schema_prefix = "plone-recyclebin"
    label = _("Recycle bin settings")
    description = _("Settings for the Plone recycle bin")


# Wrap the form so it renders with the standard Plone control panel chrome.
RecyclebinControlPanelView = layout.wrap_form(
    RecyclebinControlPanelForm, ControlPanelFormWrapper
)
diff --git a/Products/CMFPlone/events.py b/Products/CMFPlone/events.py
index 77baf900f6..3a6c5ee9d2 100644
--- a/Products/CMFPlone/events.py
+++ b/Products/CMFPlone/events.py
@@ -1,8 +1,13 @@
from plone.base.interfaces import IReorderedEvent
from plone.base.interfaces import ISiteManagerCreatedEvent
+from plone.base.interfaces.recyclebin import IRecycleBin
from plone.base.utils import get_installer
+from Products.CMFCore.interfaces import IContentish
+from zope.component import adapter
+from zope.component import queryUtility
from zope.interface import implementer
from zope.interface.interfaces import ObjectEvent
+from zope.lifecycleevent.interfaces import IObjectRemovedEvent
@implementer(ISiteManagerCreatedEvent)
@@ -37,3 +42,31 @@ def removeBase(event):
https://dev.plone.org/ticket/13705
"""
event.request.response.base = None
+
+
@adapter(IContentish, IObjectRemovedEvent)
def handle_content_removal(obj, event):
    """Event handler for content removal

    This intercepts standard content removal and puts the item in the recycle bin
    instead of letting it be deleted if the recycle bin is enabled.
    """
    # Ignore if the object is being moved
    # (_v_is_being_moved is a volatile flag; NOTE(review): confirm which
    # code sets it — it is not set anywhere in this file)
    if getattr(obj, "_v_is_being_moved", False):
        return

    # Get the recycle bin; bail out if the utility is not registered or
    # recycling is switched off in the registry.
    recycle_bin = queryUtility(IRecycleBin)
    if recycle_bin is None or not recycle_bin.is_enabled():
        return

    # Only process if this is a direct deletion (not part of container deletion)
    # A plain delete fires IObjectRemovedEvent with newParent None; a move or
    # rename fires the same event with newParent set.
    if event.newParent is not None:
        return

    # Get original information
    original_container = event.oldParent
    original_path = "/".join(obj.getPhysicalPath())

    # Add to recycle bin - let any exceptions propagate to make problems visible
    recycle_bin.add_item(obj, original_container, original_path)
diff --git a/Products/CMFPlone/profiles/default/actions.xml b/Products/CMFPlone/profiles/default/actions.xml
index 46e0a043cd..4b257cb975 100644
--- a/Products/CMFPlone/profiles/default/actions.xml
+++ b/Products/CMFPlone/profiles/default/actions.xml
@@ -494,6 +494,24 @@
True
+
diff --git a/Products/CMFPlone/profiles/dependencies/registry.xml b/Products/CMFPlone/profiles/dependencies/registry.xml
index 5f596cebda..9970513a3b 100644
--- a/Products/CMFPlone/profiles/dependencies/registry.xml
+++ b/Products/CMFPlone/profiles/dependencies/registry.xml
@@ -126,5 +126,12 @@
{"actionOptions": {"displayInModal": false}}
+
+ True
+ 30
+ 100
+
diff --git a/Products/CMFPlone/recyclebin.py b/Products/CMFPlone/recyclebin.py
new file mode 100644
index 0000000000..bc98a9fb3d
--- /dev/null
+++ b/Products/CMFPlone/recyclebin.py
@@ -0,0 +1,980 @@
+from AccessControl import getSecurityManager
+from Acquisition import aq_base
+from BTrees.OOBTree import OOBTree
+from BTrees.OOBTree import OOTreeSet
+from datetime import datetime
+from datetime import timedelta
+from DateTime import DateTime
+from persistent import Persistent
+from plone.base.interfaces.recyclebin import IRecycleBin
+from plone.base.interfaces.recyclebin import IRecycleBinControlPanelSettings
+from plone.registry.interfaces import IRegistry
+from Products.CMFCore.utils import getToolByName
+from zope.annotation.interfaces import IAnnotations
+from zope.component import getUtility
+from zope.component.hooks import getSite
+from zope.interface import implementer
+
+import logging
+import uuid
+
+
+logger = logging.getLogger("Products.CMFPlone.RecycleBin")
+
+ANNOTATION_KEY = "Products.CMFPlone.RecycleBin"
+
+
class RecycleBinStorage(Persistent):
    """Storage class for RecycleBin using BTrees for better performance

    ``items`` maps recycle-id -> metadata dict; ``_sorted_index`` keeps
    ``(deletion_date, item_id)`` tuples, which BTrees keep ordered, giving
    cheap date-ordered iteration without re-sorting.
    """

    def __init__(self):
        self.items = OOBTree()
        # Add a sorted index that stores (deletion_date, item_id) tuples
        # This will automatically maintain items sorted by date
        self._sorted_index = OOTreeSet()

    def __getitem__(self, key):
        return self.items[key]

    def __setitem__(self, key, value):
        # When adding or updating an item, update the sorted index
        if key in self.items:
            # If updating an existing item, remove old index entry first
            old_value = self.items[key]
            self._remove_from_index(key, old_value)

        # Add the item to main storage
        self.items[key] = value

        # Add to sorted index if it has a deletion_date
        self._add_to_index(key, value)

    def __delitem__(self, key):
        # When deleting an item, also remove it from the sorted index
        if key in self.items:
            item = self.items[key]
            self._remove_from_index(key, item)

        # Remove from main storage
        del self.items[key]

    def _add_to_index(self, key, value):
        """Add an item to the sorted index"""
        if "deletion_date" in value:
            try:
                # Store as (date, id) for automatic sorting
                # (tuples compare element-wise, so entries order by date first)
                self._sorted_index.add((value["deletion_date"], key))
            except TypeError:
                # Skip if the date is not comparable
                logger.warning(
                    f"Could not index item {key} by date: {value.get('deletion_date')}"
                )

    def _remove_from_index(self, key, value):
        """Remove an item from the sorted index"""
        if "deletion_date" in value:
            try:
                sort_key = (value["deletion_date"], key)
                if sort_key in self._sorted_index:
                    self._sorted_index.remove(sort_key)
            except (KeyError, TypeError):
                # Ignore errors if the entry doesn't exist or date is not comparable
                pass

    def __contains__(self, key):
        return key in self.items

    def __len__(self):
        return len(self.items)

    def get(self, key, default=None):
        return self.items.get(key, default)

    def keys(self):
        return self.items.keys()

    def values(self):
        return self.items.values()

    def get_items(self):
        """Return all items as key-value pairs"""
        return self.items.items()

    def get_items_sorted_by_date(self, reverse=True):
        """Return items sorted by deletion date

        Args:
            reverse: If True, return newest items first (default),
                     if False, return oldest items first

        Returns:
            Generator yielding (item_id, item_data) tuples
        """
        sorted_keys = list(self._sorted_index)

        # If we want newest first (reverse=True), reverse the list
        if reverse:
            sorted_keys.reverse()

        # Yield items in the requested order
        for date, item_id in sorted_keys:
            if item_id in self.items:  # Double check item still exists
                yield (item_id, self.items[item_id])
+
+
@implementer(IRecycleBin)
class RecycleBin:
    """Stores deleted content items"""

    def __init__(self):
        """Initialize the recycle bin utility

        It will get the context (Plone site) on demand using getSite()
        """
        pass

    def _get_context(self):
        """Get the context (Plone site)"""
        return getSite()

    def _get_storage(self):
        """Get the storage for recycled items

        Lazily creates a RecycleBinStorage in the site's annotations on
        first access, keyed by ANNOTATION_KEY.
        """
        context = self._get_context()
        annotations = IAnnotations(context)

        if ANNOTATION_KEY not in annotations:
            annotations[ANNOTATION_KEY] = RecycleBinStorage()

        return annotations[ANNOTATION_KEY]

    # Update property for storage to use _get_storage
    @property
    def storage(self):
        # Convenience accessor; resolves the annotation storage on every use.
        return self._get_storage()

    def _get_settings(self):
        """Get recycle bin settings from registry"""
        registry = getUtility(IRegistry)
        return registry.forInterface(
            IRecycleBinControlPanelSettings, prefix="plone-recyclebin"
        )

    def is_enabled(self):
        """Check if recycle bin is enabled"""
        try:
            settings = self._get_settings()
            return settings.recycling_enabled
        except Exception as e:
            # Fail closed: if the registry records are missing (e.g. the
            # profile has not been applied), treat recycling as disabled.
            logger.error(
                f"Error checking recycle bin settings: {str(e)}. Recycling is disabled."
            )
            return False
+
    def _get_item_title(self, obj, item_type=None):
        """Helper method to get a meaningful title for an item

        ``obj`` is a content object, except when ``item_type`` is
        "CommentTree", in which case it is a plain dict holding the
        captured comments.
        """
        if hasattr(obj, "objectIds") or item_type == "Collection":
            # For folders and collections
            return (
                obj.Title()
                if hasattr(obj, "Title")
                else getattr(obj, "title", "Unknown")
            )
        elif item_type == "CommentTree":
            # For comment trees, generate a title including the number of comments
            comment_count = len(obj.get("comments", []))
            root_comment = None

            # Try to find the root comment to get its text
            for comment, _ in obj.get("comments", []):
                if getattr(comment, "comment_id", None) == obj.get("root_comment_id"):
                    root_comment = comment
                    break

            # If we found the root comment, get a preview of its text
            comment_preview = ""
            if root_comment and hasattr(root_comment, "text"):
                # Take the first 30 characters of the text as a preview
                text = getattr(root_comment, "text", "")
                if text:
                    if len(text) > 30:
                        comment_preview = text[:30] + "..."
                    else:
                        comment_preview = text

            # Create a meaningful title
            if comment_preview:
                return f'Comment thread: "{comment_preview}" ({comment_count} comments)'
            else:
                return f"Comment thread ({comment_count} comments)"
        else:
            # For regular items, use Title() if available
            return (
                obj.Title()
                if hasattr(obj, "Title")
                else getattr(obj, "title", "Unknown")
            )

    def _process_folder_children(self, folder_obj, folder_path):
        """Helper method to process folder children recursively

        Returns a dict keyed by child id holding a metadata snapshot
        (including a reference to the live child object) for everything
        inside ``folder_obj``; nested folders recurse into "children".
        """
        folder_children = {}
        for child_id in folder_obj.objectIds():
            child = folder_obj[child_id]
            child_path = f"{folder_path}/{child_id}"
            # Store basic data for this child
            child_data = {
                "id": child_id,
                "title": self._get_item_title(child),
                "type": getattr(child, "portal_type", "Unknown"),
                "path": child_path,
                "parent_path": folder_path,
                "deletion_date": datetime.now(),
                "size": getattr(child, "get_size", lambda: 0)(),
                "object": child,
            }

            # If this child is also a folder, process its children
            if hasattr(child, "objectIds") and child.objectIds():
                nested_children = self._process_folder_children(child, child_path)
                if nested_children:
                    child_data["children"] = nested_children
                    child_data["children_count"] = len(nested_children)

            folder_children[child_id] = child_data
        return folder_children
+
    def add_item(
        self,
        obj,
        original_container,
        original_path,
        item_type=None,
        process_children=True,
    ):
        """Add deleted item to recycle bin

        Args:
            obj: The object being deleted
            original_container: The container it was deleted from (may be None)
            original_path: The object's physical path before deletion
            item_type: Optional explicit type (e.g. "CommentTree", "Collection")
            process_children: Whether to snapshot folder contents recursively

        Returns:
            The generated recycle-bin id, or None when recycling is disabled.
        """
        if not self.is_enabled():
            return None

        # Get the original id but if not found then generate a unique ID for the recycled item
        item_id = (
            obj.getId()
            if hasattr(obj, "getId")
            else getattr(obj, "id", str(uuid.uuid4()))
        )

        # Add a workflow history entry about the deletion if possible
        self._update_workflow_history(obj, "deletion")

        # Generate a meaningful title
        item_title = self._get_item_title(obj, item_type)

        # Handle folders and collections specially
        children = {}
        if process_children and (
            hasattr(obj, "objectIds") or item_type == "Collection"
        ):
            if hasattr(obj, "objectIds"):
                # Process all children recursively
                children = self._process_folder_children(obj, original_path)

        # Store metadata about the deletion; fall back to deriving the parent
        # path from the item path when no container object was provided.
        parent_path = (
            "/".join(original_container.getPhysicalPath())
            if original_container
            else "/".join(original_path.split("/")[:-1])
        )

        storage_data = {
            "id": item_id,
            "title": item_title,
            "type": item_type or getattr(obj, "portal_type", "Unknown"),
            "path": original_path,
            "parent_path": parent_path,
            "deletion_date": datetime.now(),
            "size": getattr(obj, "get_size", lambda: 0)(),
            "object": aq_base(obj),  # Store the actual object with no acquisition chain
        }

        # Add children data if this was a folder/collection
        if children:
            storage_data["children"] = children
            storage_data["children_count"] = len(children)

        # Generate a unique recycle ID
        recycle_id = str(uuid.uuid4())
        self.storage[recycle_id] = storage_data

        # Check if we need to clean up old items (size cap, then retention)
        self._check_size_limits()
        self._purge_expired_items()

        return recycle_id
+
+ def get_items(self):
+ """Return all items in recycle bin"""
+ items = []
+ # Use the pre-sorted index to get items by date (newest first)
+ for item_id, data in self.storage.get_items_sorted_by_date(reverse=True):
+ # Only copy the essential metadata instead of the entire data dictionary
+ item_data = {
+ "recycle_id": item_id,
+ "id": data.get("id", ""),
+ "title": data.get("title", ""),
+ "type": data.get("type", "Unknown"),
+ "path": data.get("path", ""),
+ "parent_path": data.get("parent_path", ""),
+ "deletion_date": data.get("deletion_date"),
+ "size": data.get("size", 0),
+ }
+
+ # Copy any other metadata but not the actual object
+ for key, value in data.items():
+ if key != "object" and key not in item_data:
+ item_data[key] = value
+
+ items.append(item_data)
+
+ return items
+
+ def get_item(self, item_id):
+ """Get a specific deleted item by ID"""
+ return self.storage.get(item_id)
+
    def _update_workflow_history(self, obj, action_type, item_data=None):
        """Add a workflow history entry about deletion or restoration

        Args:
            obj: The content object
            action_type: Either 'deletion' or 'restoration'
            item_data: The recyclebin storage data (needed for restoration to show deletion date)
        """
        # Objects without a workflow history (e.g. comments) are skipped.
        if not hasattr(obj, "workflow_history"):
            return

        workflow_tool = getToolByName(self._get_context(), "portal_workflow")
        chains = workflow_tool.getChainFor(obj)

        if not chains:
            return

        # Only the object's first workflow chain is annotated.
        workflow_id = chains[0]
        history = obj.workflow_history.get(workflow_id, ())

        if not history:
            return

        history = list(history)
        current_state = history[-1].get("review_state", None) if history else None
        user_id = getSecurityManager().getUser().getId() or "System"

        entry = {
            "action": (
                "Moved to recycle bin"
                if action_type == "deletion"
                else "Restored from recycle bin"
            ),
            "actor": user_id,
            "comments": (
                "Item was deleted and moved to recycle bin"
                if action_type == "deletion"
                else "Restored from recycle bin after deletion"
            ),
            "time": DateTime(),
            "review_state": current_state,
        }

        # Add the entry and update the history
        # (reassign as a tuple so the persistence machinery registers it)
        history.append(entry)
        obj.workflow_history[workflow_id] = tuple(history)

    def _find_target_container(self, target_container, parent_path):
        """Helper to find the target container for restoration

        Returns a tuple (success, container, error_message) where:
        - success: Boolean indicating if the container was found
        - container: The container object (None if not found)
        - error_message: Error message if success is False
        """
        site = self._get_context()
        if target_container is None:
            # Try to get the original parent
            try:
                target_container = site.unrestrictedTraverse(parent_path)
                return True, target_container, None
            except (KeyError, AttributeError):
                # We need an explicit target container if original parent is gone
                error_message = (
                    f"Original parent container at {parent_path} no longer exists. "
                    "You must specify a target_container to restore this item."
                )
                return False, None, error_message
        return True, target_container, None
+
    def _cleanup_child_references(self, item_data):
        """Clean up any child items associated with a parent that was restored

        When a folder is restored, any standalone recycle-bin entries that
        duplicate its children (matched by path or original id) are removed.
        """
        if "children" in item_data and isinstance(item_data["children"], dict):
            logger.info(
                f"Cleaning up {len(item_data['children'])} child items from recyclebin"
            )

            # Define a function to recursively process nested folders
            def cleanup_children(children_dict):
                for child_id, child_data in children_dict.items():
                    # Clean up any entries that might match this child
                    child_path = child_data.get("path")
                    child_orig_id = child_data.get("id")

                    # Iterate over a list copy since entries may be deleted.
                    for storage_id, storage_data in list(self.storage.get_items()):
                        if (
                            storage_data.get("path") == child_path
                            or storage_data.get("id") == child_orig_id
                        ):
                            logger.info(
                                f"Removing child item {child_orig_id} from recyclebin"
                            )
                            if storage_id in self.storage:
                                del self.storage[storage_id]

                    # If this child is also a folder, recursively process its children
                    if "children" in child_data and isinstance(
                        child_data["children"], dict
                    ):
                        cleanup_children(child_data["children"])

            # Start the recursive cleanup
            cleanup_children(item_data["children"])

    def _handle_existing_object(self, obj_id, target_container, obj):
        """Handle cases where an object with the same ID already exists in target

        Raises ValueError unless the caller flagged an explicit replace via
        the volatile ``_v_restoring_from_recyclebin`` attribute on ``obj``.
        """
        if obj_id in target_container:
            # Check if explicit restoration is requested
            if getattr(obj, "_v_restoring_from_recyclebin", False):
                # We were explicitly asked to restore this item, so delete existing item first
                logger.info(
                    f"Removing existing object {obj_id} to restore recycled version"
                )
                target_container._delObject(obj_id)
            else:
                # Raise a meaningful exception instead of generating a new ID
                raise ValueError(
                    f"Cannot restore item '{obj_id}' because an item with this ID already exists in the target location. "
                    f"To replace the existing item with the recycled one, use the recycle bin interface."
                )
+
+ def restore_item(self, item_id, target_container=None):
+ """Restore item to original location or specified container"""
+ if item_id not in self.storage:
+ return None
+
+ item_data = self.storage[item_id]
+ obj = item_data["object"]
+ obj_id = item_data["id"]
+ item_type = item_data.get("type", None)
+
+ # Special handling for CommentTree (comments with replies)
+ if item_type == "CommentTree":
+ return self._restore_comment_tree(item_id, item_data, target_container)
+
+ # Special handling for Discussion Item (Comments)
+ if item_data.get("type") == "Discussion Item":
+ return self._restore_comment(item_id, item_data, target_container)
+
+ # Regular content object restoration
+ # Find the container to restore to
+ success, target_container, error_message = self._find_target_container(
+ target_container, item_data["parent_path"]
+ )
+
+ # If we couldn't find the target container, return the error message
+ if not success:
+ return {"success": False, "error": error_message}
+
+ # Make sure we don't overwrite existing content
+ self._handle_existing_object(obj_id, target_container, obj)
+
+ # Set the new ID if it was changed
+ if obj_id != item_data["id"]:
+ obj.id = obj_id
+
+ # Add object to the target container
+ target_container[obj_id] = obj
+
+ # Add a workflow history entry about the restoration
+ restored_obj = target_container[obj_id]
+ self._update_workflow_history(restored_obj, "restoration", item_data)
+ restored_obj.reindexObject()
+
+ # Remove from recycle bin
+ del self.storage[item_id]
+
+ # Clean up any child items
+ self._cleanup_child_references(item_data)
+
+ return restored_obj
+
    def _find_parent_comment(
        self, comment, original_in_reply_to, conversation, id_mapping=None
    ):
        """Helper method to find parent comment during restoration

        Returns a (found, parent_id) tuple; ``found`` is False when the
        comment was top-level (in_reply_to of 0/None) or no parent can be
        located, in which case callers make the comment top-level.
        """
        id_mapping = id_mapping or {}
        if original_in_reply_to is None or original_in_reply_to == 0:
            return False, None

        # First check if parent exists directly (not previously deleted)
        if original_in_reply_to in conversation:
            return True, original_in_reply_to

        # Then check if it was restored with a different ID using mapping
        # (mapping keys are str(original_id))
        if str(original_in_reply_to) in id_mapping:
            # Use the ID mapping to find the new ID
            new_parent_id = id_mapping[str(original_in_reply_to)]
            return True, new_parent_id

        # Look through all comments for original_id matching our in_reply_to
        for comment_id in conversation.keys():
            comment_obj = conversation[comment_id]
            comment_original_id = getattr(comment_obj, "original_id", None)
            if comment_original_id is not None and str(comment_original_id) == str(
                original_in_reply_to
            ):
                # Found the parent with a new ID
                return True, comment_id

        # No parent found
        return False, None
+
    def _restore_comment(self, item_id, item_data, target_container=None):
        """Enhanced restoration method for comments that preserves reply relationships

        Returns the restored comment object, or None when the conversation
        no longer exists or the parent is not a conversation.
        """
        obj = item_data["object"]
        site = self._get_context()

        # Try to find the original conversation
        parent_path = item_data["parent_path"]
        try:
            conversation = site.unrestrictedTraverse(parent_path)
        except (KeyError, AttributeError):
            logger.warning(
                f"Cannot restore comment {item_id}: conversation no longer exists at {parent_path}"
            )
            return None

        # Restore comment back to conversation
        # (imported locally to avoid a hard dependency on plone.app.discussion)
        from plone.app.discussion.interfaces import IConversation

        if not IConversation.providedBy(conversation):
            logger.warning(
                f"Cannot restore comment {item_id}: parent is not a conversation"
            )
            return None

        # Store the original comment ID before restoration
        original_id = getattr(obj, "comment_id", None)
        original_in_reply_to = getattr(obj, "in_reply_to", None)

        # Track comment relationships using a request-based dictionary so
        # several comment restores within one request share the id mapping.
        from zope.globalrequest import getRequest

        request = getRequest()
        if request and not hasattr(request, "_comment_restore_mapping"):
            request._comment_restore_mapping = {}

        # Initialize mapping if needed
        mapping = getattr(request, "_comment_restore_mapping", {})
        conversation_path = "/".join(conversation.getPhysicalPath())
        if conversation_path not in mapping:
            mapping[conversation_path] = {}

        id_mapping = mapping[conversation_path]

        # Check if the parent comment exists in the conversation
        parent_found, new_parent_id = self._find_parent_comment(
            obj, original_in_reply_to, conversation, id_mapping
        )

        # Update the in_reply_to reference or make it a top-level comment
        if parent_found:
            obj.in_reply_to = new_parent_id
        else:
            # If no parent was found, make this a top-level comment
            obj.in_reply_to = None

        # Store the original ID for future reference
        if not hasattr(obj, "original_id"):
            obj.original_id = original_id

        # Add the comment to the conversation (addComment assigns a new id)
        new_id = conversation.addComment(obj)

        # Store the mapping of original ID to new ID
        if original_id is not None:
            id_mapping[str(original_id)] = new_id

        # Remove from recycle bin
        del self.storage[item_id]

        # Return the restored comment
        return conversation[new_id]
+
+ def _restore_comment_tree(self, item_id, item_data, target_container=None):
+ """Restore a comment tree with all its replies while preserving relationships"""
+ comment_tree = item_data["object"]
+ root_comment_id = comment_tree.get("root_comment_id")
+ comments_to_restore = comment_tree.get("comments", [])
+
+ logger.info(
+ f"Attempting to restore comment tree {item_id} with root_comment_id: {root_comment_id}"
+ )
+ logger.info(f"Found {len(comments_to_restore)} comments to restore")
+
+ if not comments_to_restore:
+ logger.warning(
+ f"Cannot restore comment tree {item_id}: no comments found in tree"
+ )
+ return None
+
+ site = self._get_context()
+
+ # Try to find the original conversation
+ parent_path = item_data["parent_path"]
+ try:
+ conversation = site.unrestrictedTraverse(parent_path)
+ except (KeyError, AttributeError):
+ logger.warning(
+ f"Cannot restore comment tree {item_id}: conversation no longer exists at {parent_path}"
+ )
+ return None
+
+ # Restore comments back to conversation
+ from plone.app.discussion.interfaces import IConversation
+
+ if not IConversation.providedBy(conversation):
+ logger.warning(
+ f"Cannot restore comment tree {item_id}: parent is not a conversation"
+ )
+ return None
+
+ # First extract all comments and create a mapping of original IDs
+ # to comment objects for quick lookup
+ comment_dict = {}
+ id_mapping = {} # Will map original IDs to new IDs
+
+ # Process comments to build reference dictionary
+ for comment_obj, _ in comments_to_restore:
+ # Store original values we'll need for restoration
+ original_id = getattr(comment_obj, "comment_id", None)
+ original_in_reply_to = getattr(comment_obj, "in_reply_to", None)
+
+ logger.info(
+ f"Processing comment with ID: {original_id}, in_reply_to: {original_in_reply_to}"
+ )
+
+ comment_obj.original_id = (
+ original_id # Store original ID for future reference
+ )
+
+ # Store in dictionary for quick access
+ comment_dict[original_id] = {
+ "comment": comment_obj,
+ "in_reply_to": original_in_reply_to,
+ }
+
+ # Find the root comment
+ root_comment = None
+ if root_comment_id in comment_dict:
+ root_comment = comment_dict[root_comment_id]["comment"]
+ else:
+ # Try to find a top-level comment to use as root
+ for comment_id, comment_data in comment_dict.items():
+ in_reply_to = comment_data["in_reply_to"]
+ if in_reply_to == 0 or in_reply_to is None:
+ # Found a top-level comment, use as root
+ root_comment = comment_data["comment"]
+ root_comment_id = comment_id
+ break
+
+ # If still no root, use the first comment
+ if not root_comment and comment_dict:
+ first_key = list(comment_dict.keys())[0]
+ root_comment = comment_dict[first_key]["comment"]
+ root_comment_id = first_key
+
+ if not root_comment:
+ logger.error(
+ f"Cannot restore comment tree {item_id}: no valid root comment could be determined"
+ )
+ return None
+
+ # Check if the parent comment exists
+ original_in_reply_to = getattr(root_comment, "in_reply_to", None)
+ parent_found, new_parent_id = self._find_parent_comment(
+ root_comment, original_in_reply_to, conversation
+ )
+
+ if parent_found:
+ root_comment.in_reply_to = new_parent_id
+ else:
+ root_comment.in_reply_to = None
+
+ # Add the root comment to the conversation
+ new_root_id = conversation.addComment(root_comment)
+ id_mapping[root_comment_id] = new_root_id
+
+ # Now restore all child comments, skipping the root comment
+ remaining_comments = {
+ k: v for k, v in comment_dict.items() if k != root_comment_id
+ }
+
+ # Track successfully restored comments
+ restored_count = 1 # Start with 1 for root
+
+ # Keep trying to restore comments until no more can be restored
+ max_passes = 10 # Limit passes to avoid infinite loops
+ current_pass = 0
+
+ while remaining_comments and current_pass < max_passes:
+ current_pass += 1
+ restored_in_pass = 0
+
+ # Copy keys to avoid modifying dict during iteration
+ comment_ids = list(remaining_comments.keys())
+
+ for comment_id in comment_ids:
+ comment_data = remaining_comments[comment_id]
+ comment_obj = comment_data["comment"]
+ original_in_reply_to = comment_data["in_reply_to"]
+
+ # Try to find the parent in our mapping
+ parent_found = False
+ new_parent_id = None
+
+ # If original parent was the root comment
+ if str(original_in_reply_to) == str(root_comment_id):
+ parent_found = True
+ new_parent_id = new_root_id
+ # Or if it was another already restored comment
+ elif str(original_in_reply_to) in id_mapping:
+ parent_found = True
+ new_parent_id = id_mapping[str(original_in_reply_to)]
+ # Or try to find it directly in the conversation
+ else:
+ parent_found, new_parent_id = self._find_parent_comment(
+ comment_obj, original_in_reply_to, conversation, id_mapping
+ )
+
+ if parent_found:
+ # We found the parent, update reference and restore
+ comment_obj.in_reply_to = new_parent_id
+
+ # Store original ID for future reference
+ if not hasattr(comment_obj, "original_id"):
+ comment_obj.original_id = comment_id
+
+ # Add to conversation
+ try:
+ new_id = conversation.addComment(comment_obj)
+ id_mapping[comment_id] = new_id
+ del remaining_comments[comment_id]
+ restored_in_pass += 1
+ except Exception as e:
+ logger.error(f"Error restoring comment {comment_id}: {e}")
+
+ # If we didn't restore any comments in this pass and still have comments left,
+ # something is wrong with the parent references
+ if restored_in_pass == 0 and remaining_comments:
+ # Make any remaining comments top-level comments
+ restored_in_pass += self._handle_orphaned_comments(
+ remaining_comments, conversation, id_mapping
+ )
+
+ # Break out of the loop since we've tried our best
+ break
+
+ restored_count += restored_in_pass
+
+ # If all comments were restored, exit the loop
+ if not remaining_comments:
+ break
+
+ # Clean up and return
+ del self.storage[item_id]
+ logger.info(f"Restored {restored_count} comments from comment tree {item_id}")
+
+ # Return the root comment as the result
+ return conversation.get(new_root_id) if new_root_id in conversation else None
+
    def purge_item(self, item_id) -> bool:
        """Permanently delete an item from the recycle bin

        Args:
            item_id: The ID of the item in the recycle bin

        Returns:
            Boolean indicating success
        """
        if item_id not in self.storage:
            logger.warning(f"Cannot purge item {item_id}: not found in recycle bin")
            return False

        try:
            # Purge any nested children first if this is a folder
            item_data = self.storage[item_id]
            item_path = item_data.get("path", "")

            if "children" in item_data and isinstance(item_data["children"], dict):
                # Find and purge standalone recycle bin entries for each child
                def purge_children(children_dict, parent_path):
                    for child_id, child_data in list(children_dict.items()):
                        child_path = f"{parent_path}/{child_id}"

                        # Find any standalone entries for this child in the recycle bin
                        # (iterate a list copy since entries are deleted in the loop)
                        for rec_id, rec_data in list(self.storage.get_items()):
                            if rec_id != item_id and rec_data.get("path") == child_path:
                                logger.info(
                                    f"Purging standalone entry for child: {child_path} (ID: {rec_id})"
                                )
                                del self.storage[rec_id]

                        # If this child has children, recursively purge them first
                        if "children" in child_data and isinstance(
                            child_data["children"], dict
                        ):
                            purge_children(child_data["children"], child_path)

                # Start the recursive purge of children
                purge_children(item_data["children"], item_path)

            # Remove the main item from storage - the object will be garbage collected
            del self.storage[item_id]
            logger.info(f"Item {item_id} purged from recycle bin")
            return True
        except Exception as e:
            logger.error(f"Error purging item {item_id}: {str(e)}")
            return False
+
    def _purge_expired_items(self):
        """Purge items that exceed the retention period

        Returns:
            Number of items purged
        """
        try:
            settings = self._get_settings()
            retention_days = settings.retention_period

            # If retention_period is 0, auto-purging is disabled
            if retention_days <= 0:
                logger.debug("Auto-purging is disabled (retention_period = 0)")
                return 0

            cutoff_date = datetime.now() - timedelta(days=retention_days)
            purge_count = 0

            # Use sorted index for efficient date-based removal (oldest first)
            for item_id, data in list(
                self.storage.get_items_sorted_by_date(reverse=False)
            ):
                deletion_date = data.get("deletion_date")

                # If item is older than retention period, purge it
                if deletion_date and deletion_date < cutoff_date:
                    if self.purge_item(item_id):
                        purge_count += 1
                        logger.info(
                            f"Item {item_id} purged due to retention policy (deleted on {deletion_date})"
                        )
                else:
                    # Since items are sorted by date, once we find an item newer than
                    # the cutoff date, we can stop checking
                    break

            return purge_count

        except Exception as e:
            # Best-effort maintenance: never let cleanup break the deletion
            # that triggered it.
            logger.error(f"Error purging expired items: {str(e)}")
            return 0

    def _check_size_limits(self):
        """Check if the recycle bin exceeds size limits and purge oldest items if needed

        This method enforces the maximum size limit for the recycle bin by removing
        the oldest items when the limit is exceeded.
        """
        try:
            settings = self._get_settings()
            max_size_mb = settings.maximum_size

            # If max_size is 0, size limiting is disabled
            if max_size_mb <= 0:
                logger.debug("Size limiting is disabled (maximum_size = 0)")
                return

            max_size_bytes = max_size_mb * 1024 * 1024  # Convert MB to bytes
            total_size = 0
            items_by_date = []

            # Get items sorted by date (oldest first) and calculate total size
            # (sizes come from each object's get_size at deletion time)
            for item_id, data in self.storage.get_items_sorted_by_date(reverse=False):
                size = data.get("size", 0)
                total_size += size
                items_by_date.append((item_id, size))

            # If we're under the limit, nothing to do
            if total_size <= max_size_bytes:
                return

            # Log the size excess
            logger.info(
                f"Recycle bin size ({total_size / (1024 * 1024):.2f} MB) exceeds limit ({max_size_mb} MB)"
            )

            # Remove oldest items until we're under the limit
            items_purged = 0
            for item_id, size in items_by_date:
                # Stop once we're under the limit
                if total_size <= max_size_bytes:
                    break

                if self.purge_item(item_id):
                    total_size -= size
                    items_purged += 1

            if items_purged:
                logger.info(
                    f"Purged {items_purged} oldest item{'s' if items_purged != 1 else ''} due to size constraints"
                )

        except Exception as e:
            # Best-effort maintenance: never let cleanup break the deletion
            # that triggered it.
            logger.error(f"Error enforcing size limits: {str(e)}")
+
+ def _handle_orphaned_comments(self, remaining_comments, conversation, id_mapping):
+ """Handle comments whose parents cannot be found
+
+ Makes orphaned comments top-level comments rather than losing them.
+
+ Args:
+ remaining_comments: Dictionary of remaining comments to process
+ conversation: The conversation container
+ id_mapping: Mapping of original IDs to new IDs
+ """
+ orphaned_count = 0
+ for comment_id, comment_data in list(remaining_comments.items()):
+ try:
+ comment_obj = comment_data["comment"]
+ # Make it a top-level comment
+ comment_obj.in_reply_to = None
+
+ # Ensure original_id is preserved
+ if not hasattr(comment_obj, "original_id"):
+ comment_obj.original_id = comment_id
+
+ # Add to conversation
+ new_id = conversation.addComment(comment_obj)
+ id_mapping[comment_id] = new_id
+ del remaining_comments[comment_id]
+ orphaned_count += 1
+ logger.info(
+ f"Restored orphaned comment {comment_id} as top-level comment"
+ )
+ except Exception as e:
+ logger.error(f"Error restoring orphaned comment {comment_id}: {str(e)}")
+
+ return orphaned_count
diff --git a/Products/CMFPlone/tests/robot/test_controlpanel_actions.robot b/Products/CMFPlone/tests/robot/test_controlpanel_actions.robot
index 68ad1f1007..904a1630f8 100644
--- a/Products/CMFPlone/tests/robot/test_controlpanel_actions.robot
+++ b/Products/CMFPlone/tests/robot/test_controlpanel_actions.robot
@@ -91,7 +91,7 @@ I add a new action
Type Text //input[@name="form.widgets.id"] favorites
Click //div[contains(@class,'pattern-modal-buttons')]/button
Wait For Condition Text //body contains favorites
- Click //*[@id="content-core"]/section[6]/section/ol/li[8]/form/a
+ Click //*[@id="content-core"]/section[6]/section/ol/li[9]/form/a
Wait For Condition Text //body contains Action Settings
Type Text //input[@name="form.widgets.title"] My favorites
Type Text //input[@name="form.widgets.url_expr"] string:\${globals_view/navigationRootUrl}/favorites
@@ -111,7 +111,7 @@ I delete an action
Click //*[@id="content-core"]/section[2]/section/ol/li[1]/form/button[@name="delete"]
I change category of an action
- Click //*[@id="content-core"]/section[6]/section/ol/li[7]/form/a
+ Click //*[@id="content-core"]/section[6]/section/ol/li[8]/form/a
Wait For Condition Text //body contains Action Settings
Select Options By //select[@name="form.widgets.category:list"] value portal_tabs
Click //div[contains(@class,'pattern-modal-buttons')]/button
diff --git a/Products/CMFPlone/tests/test_recyclebin.py b/Products/CMFPlone/tests/test_recyclebin.py
new file mode 100644
index 0000000000..fee8df9030
--- /dev/null
+++ b/Products/CMFPlone/tests/test_recyclebin.py
@@ -0,0 +1,594 @@
+from datetime import datetime
+from datetime import timedelta
+from plone.app.testing import IntegrationTesting
+from plone.app.testing import login
+from plone.app.testing import PLONE_FIXTURE
+from plone.app.testing import setRoles
+from plone.app.testing import TEST_USER_ID
+from plone.app.testing import TEST_USER_NAME
+from plone.base.interfaces.recyclebin import IRecycleBin
+from plone.registry.interfaces import IRegistry
+from Products.CMFPlone.controlpanel.browser.recyclebin import (
+ IRecycleBinControlPanelSettings,
+)
+from Products.CMFPlone.recyclebin import ANNOTATION_KEY
+from unittest import mock
+from zope.annotation.interfaces import IAnnotations
+from zope.component import getUtility
+
+import unittest
+
+
+class RecycleBinTestCase(unittest.TestCase):
+ """Base test case for RecycleBin tests"""
+
+ layer = IntegrationTesting(
+ bases=(PLONE_FIXTURE,), name="RecycleBinTests:Integration"
+ )
+
+ def setUp(self):
+ """Set up the test environment"""
+ self.portal = self.layer["portal"]
+ self.request = self.layer["request"]
+
+ # Log in as a manager
+ setRoles(self.portal, TEST_USER_ID, ["Manager"])
+ login(self.portal, TEST_USER_NAME)
+
+ # Get the registry to access recycle bin settings
+ self.registry = getUtility(IRegistry)
+
+ # Enable the recycle bin
+ self.registry.forInterface(
+ IRecycleBinControlPanelSettings, prefix="plone-recyclebin"
+ ).recycling_enabled = True
+
+ # Set a short retention period for testing
+ self.registry.forInterface(
+ IRecycleBinControlPanelSettings, prefix="plone-recyclebin"
+ ).retention_period = 30
+
+ # Set a reasonable maximum size
+ self.registry.forInterface(
+ IRecycleBinControlPanelSettings, prefix="plone-recyclebin"
+ ).maximum_size = 100 # 100 MB
+
+ # Get the recycle bin utility
+ self.recyclebin = getUtility(IRecycleBin)
+
+ # Clear any existing items from the recycle bin
+ annotations = IAnnotations(self.portal)
+ if ANNOTATION_KEY in annotations:
+ del annotations[ANNOTATION_KEY]
+
+ def tearDown(self):
+ """Clean up after the test"""
+ # Clear the recycle bin
+ annotations = IAnnotations(self.portal)
+ if ANNOTATION_KEY in annotations:
+ del annotations[ANNOTATION_KEY]
+
+
+class RecycleBinSetupTests(RecycleBinTestCase):
+ """Tests for RecycleBin setup and configuration"""
+
+ def test_recyclebin_enabled(self):
+ """Test that the recycle bin is initialized and enabled"""
+ self.assertTrue(self.recyclebin.is_enabled())
+
+ def test_recyclebin_storage(self):
+ """Test that the storage is correctly initialized"""
+ storage = self.recyclebin.storage
+ self.assertEqual(len(storage), 0)
+ self.assertEqual(list(storage.keys()), [])
+
+ def test_recyclebin_settings(self):
+ """Test that the settings are correctly initialized"""
+ settings = self.recyclebin._get_settings()
+ self.assertTrue(settings.recycling_enabled)
+ self.assertEqual(settings.retention_period, 30)
+ self.assertEqual(settings.maximum_size, 100)
+
+
+class RecycleBinContentTests(RecycleBinTestCase):
+ """Tests for deleting and restoring basic content items"""
+
+ def setUp(self):
+ """Set up test content"""
+ super().setUp()
+
+ # Create a page
+ self.portal.invokeFactory("Document", "test-page", title="Test Page")
+ self.page = self.portal["test-page"]
+
+ # Create a news item
+ self.portal.invokeFactory("News Item", "test-news", title="Test News")
+ self.news = self.portal["test-news"]
+
+ def test_delete_restore_page(self):
+ """Test deleting and restoring a page"""
+ # Get the original path
+ page_path = "/".join(self.page.getPhysicalPath())
+ page_id = self.page.getId()
+ page_title = self.page.Title()
+
+ # Delete the page by adding it to the recycle bin
+ recycle_id = self.recyclebin.add_item(self.page, self.portal, page_path)
+
+ # Verify it was added to the recycle bin
+ self.assertIsNotNone(recycle_id)
+ self.assertIn(recycle_id, self.recyclebin.storage)
+
+ # Verify the page metadata was stored correctly
+ item_data = self.recyclebin.storage[recycle_id]
+ self.assertEqual(item_data["id"], page_id)
+ self.assertEqual(item_data["title"], page_title)
+ self.assertEqual(item_data["type"], "Document")
+ self.assertEqual(item_data["path"], page_path)
+ self.assertIsInstance(item_data["deletion_date"], datetime)
+
+ # Verify the page is in the recycle bin listing
+ items = self.recyclebin.get_items()
+ self.assertEqual(len(items), 1)
+ self.assertEqual(items[0]["id"], page_id)
+ self.assertEqual(items[0]["recycle_id"], recycle_id)
+
+ # Verify we can get the item directly
+ item = self.recyclebin.get_item(recycle_id)
+ self.assertEqual(item["id"], page_id)
+
+ # Remove the original page from the portal to simulate deletion
+ del self.portal[page_id]
+ self.assertNotIn(page_id, self.portal)
+
+ # Restore the page
+ restored_page = self.recyclebin.restore_item(recycle_id)
+
+ # Verify the page was restored
+ self.assertIsNotNone(restored_page)
+ self.assertEqual(restored_page.getId(), page_id)
+ self.assertEqual(restored_page.Title(), page_title)
+
+ # Verify the page is back in the portal
+ self.assertIn(page_id, self.portal)
+
+ # Verify the item was removed from the recycle bin
+ self.assertNotIn(recycle_id, self.recyclebin.storage)
+ items = self.recyclebin.get_items()
+
+ def test_delete_restore_news(self):
+ """Test deleting and restoring a news item"""
+ # Get the original path
+ news_path = "/".join(self.news.getPhysicalPath())
+ news_id = self.news.getId()
+ news_title = self.news.Title()
+
+ # Delete the news item by adding it to the recycle bin
+ recycle_id = self.recyclebin.add_item(self.news, self.portal, news_path)
+
+ # Verify it was added to the recycle bin
+ self.assertIsNotNone(recycle_id)
+ self.assertIn(recycle_id, self.recyclebin.storage)
+
+ # Verify the news metadata was stored correctly
+ item_data = self.recyclebin.storage[recycle_id]
+ self.assertEqual(item_data["id"], news_id)
+ self.assertEqual(item_data["title"], news_title)
+ self.assertEqual(item_data["type"], "News Item")
+ self.assertEqual(item_data["path"], news_path)
+ self.assertIsInstance(item_data["deletion_date"], datetime)
+
+ # Remove the original news item from the portal to simulate deletion
+ del self.portal[news_id]
+ self.assertNotIn(news_id, self.portal)
+
+ # Restore the news item
+ restored_news = self.recyclebin.restore_item(recycle_id)
+
+ # Verify the news item was restored
+ self.assertIsNotNone(restored_news)
+ self.assertEqual(restored_news.getId(), news_id)
+ self.assertEqual(restored_news.Title(), news_title)
+
+ # Verify the news item is back in the portal
+ self.assertIn(news_id, self.portal)
+
+ # Verify the item was removed from the recycle bin
+ self.assertNotIn(recycle_id, self.recyclebin.storage)
+
+ def test_purge_item(self):
+ """Test purging an item from the recycle bin"""
+ # Delete the page
+ page_path = "/".join(self.page.getPhysicalPath())
+ recycle_id = self.recyclebin.add_item(self.page, self.portal, page_path)
+
+ # Verify it was added to the recycle bin
+ self.assertIn(recycle_id, self.recyclebin.storage)
+
+ # Purge the item
+ result = self.recyclebin.purge_item(recycle_id)
+
+ # Verify the item was purged
+ self.assertTrue(result)
+ self.assertNotIn(recycle_id, self.recyclebin.storage)
+
+ # Verify the item is not in the listing
+ items = self.recyclebin.get_items()
+ self.assertEqual(len(items), 0)
+
+
+class RecycleBinFolderTests(RecycleBinTestCase):
+    """Tests for deleting and restoring folder structures"""
+
+    def setUp(self):
+        """Set up test content: a folder holding a page and a news item."""
+        super().setUp()
+
+        # Create a folder
+        self.portal.invokeFactory("Folder", "test-folder", title="Test Folder")
+        self.folder = self.portal["test-folder"]
+
+        # Add content to the folder
+        self.folder.invokeFactory("Document", "folder-page", title="Folder Page")
+        self.folder.invokeFactory("News Item", "folder-news", title="Folder News")
+
+    def test_delete_restore_folder(self):
+        """Test deleting and restoring a folder with content"""
+        # Get the original path
+        folder_path = "/".join(self.folder.getPhysicalPath())
+        folder_id = self.folder.getId()
+        folder_title = self.folder.Title()
+
+        # Delete the folder by adding it to the recycle bin
+        recycle_id = self.recyclebin.add_item(self.folder, self.portal, folder_path)
+
+        # Verify it was added to the recycle bin
+        self.assertIsNotNone(recycle_id)
+        self.assertIn(recycle_id, self.recyclebin.storage)
+
+        # Verify the folder metadata was stored correctly
+        item_data = self.recyclebin.storage[recycle_id]
+        self.assertEqual(item_data["id"], folder_id)
+        self.assertEqual(item_data["title"], folder_title)
+        self.assertEqual(item_data["type"], "Folder")
+        self.assertEqual(item_data["path"], folder_path)
+        self.assertIsInstance(item_data["deletion_date"], datetime)
+
+        # Verify the children were tracked
+        # (add_item records contained items under a "children" mapping)
+        self.assertIn("children", item_data)
+        self.assertEqual(item_data["children_count"], 2)
+        self.assertIn("folder-page", item_data["children"])
+        self.assertIn("folder-news", item_data["children"])
+
+        # Remove the original folder from the portal to simulate deletion
+        del self.portal[folder_id]
+        self.assertNotIn(folder_id, self.portal)
+
+        # Restore the folder
+        restored_folder = self.recyclebin.restore_item(recycle_id)
+
+        # Verify the folder was restored
+        self.assertIsNotNone(restored_folder)
+        self.assertEqual(restored_folder.getId(), folder_id)
+        self.assertEqual(restored_folder.Title(), folder_title)
+
+        # Verify the folder is back in the portal
+        self.assertIn(folder_id, self.portal)
+
+        # Verify the contents were restored
+        self.assertIn("folder-page", restored_folder)
+        self.assertIn("folder-news", restored_folder)
+        self.assertEqual(restored_folder["folder-page"].Title(), "Folder Page")
+        self.assertEqual(restored_folder["folder-news"].Title(), "Folder News")
+
+        # Verify the item was removed from the recycle bin
+        self.assertNotIn(recycle_id, self.recyclebin.storage)
+
+    def test_purge_folder_with_contents(self):
+        """Test purging a folder with content completely removes all related items"""
+        # Get the original path
+        folder_path = "/".join(self.folder.getPhysicalPath())
+        page_path = "/".join(self.folder["folder-page"].getPhysicalPath())
+        news_path = "/".join(self.folder["folder-news"].getPhysicalPath())
+
+        # Delete the folder and its contents by adding them individually to the recycle bin
+        # This simulates how the recycle bin typically receives items when a folder is deleted
+        folder_recycle_id = self.recyclebin.add_item(
+            self.folder, self.portal, folder_path
+        )
+        page_recycle_id = self.recyclebin.add_item(
+            self.folder["folder-page"], self.folder, page_path
+        )
+        news_recycle_id = self.recyclebin.add_item(
+            self.folder["folder-news"], self.folder, news_path
+        )
+
+        # Verify all items were added to the recycle bin
+        self.assertIn(folder_recycle_id, self.recyclebin.storage)
+        self.assertIn(page_recycle_id, self.recyclebin.storage)
+        self.assertIn(news_recycle_id, self.recyclebin.storage)
+
+        # Get all items before purging
+        before_items = self.recyclebin.get_items()
+        self.assertEqual(len(before_items), 3)
+
+        # Purge just the folder item
+        result = self.recyclebin.purge_item(folder_recycle_id)
+        self.assertTrue(result)
+
+        # Verify all related items were purged
+        # NOTE(review): this asserts purge_item cascades to items whose
+        # original path lies under the purged folder — confirm against the
+        # recyclebin implementation.
+        self.assertNotIn(folder_recycle_id, self.recyclebin.storage)
+        self.assertNotIn(page_recycle_id, self.recyclebin.storage)
+        self.assertNotIn(news_recycle_id, self.recyclebin.storage)
+
+        # Verify no items remain in the listing
+        after_items = self.recyclebin.get_items()
+        self.assertEqual(len(after_items), 0)
+
+
+class RecycleBinNestedFolderTests(RecycleBinTestCase):
+    """Tests for deleting and restoring nested folder structures"""
+
+    def setUp(self):
+        """Build a three-level hierarchy: parent > child > grandchild."""
+        super().setUp()
+
+        # Create a parent folder
+        self.portal.invokeFactory("Folder", "parent-folder", title="Parent Folder")
+        self.parent_folder = self.portal["parent-folder"]
+
+        # Create a nested folder
+        self.parent_folder.invokeFactory("Folder", "child-folder", title="Child Folder")
+        self.child_folder = self.parent_folder["child-folder"]
+
+        # Add content to the nested folder
+        self.child_folder.invokeFactory("Document", "nested-page", title="Nested Page")
+        self.child_folder.invokeFactory("News Item", "nested-news", title="Nested News")
+
+        # Create another level of nesting
+        self.child_folder.invokeFactory(
+            "Folder", "grandchild-folder", title="Grandchild Folder"
+        )
+        self.grandchild_folder = self.child_folder["grandchild-folder"]
+
+        # Add content to the grandchild folder
+        self.grandchild_folder.invokeFactory("Document", "deep-page", title="Deep Page")
+
+    def test_delete_restore_nested_folder(self):
+        """Test deleting and restoring a nested folder structure"""
+        # Get the original paths
+        parent_path = "/".join(self.parent_folder.getPhysicalPath())
+        parent_id = self.parent_folder.getId()
+
+        # Delete the parent folder by adding it to the recycle bin
+        recycle_id = self.recyclebin.add_item(
+            self.parent_folder, self.portal, parent_path
+        )
+
+        # Verify it was added to the recycle bin
+        self.assertIsNotNone(recycle_id)
+        self.assertIn(recycle_id, self.recyclebin.storage)
+
+        # Verify the parent folder metadata was stored correctly
+        item_data = self.recyclebin.storage[recycle_id]
+        self.assertEqual(item_data["id"], parent_id)
+        self.assertEqual(item_data["type"], "Folder")
+
+        # Verify the children were tracked
+        self.assertIn("children", item_data)
+        self.assertEqual(item_data["children_count"], 1)
+        self.assertIn("child-folder", item_data["children"])
+
+        # Verify the nested children were tracked
+        # (each child entry mirrors the parent's structure recursively)
+        child_data = item_data["children"]["child-folder"]
+        self.assertIn("children", child_data)
+        self.assertEqual(child_data["children_count"], 3)
+        self.assertIn("nested-page", child_data["children"])
+        self.assertIn("nested-news", child_data["children"])
+        self.assertIn("grandchild-folder", child_data["children"])
+
+        # Verify the deepest level was tracked
+        grandchild_data = child_data["children"]["grandchild-folder"]
+        self.assertIn("children", grandchild_data)
+        self.assertEqual(grandchild_data["children_count"], 1)
+        self.assertIn("deep-page", grandchild_data["children"])
+
+        # Remove the parent folder from the portal to simulate deletion
+        del self.portal[parent_id]
+        self.assertNotIn(parent_id, self.portal)
+
+        # Restore the parent folder
+        restored_folder = self.recyclebin.restore_item(recycle_id)
+
+        # Verify the parent folder was restored
+        self.assertIsNotNone(restored_folder)
+        self.assertEqual(restored_folder.getId(), parent_id)
+        self.assertIn(parent_id, self.portal)
+
+        # Verify the child folder was restored
+        self.assertIn("child-folder", restored_folder)
+        restored_child = restored_folder["child-folder"]
+
+        # Verify the nested content was restored
+        self.assertIn("nested-page", restored_child)
+        self.assertIn("nested-news", restored_child)
+        self.assertIn("grandchild-folder", restored_child)
+
+        # Verify the deepest level was restored
+        restored_grandchild = restored_child["grandchild-folder"]
+        self.assertIn("deep-page", restored_grandchild)
+
+        # Verify the item was removed from the recycle bin
+        self.assertNotIn(recycle_id, self.recyclebin.storage)
+
+    def test_delete_restore_middle_folder(self):
+        """Test deleting and restoring a middle-level folder"""
+        # Get the original paths
+        child_path = "/".join(self.child_folder.getPhysicalPath())
+        child_id = self.child_folder.getId()
+
+        # Delete the child folder by adding it to the recycle bin
+        # (its parent, not the portal, is passed as the container)
+        recycle_id = self.recyclebin.add_item(
+            self.child_folder, self.parent_folder, child_path
+        )
+
+        # Verify it was added to the recycle bin
+        self.assertIsNotNone(recycle_id)
+        self.assertIn(recycle_id, self.recyclebin.storage)
+
+        # Verify the child folder metadata was stored correctly
+        item_data = self.recyclebin.storage[recycle_id]
+        self.assertEqual(item_data["id"], child_id)
+        self.assertEqual(item_data["type"], "Folder")
+
+        # Verify the nested children were tracked
+        self.assertIn("children", item_data)
+        self.assertEqual(item_data["children_count"], 3)
+
+        # Remove the child folder from the parent folder to simulate deletion
+        del self.parent_folder[child_id]
+        self.assertNotIn(child_id, self.parent_folder)
+
+        # Restore the child folder
+        restored_folder = self.recyclebin.restore_item(recycle_id)
+
+        # Verify the child folder was restored
+        self.assertIsNotNone(restored_folder)
+        self.assertEqual(restored_folder.getId(), child_id)
+        self.assertIn(child_id, self.parent_folder)
+
+        # Verify the nested content was restored
+        self.assertIn("nested-page", restored_folder)
+        self.assertIn("nested-news", restored_folder)
+        self.assertIn("grandchild-folder", restored_folder)
+
+        # Verify the deepest level was restored
+        restored_grandchild = restored_folder["grandchild-folder"]
+        self.assertIn("deep-page", restored_grandchild)
+
+        # Verify the item was removed from the recycle bin
+        self.assertNotIn(recycle_id, self.recyclebin.storage)
+
+
+class RecycleBinExpirationTests(RecycleBinTestCase):
+ """Tests for recyclebin expiration and size limit functionality"""
+
+ def test_purge_expired_items(self):
+ """Test purging expired items based on retention period"""
+ # Create a page
+ self.portal.invokeFactory("Document", "expired-page", title="Expired Page")
+ page = self.portal["expired-page"]
+ page_path = "/".join(page.getPhysicalPath())
+
+ # Add it to the recycle bin
+ recycle_id = self.recyclebin.add_item(page, self.portal, page_path)
+
+ # Verify it was added
+ self.assertIn(recycle_id, self.recyclebin.storage)
+
+ # Mock the deletion date to be older than the retention period
+ with mock.patch.dict(
+ self.recyclebin.storage[recycle_id],
+ {"deletion_date": datetime.now() - timedelta(days=31)},
+ ):
+ # Call _purge_expired_items
+ purged_count = self.recyclebin._purge_expired_items()
+
+ # Verify the item was purged
+ self.assertEqual(purged_count, 1)
+ self.assertNotIn(recycle_id, self.recyclebin.storage)
+
+
+class RecycleBinRestoreEdgeCaseTests(RecycleBinTestCase):
+    """Tests for edge cases when restoring items"""
+
+    def test_restore_with_parent_gone(self):
+        """Test restoring an item when its parent container is gone"""
+        # Create a folder and a document inside it
+        self.portal.invokeFactory("Folder", "temp-folder", title="Temporary Folder")
+        folder = self.portal["temp-folder"]
+        folder.invokeFactory("Document", "orphan-page", title="Orphan Page")
+        page = folder["orphan-page"]
+        page_path = "/".join(page.getPhysicalPath())
+
+        # Add the page to the recycle bin
+        recycle_id = self.recyclebin.add_item(page, folder, page_path)
+
+        # Delete the folder to simulate parent container being gone
+        del self.portal["temp-folder"]
+
+        # Trying to restore without a target container should return an error dictionary
+        # NOTE(review): restore_item signals a missing parent via an error
+        # dict here but raises ValueError on an ID conflict below — confirm
+        # this asymmetric error API is intentional in the implementation.
+        result = self.recyclebin.restore_item(recycle_id)
+        self.assertIsInstance(result, dict)
+        self.assertFalse(
+            result.get("success", True)
+        )  # Should be marked as unsuccessful
+        self.assertIn("error", result)  # Should contain an error message
+
+        # Now restore with an explicit target container
+        restored_page = self.recyclebin.restore_item(
+            recycle_id, target_container=self.portal
+        )
+
+        # Verify the page was restored to the portal
+        self.assertIsNotNone(restored_page)
+        self.assertEqual(restored_page.getId(), "orphan-page")
+        self.assertIn("orphan-page", self.portal)
+
+    def test_restore_with_name_conflict(self):
+        """Test restoring an item when an item with same id already exists"""
+        # Create a page
+        self.portal.invokeFactory("Document", "conflict-page2", title="Original Page")
+        page = self.portal["conflict-page2"]
+        page_path = "/".join(page.getPhysicalPath())
+        page_id = page.getId()
+
+        # Add it to the recycle bin
+        recycle_id = self.recyclebin.add_item(page, self.portal, page_path)
+
+        # Remove the original page from the portal to simulate deletion
+        del self.portal[page_id]
+        self.assertNotIn(page_id, self.portal)
+
+        # Create another page with the same ID
+        self.portal.invokeFactory(
+            "Document", "conflict-page2", title="Replacement Page"
+        )
+
+        # Since the ID already exists, it should raise an error
+        with self.assertRaises(ValueError):
+            # Restore the item
+            self.recyclebin.restore_item(recycle_id)
+
+    def test_restore_with_parent_gone_to_target(self):
+        """Test restoring an item when its parent container is gone, should restore to target container"""
+        # Create a folder and a document inside it
+        self.portal.invokeFactory("Folder", "parent-folder", title="Parent Folder")
+        folder = self.portal["parent-folder"]
+        folder.invokeFactory("Document", "child-page", title="Child Page")
+        page = folder["child-page"]
+        page_path = "/".join(page.getPhysicalPath())
+
+        # Add the page to the recycle bin
+        recycle_id = self.recyclebin.add_item(page, folder, page_path)
+
+        # Delete the folder to simulate parent container being gone
+        del self.portal["parent-folder"]
+
+        # Create a new target folder
+        self.portal.invokeFactory("Folder", "target-folder", title="Target Folder")
+        target_folder = self.portal["target-folder"]
+
+        # Now restore with the target folder as container
+        restored_page = self.recyclebin.restore_item(
+            recycle_id, target_container=target_folder
+        )
+
+        # Verify the page was restored to the target folder
+        self.assertIsNotNone(restored_page)
+        self.assertEqual(restored_page.getId(), "child-page")
+        self.assertIn("child-page", target_folder)
+        self.assertEqual(target_folder["child-page"].Title(), "Child Page")
+
+        # Verify the item was removed from the recycle bin
+        self.assertNotIn(recycle_id, self.recyclebin.storage)
diff --git a/news/2966.feature b/news/2966.feature
new file mode 100644
index 0000000000..30d4db4c5b
--- /dev/null
+++ b/news/2966.feature
@@ -0,0 +1 @@
+Added a recycle bin that stores deleted content and lets it be restored or purged. @rohnsha0
\ No newline at end of file