From 08e9722aa0bc2df851d940c9b82e659007a22f8a Mon Sep 17 00:00:00 2001 From: Markus Einarsson <86156109+einarssonm@users.noreply.github.com> Date: Fri, 10 Oct 2025 23:28:15 +0200 Subject: [PATCH] Add extraction of single ZIP input (slow parsing workaround) --- Dockerfile | 1 + src/log2timeline.py | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/Dockerfile b/Dockerfile index a0f9dbc..b076d58 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,6 +13,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ software-properties-common \ python3-pip \ python3-poetry \ + p7zip-full \ gpg-agent \ wget \ tzdata \ diff --git a/src/log2timeline.py b/src/log2timeline.py index 1d17eb7..f870f09 100644 --- a/src/log2timeline.py +++ b/src/log2timeline.py @@ -20,6 +20,7 @@ from celery import signals from celery.utils.log import get_task_logger from openrelik_common.logging import Logger +from openrelik_worker_common.archive_utils import extract_archive from openrelik_worker_common.file_utils import create_output_file from openrelik_worker_common.task_utils import ( create_task_result, @@ -84,6 +85,12 @@ "type": "textarea", "required": False, }, + { + "name": "extract-single-zip", + "label": "Extract single ZIP input", + "description": "Select if single ZIP input should be extracted (slow $MFT parsing workaround)", + "type": "checkbox", + }, ], } @@ -201,6 +208,26 @@ def log2timeline( filename = os.path.basename(input_file.get("path")) os.link(input_file.get("path"), f"{temp_dir}/{filename}") command.append(temp_dir) + + # Handle extraction of single ZIP input before launching log2timeline.py (slow $MFT parsing workaround) + elif input_files[0].get("path") and input_files[0].get("path").lower().endswith(".zip") and task_config and task_config.get("extract-single-zip") is True: + logger.info(f"Extracting {input_files[0].get('path')}") + log_file = create_output_file( + output_path, + display_name=f"extract_archives_{input_files[0].get('display_name')}.log", + ) + + try: + (command_string, temp_dir) = extract_archive( + input_files[0], output_path, log_file.path + ) + except Exception as e: + logger.error(f"extract_archive failed: {e}") + raise + + logger.info(f"Executed extract_archive command: {command_string}") + command.append(temp_dir) + else: command.append(input_files[0].get("path"))