From d415a41bc3bab4dd4d9d9951c904e86849acefb9 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Mon, 9 Mar 2026 17:06:10 +0000
Subject: [PATCH 01/13] Initial plan
From cb291e6c90fcce7dfde442eefac138c26b939c29 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Mon, 9 Mar 2026 17:23:51 +0000
Subject: [PATCH 02/13] Add media support for LLM prompter: handle zip archives
with images/video/audio
Co-authored-by: sal-uva <10960315+sal-uva@users.noreply.github.com>
---
common/lib/llm.py | 116 ++-
processors/machine_learning/llm_prompter.py | 811 +++++++++++++-------
2 files changed, 612 insertions(+), 315 deletions(-)
diff --git a/common/lib/llm.py b/common/lib/llm.py
index ed949578b..581b312d3 100644
--- a/common/lib/llm.py
+++ b/common/lib/llm.py
@@ -1,4 +1,6 @@
import json
+import base64
+import mimetypes
import requests
from pathlib import Path
from typing import List, Optional, Union
@@ -128,6 +130,7 @@ def generate_text(
system_prompt: Optional[str] = None,
temperature: float = 0.1,
files: Optional[List[Union[str, Path, dict]]] = None,
+ media_files: Optional[List[Union[str, Path]]] = None,
) -> BaseMessage:
"""
Supports string input or LangChain message list, with optional multimodal files.
@@ -135,7 +138,8 @@ def generate_text(
:param messages: Text prompt or list of LangChain messages
:param system_prompt: Optional system prompt
:param temperature: Temperature for generation
- :param files: Optional list of file paths or content dicts for multimodal input
+ :param files: Optional list of media URLs for multimodal input
+ :param media_files: Optional list of local file paths for multimodal input (base64-encoded)
:returns: Generated response message
"""
if isinstance(messages, str):
@@ -144,8 +148,12 @@ def generate_text(
lc_messages.append(SystemMessage(content=system_prompt))
# Create multimodal content if files are provided
- if files:
- multimodal_content = self.create_multimodal_content(messages, files)
+ if files or media_files:
+ multimodal_content = self.create_multimodal_content(
+ messages,
+ media_urls=files,
+ media_files=media_files,
+ )
lc_messages.append(HumanMessage(content=multimodal_content))
else:
lc_messages.append(HumanMessage(content=messages))
@@ -166,32 +174,48 @@ def generate_text(
def create_multimodal_content(
self,
text: str,
- image_urls: Optional[List[str]] = None,
+ media_urls: Optional[List[str]] = None,
+ media_files: Optional[List[Union[str, Path]]] = None,
) -> List[dict]:
"""
- Create multimodal content structure for LangChain messages with media URLs.
- Only supports image URLs for now.
+ Create multimodal content structure for LangChain messages with media URLs
+ and/or local media files (base64-encoded).
+
+ Supports images, video, and audio depending on the provider and model.
:param text: Text content
- :param image_urls: List of media URLs (http/https)
+ :param media_urls: List of media URLs (http/https)
+ :param media_files: List of local file paths to encode as base64
:returns: List of content blocks
"""
content = []
- # Add image URLs first
- if image_urls:
- for url in image_urls:
+ # Add media URLs
+ if media_urls:
+ for url in media_urls:
if not isinstance(url, str):
- raise ValueError(f"Image URL must be a string, got {type(url)}")
+ raise ValueError(f"Media URL must be a string, got {type(url)}")
- # Format based on provider
- if self.provider == "anthropic":
- content.append(
- {"type": "image", "source": {"type": "url", "url": url}}
- )
- else:
- # OpenAI-style format
- content.append({"type": "image_url", "image_url": {"url": url}})
+ mime_type = mimetypes.guess_type(url)[0] or "image/jpeg"
+ media_category = mime_type.split("/")[0] # "image", "video", or "audio"
+ content.append(self._format_media_block(url=url, mime_type=mime_type, media_category=media_category))
+
+ # Add base64-encoded local files
+ if media_files:
+ for file_path in media_files:
+ file_path = Path(file_path)
+ if not file_path.exists():
+ raise ValueError(f"Media file not found: {file_path}")
+
+ mime_type = mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"
+ media_category = mime_type.split("/")[0]
+
+ with file_path.open("rb") as f:
+ b64_data = base64.b64encode(f.read()).decode("utf-8")
+
+ content.append(self._format_media_block(
+ b64_data=b64_data, mime_type=mime_type, media_category=media_category
+ ))
# Add text content
if text:
@@ -199,6 +223,60 @@ def create_multimodal_content(
return content
+    def _format_media_block(
+        self,
+        url: Optional[str] = None,
+        b64_data: Optional[str] = None,
+        mime_type: str = "image/jpeg",
+        media_category: str = "image",
+    ) -> dict:
+        """
+        Format a single media block for the configured provider.
+
+        One of `url` or `b64_data` is expected; `url` takes precedence if both are given.
+
+        :param url: Media URL (if URL-based)
+        :param b64_data: Base64-encoded file contents (if file-based)
+        :param mime_type: MIME type of the media, e.g. "image/png" or "audio/mpeg"
+        :param media_category: Top-level MIME category ("image", "video", "audio", ...)
+        :returns: Provider-formatted content block
+        :raises ValueError: If neither `url` nor `b64_data` is provided
+        """
+        if not url and b64_data is None:
+            raise ValueError("Either url or b64_data is required to format a media block")
+
+        if self.provider == "anthropic":
+            # Images get an `image` block; every other category is sent as a `document` block.
+            # NOTE(review): confirm the target model accepts non-image MIME types in documents.
+            block_type = "image" if media_category == "image" else "document"
+            if url:
+                source = {"type": "url", "url": url}
+            else:
+                source = {"type": "base64", "media_type": mime_type, "data": b64_data}
+            return {"type": block_type, "source": source}
+
+        if url:
+            # Google and the OpenAI-style providers all receive URLs as `image_url` blocks
+            return {"type": "image_url", "image_url": {"url": url}}
+
+        if media_category == "audio" and self.provider == "openai":
+            # `input_audio` expects a short format name ("mp3"/"wav"), whereas `mimetypes`
+            # reports subtypes such as "mpeg" or "x-wav". Translate the known ones and pass
+            # any other subtype through unchanged.
+            subtype = mime_type.split("/")[-1]
+            audio_formats = {
+                "mpeg": "mp3", "mp3": "mp3",
+                "x-wav": "wav", "wav": "wav", "wave": "wav", "vnd.wave": "wav",
+            }
+            return {"type": "input_audio", "input_audio": {
+                "data": b64_data, "format": audio_formats.get(subtype.lower(), subtype)
+            }}
+
+        # Google and OpenAI-style providers (OpenAI, Mistral, DeepSeek, Ollama, LM Studio,
+        # vLLM) receive local files inline as a base64 data URI
+        data_uri = f"data:{mime_type};base64,{b64_data}"
+        return {"type": "image_url", "image_url": {"url": data_uri}}
+
def set_structure(self, json_schema):
if not json_schema:
raise ValueError("json_schema is None")
diff --git a/processors/machine_learning/llm_prompter.py b/processors/machine_learning/llm_prompter.py
index 57d8ee496..651bbcb13 100644
--- a/processors/machine_learning/llm_prompter.py
+++ b/processors/machine_learning/llm_prompter.py
@@ -8,6 +8,7 @@
import jsonschema
import requests
+from pathlib import Path
from json import JSONDecodeError
from jsonschema.exceptions import ValidationError, SchemaError
from datetime import datetime, timedelta
@@ -74,6 +75,16 @@ def get_options(cls, parent_dataset=None, config=None) -> dict:
shared_llm_default = ""
shared_llm_models = {}
+ # Determine if the parent dataset is a media archive (zip with images/video/audio)
+ is_media_parent = False
+ media_type = "media"
+ if parent_dataset:
+ parent_extension = parent_dataset.get_extension()
+ parent_media_type = parent_dataset.get_media_type()
+ if parent_extension == "zip" and parent_media_type in ("image", "video", "audio"):
+ is_media_parent = True
+ media_type = parent_media_type
+
options = {
"ethics_warning1": {
"type": UserInput.OPTION_INFO,
@@ -203,7 +214,37 @@ def get_options(cls, parent_dataset=None, config=None) -> dict:
"default": shared_llm_default,
"requires": "api_or_local==hosted",
},
- "prompt_info": {
+ }
+
+ if is_media_parent:
+ # Media-specific options: show info about media files being attached
+ options["media_info"] = {
+ "type": UserInput.OPTION_INFO,
+ "help": f"📎 Media files attached "
+ f"The parent dataset contains {media_type} files that will be sent "
+ f"to the LLM with each prompt. Make sure to use a model that supports "
+ f"{media_type} input (e.g. vision models for images). "
+ f"Not all models support all media types — if the model cannot process "
+ f"{media_type} files, an error will be returned during processing.",
+ }
+ options["system_prompt"] = {
+ "type": UserInput.OPTION_TEXT_LARGE,
+ "help": "System prompt",
+ "tooltip": "[optional] A system prompt can be used to give the LLM general instructions, for instance "
+ "on the tone of the text. This processor may edit the system prompt to "
+ "ensure correct output. System prompts are included in the results file.",
+ "default": "",
+ }
+ options["prompt"] = {
+ "type": UserInput.OPTION_TEXT_LARGE,
+ "help": "User prompt",
+ "tooltip": f"Describe what the model should do with each {media_type} file. "
+ f"No column brackets needed — {media_type} files are attached automatically.",
+ "default": "",
+ }
+ else:
+ # Text-based dataset options: column brackets, media URL toggle, batching
+ options["prompt_info"] = {
"type": UserInput.OPTION_INFO,
"help": "How to prompt "
"Use `[brackets]` with column names to insert dataset items in the prompt. You "
@@ -216,84 +257,91 @@ def get_options(cls, parent_dataset=None, config=None) -> dict:
"e.g. [Prompt Compass](https://github.com/ErikBorra/PromptCompass/blob/main/prompts.json#L136) "
"or the [Anthropic Prompt Library](https://docs.anthropic.com/en/resources/prompt-library/"
"library).",
- },
- "system_prompt": {
+ }
+ options["system_prompt"] = {
"type": UserInput.OPTION_TEXT_LARGE,
"help": "System prompt",
"tooltip": "[optional] A system prompt can be used to give the LLM general instructions, for instance "
"on the tone of the text. This processor may edit the system prompt to "
"ensure correct output. System prompts are included in the results file.",
"default": "",
- },
- "prompt": {
+ }
+ options["prompt"] = {
"type": UserInput.OPTION_TEXT_LARGE,
"help": "User prompt",
"tooltip": "Use [brackets] with columns names.",
"default": "",
- },
- "use_media": {
+ }
+ options["use_media"] = {
"type": UserInput.OPTION_TOGGLE,
"help": "Add images",
"tooltip": "Add media URLs for multi-modal processing. Requires a model that supports vision.",
"default": False,
- },
- "media_columns": {
+ }
+ options["media_columns"] = {
"type": UserInput.OPTION_TEXT,
"help": "Columns with image URL(s)",
"default": "",
"inline": True,
"tooltip": "Multiple columns can be selected.",
"requires": "use_media==true",
- },
- "structured_output": {
- "type": UserInput.OPTION_TOGGLE,
- "help": "Output structured JSON",
- "tooltip": "Output in a JSON format instead of text. Note that your chosen model may not support "
- "structured output.",
- "default": False,
- },
- "json_schema_info": {
- "type": UserInput.OPTION_INFO,
- "help": "Insert a JSON Schema for structured outputs. These define the output that "
- "the LLM will adhere to. [See instructions and examples on how to write a JSON Schema]"
- "(https://json-schema.org/learn/miscellaneous-examples) and [OpenAI's documentation]"
- "(https://platform.openai.com/docs/guides/structured-outputs?api-mode=chat#supported-schemas).",
- "requires": "structured_output==true",
- },
- "json_schema": {
- "type": UserInput.OPTION_TEXT_LARGE,
- "help": "JSON schema",
- "tooltip": "[required] A JSON schema that the structured output will adhere to",
- "requires": "structured_output==true",
- "default": "",
- },
- "temperature": {
- "type": UserInput.OPTION_TEXT,
- "help": "Temperature",
- "default": 0.1,
- "coerce_type": float,
- "max": 2.0,
- "tooltip": "Temperature indicates how strict the model will gravitate towards the most "
- "probable next token. A score close to 0 returns more predictable "
- "outputs while a score close to 1 leads to more creative outputs. Not supported by all models.",
- },
- "truncate_input": {
+ }
+
+ # Common options for both text and media datasets
+ options["structured_output"] = {
+ "type": UserInput.OPTION_TOGGLE,
+ "help": "Output structured JSON",
+ "tooltip": "Output in a JSON format instead of text. Note that your chosen model may not support "
+ "structured output.",
+ "default": False,
+ }
+ options["json_schema_info"] = {
+ "type": UserInput.OPTION_INFO,
+ "help": "Insert a JSON Schema for structured outputs. These define the output that "
+ "the LLM will adhere to. [See instructions and examples on how to write a JSON Schema]"
+ "(https://json-schema.org/learn/miscellaneous-examples) and [OpenAI's documentation]"
+ "(https://platform.openai.com/docs/guides/structured-outputs?api-mode=chat#supported-schemas).",
+ "requires": "structured_output==true",
+ }
+ options["json_schema"] = {
+ "type": UserInput.OPTION_TEXT_LARGE,
+ "help": "JSON schema",
+ "tooltip": "[required] A JSON schema that the structured output will adhere to",
+ "requires": "structured_output==true",
+ "default": "",
+ }
+ options["temperature"] = {
+ "type": UserInput.OPTION_TEXT,
+ "help": "Temperature",
+ "default": 0.1,
+ "coerce_type": float,
+ "max": 2.0,
+ "tooltip": "Temperature indicates how strict the model will gravitate towards the most "
+ "probable next token. A score close to 0 returns more predictable "
+ "outputs while a score close to 1 leads to more creative outputs. Not supported by all models.",
+ }
+
+ if not is_media_parent:
+ options["truncate_input"] = {
"type": UserInput.OPTION_TEXT,
"help": "Max chars in input value",
"default": 0,
"coerce_type": int,
"tooltip": "This value determines how many characters an inserted dataset value may have. 0 = unlimited.",
"requires": "use_media==false",
- },
- "max_tokens": {
- "type": UserInput.OPTION_TEXT,
- "help": "Max output tokens",
- "default": 10000,
- "coerce_type": int,
- "tooltip": "As a rule of thumb, one token generally corresponds to ~4 characters of "
- "text for common English text. This includes tokens spent for reasoning.",
- },
- "batches": {
+ }
+
+ options["max_tokens"] = {
+ "type": UserInput.OPTION_TEXT,
+ "help": "Max output tokens",
+ "default": 10000,
+ "coerce_type": int,
+ "tooltip": "As a rule of thumb, one token generally corresponds to ~4 characters of "
+ "text for common English text. This includes tokens spent for reasoning.",
+ }
+
+ if not is_media_parent:
+ options["batches"] = {
"type": UserInput.OPTION_TEXT,
"help": "Items per prompt",
"coerce_type": int,
@@ -301,8 +349,8 @@ def get_options(cls, parent_dataset=None, config=None) -> dict:
"tooltip": "How many dataset items to insert into the prompt. These will be inserted as a list "
"wherever the column brackets are used (e.g. '[body]').",
"requires": "use_media==false",
- },
- "batch_info": {
+ }
+ options["batch_info"] = {
"type": UserInput.OPTION_INFO,
"help": "Note on batching: Batching may increase speed but reduce accuracy. Models "
"need to support structured output for batching. This processor uses JSON schemas to ensure "
@@ -310,43 +358,43 @@ def get_options(cls, parent_dataset=None, config=None) -> dict:
"values. Describe the dataset values in plurals in your prompt when batching. If you use "
"multiple column brackets in your prompt, rows with any empty values are skipped.",
"requires": "use_media==false",
- },
- "ethics_warning3": {
- "type": UserInput.OPTION_INFO,
- "requires": "api_or_local==api",
- "help": "When using LLMs through commercial parties, always consider anonymising your data and "
- "whether local open-source LLMs are also an option.",
- },
- "save_annotations": {
- "type": UserInput.OPTION_ANNOTATION,
- "label": "prompt outputs",
- "default": False,
- },
- "hide_think": {
- "type": UserInput.OPTION_TOGGLE,
- "help": "Hide reasoning",
- "default": False,
- "tooltip": "Some models include reasoning in their output, between tags. This option "
- "removes this tag and its contents from the output.",
- },
- "limit": {
- "type": UserInput.OPTION_TEXT,
- "help": "Only annotate this many items, then stop",
- "default": 0,
- "coerce_type": int,
- "min": 0,
- "delegated": True,
- },
- "annotation_label": {
- "type": UserInput.OPTION_TEXT,
- "help": "Label for the annotations to add to the dataset",
- "default": "",
- "delegated": True,
- },
+ }
+
+ options["ethics_warning3"] = {
+ "type": UserInput.OPTION_INFO,
+ "requires": "api_or_local==api",
+ "help": "When using LLMs through commercial parties, always consider anonymising your data and "
+ "whether local open-source LLMs are also an option.",
+ }
+ options["save_annotations"] = {
+ "type": UserInput.OPTION_ANNOTATION,
+ "label": "prompt outputs",
+ "default": False,
+ }
+ options["hide_think"] = {
+ "type": UserInput.OPTION_TOGGLE,
+ "help": "Hide reasoning",
+ "default": False,
+ "tooltip": "Some models include reasoning in their output, between tags. This option "
+ "removes this tag and its contents from the output.",
+ }
+ options["limit"] = {
+ "type": UserInput.OPTION_TEXT,
+ "help": "Only annotate this many items, then stop",
+ "default": 0,
+ "coerce_type": int,
+ "min": 0,
+ "delegated": True,
+ }
+ options["annotation_label"] = {
+ "type": UserInput.OPTION_TEXT,
+ "help": "Label for the annotations to add to the dataset",
+ "default": "",
+ "delegated": True,
}
# Get the media columns for the select media columns option
- if parent_dataset and parent_dataset.get_columns():
+ if not is_media_parent and parent_dataset and parent_dataset.get_columns():
columns = parent_dataset.get_columns()
options["media_columns"]["type"] = UserInput.OPTION_MULTI
options["media_columns"]["options"] = {v: v for v in columns}
@@ -360,7 +408,13 @@ def is_compatible_with(cls, module=None, config=None):
:param module: Module to determine compatibility with
"""
- return module.get_extension() in ["csv", "ndjson"]
+ # Text-based datasets
+ if module.get_extension() in ["csv", "ndjson"]:
+ return True
+ # Media datasets (zip archives with images, video, or audio)
+ if module.get_extension() == "zip" and module.get_media_type() in ("image", "video", "audio"):
+ return True
+ return False
def process(self):
@@ -373,10 +427,19 @@ def process(self):
modal_location = self.parameters.get("api_or_local", "api")
hide_think = self.parameters.get("hide_think", False)
- # Optional media columns for files
- media_columns = self.parameters.get("media_columns", []) if self.parameters.get("use_media") else []
- if type(media_columns) is str:
- media_columns = [media_columns]
+ # Check if the source dataset is a media archive (zip with images/video/audio)
+ is_media_archive = (
+ self.source_dataset.get_extension() == "zip"
+ and self.source_dataset.get_media_type() in ("image", "video", "audio")
+ )
+ media_archive_type = self.source_dataset.get_media_type() if is_media_archive else None
+
+ # Optional media columns for files (only for text-based datasets)
+ media_columns = []
+ if not is_media_archive:
+ media_columns = self.parameters.get("media_columns", []) if self.parameters.get("use_media") else []
+ if type(media_columns) is str:
+ media_columns = [media_columns]
temperature = float(self.parameters.get("temperature", 0.1))
temperature = min(max(temperature, 0), 2)
@@ -389,7 +452,7 @@ def process(self):
# Set value for batch length in prompts
batches = max(1, min(self.parameters.get("batches", 1), self.source_dataset.num_rows))
use_batches = batches > 1
- if media_columns: # no batching for media files
+ if media_columns or is_media_archive: # no batching for media files
use_batches = False
if not use_batches:
self.dataset.delete_parameter("batches")
@@ -470,14 +533,14 @@ def process(self):
# Prompt validation
base_prompt = self.parameters.get("prompt", "")
- if not base_prompt and not (system_prompt_base and media_columns):
+ if not base_prompt and not (system_prompt_base and (media_columns or is_media_archive)):
self.dataset.finish_with_error("You need to insert a valid user prompt")
return
self.dataset.update_status("Prompt: %s" % base_prompt)
# Get column values in prompt. These can be one or multiple, and multiple within a bracket as well.
columns_to_use = re.findall(r"\[.*?]", base_prompt)
- if not columns_to_use and not media_columns:
+ if not columns_to_use and not media_columns and not is_media_archive:
self.dataset.finish_with_error(
"You need to insert column name(s) in the user prompt within brackets (e.g. '[body]' "
"or '[timestamp, author]')"
@@ -577,152 +640,52 @@ def process(self):
time_start = time.time()
with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as outfile:
- row = 0
- max_processed = min(limit, self.source_dataset.num_rows) if limit else self.source_dataset.num_rows
- for item in self.source_dataset.iterate_items():
- row += 1
+ if is_media_archive:
+ # Media archive processing: iterate over files in the zip
+ self.dataset.update_status(f"Processing {media_archive_type} files from archive")
+ staging_area = self.dataset.get_staging_area()
+ row = 0
+ max_processed = limit if limit else self.source_dataset.num_rows
- if self.interrupted:
- raise ProcessorInterruptedException("Interrupted while generating text through LLMs")
+ for item in self.source_dataset.iterate_items(staging_area=staging_area, immediately_delete=False):
+ row += 1
- # Replace with dataset values
- prompt = base_prompt
+ if self.interrupted:
+ raise ProcessorInterruptedException("Interrupted while generating text through LLMs")
- # Make sure we can match outputs with input IDs
- if "id" in item:
- item_id = item["id"]
- elif "item_id" in item:
- item_id = item["item_id"]
- else:
- item_id = str(i + 1)
+ # Skip metadata and non-media files
+ filename = item["id"] if "id" in item else str(item.get("filename", ""))
+ if filename.startswith(".") or filename.split(".")[-1].lower() in ("json", "log", "txt"):
+ continue
- # Store dataset values in batches. Store just one item when we're not batching.
- item_values = {}
- for column_to_use in columns_to_use:
- if column_to_use not in batched_data:
- batched_data[column_to_use] = []
+ item_id = filename
+ media_file_path = item.file if hasattr(item, "file") else Path(item.get("path", ""))
- try:
- # Columns can be comma-separated within the bracket
- if "," in column_to_use:
- item_value = []
- bracket_cols = [c.strip() for c in column_to_use.split(",")]
- for bracket_col in bracket_cols:
- col_value = str(item[bracket_col]).strip()
- if col_value:
- item_value.append(col_value)
- item_value = ", ".join(item_value)
-
- # Else just get the single item
- else:
- item_value = str(item[column_to_use]).strip()
-
- except KeyError:
- self.dataset.finish_with_error(f"Column(s) '{column_to_use}' not in the parent dataset")
- return
-
- # Skip row if we encounter *any* empty value in *different* brackets in the
- # prompt *when batching*. This is because lists with different length in the prompt cause asymmetry
- # in the input values, and it's though to then output the correct number of values.
- if not item_value and use_batches:
- item_values = {}
- self.dataset.update_status(f"Skipping row {row} because of empty value(s) in {column_to_use}")
- break
- else:
- item_values[column_to_use] = item_value
-
- # Get media URL values; split links on comma.
- media_urls = []
- for media_column in media_columns:
- media_url = item.get(media_column, [])
- if media_url:
- if isinstance(media_url, list):
- media_urls += media_url
- else:
- media_urls += [url.strip() for url in media_url.split(",")]
-
- # Skip with empty items
- empty_items = True if not any(v for v in item_values.values()) and columns_to_use else False
- if (empty_items and not media_urls) or (media_columns and not media_urls):
- if item_values.keys():
- missing_columns = andify(columns_to_use) if len(columns_to_use) > 1 else columns_to_use[0]
- self.dataset.update_status(f"Skipping row {row} because of empty value(s) in {missing_columns}")
- if media_columns and not media_urls:
- missing_media_columns = andify(media_columns) if len(media_columns) > 1 else media_columns[0]
- self.dataset.update_status(f"Skipping row {row} because of empty value(s) in {missing_media_columns}")
- skipped += 1
- # (but not if we've reached the end of the dataset; we want to process the last batch)
- if row != self.source_dataset.num_rows:
+ if not media_file_path or not media_file_path.exists():
+ self.dataset.log(f"Skipping {filename}: file not found")
+ skipped += 1
continue
- # Else add the values to the batch
- else:
- for item_column, item_value in item_values.items():
- if max_input_len > 0:
- item_value = item_value[:max_input_len]
- batched_data[item_column].append(item_value)
- n_batched += 1
- batched_ids.append(item_id) # Also store IDs, so we can match them to the output
-
- i += 1
- if limit and i >= max_processed:
- limit_reached = True
-
- # Generate text when there's something to process and when we've reached 1) the batch length (which can
- # be 1) or 2) the end of the dataset or 3) the custom limit.
- if n_batched and (n_batched % batches == 0 or row == self.source_dataset.num_rows or limit_reached):
- # Insert dataset values into prompt. Insert as list for batched data, else just insert the value.
- for column_to_use in columns_to_use:
- prompt_values = batched_data[column_to_use]
- prompt_values = prompt_values[0] if len(prompt_values) == 1 else f"```{json.dumps(prompt_values)}```"
- prompt = prompt.replace(f"[{column_to_use}]", prompt_values)
-
- # Possibly use a different batch size when we've reached the end of the dataset.
- if row == self.source_dataset.num_rows and use_batches:
- # Get a new JSON schema for a batch of different length at the end of the iteration
- if n_batched != batches and json_schema:
- json_schema = self.get_json_schema_for_batch(n_batched, custom_schema=json_schema_original)
- # `llm` becomes a RunnableSequence when used, so we'll need to reset it here
- llm = LLMAdapter(
- provider=provider,
- model=model,
- api_key=api_key,
- base_url=base_url,
- temperature=temperature,
- max_tokens=max_tokens,
- client_kwargs=client_kwargs
- )
- llm.set_structure(json_schema)
-
- # For batched_output, make sure the exact length of outputs is mentioned in the system prompt
- if use_batches:
- system_prompt = system_prompt_base.replace("{batch_size}", str(n_batched))
- else:
- system_prompt = system_prompt_base
+ prompt = base_prompt
+ system_prompt = system_prompt_base
- batch_str = f" and {n_batched} items batched into the prompt" if use_batches else ""
- self.dataset.update_status(f"Generating text at row {row:,}/"
- f"{max_processed:,} with {model}{batch_str}")
- # Now finally generate some text!
+ self.dataset.update_status(f"Processing {media_archive_type} file {row:,}/{max_processed:,} "
+ f"({filename}) with {model}")
try:
response = llm.generate_text(
- prompt,
+ prompt if prompt else f"Analyze this {media_archive_type} file.",
system_prompt=system_prompt,
temperature=temperature,
- files=media_urls
+ media_files=[media_file_path],
)
-
- # Catch 404 errors with media URLs, we simply skip these
- except requests.exceptions.HTTPError as e:
- if e.response.status_code == 404 and media_urls:
- self.dataset.log(f"Skipping row {row} because of media URL is not reachable, ({e})")
- skipped += 1
- continue
- else:
- self.dataset.finish_with_warning(outputs, f"{e}")
- return
- # Broad exception, but necessary with all the different LLM providers and options...
except Exception as e:
+ error_str = str(e).lower()
+ if "vision" in error_str or "image" in error_str or "multimodal" in error_str or "media" in error_str:
+ self.dataset.finish_with_error(
+ f"The model '{model}' does not appear to support {media_archive_type} input. "
+ f"Please use a model with {media_archive_type} support (e.g. a vision model for images): {e}"
+ )
+ return
self.dataset.finish_with_warning(outputs, f"Not all items processed: {e}")
return
@@ -738,48 +701,25 @@ def process(self):
self.dataset.finish_with_warning(outputs, warning)
return
- # Always parse JSON outputs in the case of batches.
- if use_batches or structured_output:
+ # Parse structured or plain output
+ if structured_output:
if isinstance(response, str):
response = json.loads(response)
-
- # Check whether input/output value lengths match
- if use_batches:
- output = self.parse_batched_response(response)
-
- if len(output) != n_batched:
- self.dataset.update_status(f"Output did not result in {n_batched} item(s).\nInput:\n"
- f"{prompt}\nOutput:\n{response}")
- self.dataset.finish_with_warning(outputs, "Model could not output as many values as the batch. See log "
- "for incorrect output. Try lowering the batch size, "
- "editing the prompt, or using a different model.")
- return
- else:
- output = [response]
-
- # Also validate whether the JSON schema and the output match
+ output = [response]
try:
jsonschema.validate(instance=response, schema=json_schema)
except (ValidationError, SchemaError) as e:
self.dataset.finish_with_error(f"Invalid JSON schema and/or LLM output: `{e}`")
return
-
- # Else we'll just store the output in a list
else:
-
output = response.content
-
if not isinstance(output, list):
output = [output]
- # More cleaning
- # Newer OpenAI models and Magistral return annoying nested dict with 'thinking'/'reasoning and
- # 'text', flatten it
- if len(output) > 0 and isinstance(output[0], dict) and output[0].get("type") in ["thinking",
- "reasoning"]:
- reasoning_string = output[0].get("type") # "thinking" or "reasoning"
+ # Flatten nested thinking/reasoning dicts
+ if len(output) > 0 and isinstance(output[0], dict) and output[0].get("type") in ["thinking", "reasoning"]:
+ reasoning_string = output[0].get("type")
output_flat = {reasoning_string: "", "text": []}
-
for output_part in output:
if output_part.get("type") == reasoning_string:
if reasoning_string in output_part and isinstance(output_part[reasoning_string], list):
@@ -789,21 +729,10 @@ def process(self):
output_flat[reasoning_string] += output_part.get("text", "")
else:
output_flat["text"].append(output_part.get("text", ""))
-
output_flat["text"] = "\n".join(output_flat["text"])
output = [output_flat]
- for n, output_item in enumerate(output):
-
- # Retrieve the input values used
- if use_batches:
- input_value = [v[n] for v in batched_data.values()]
- else:
- input_value = [v[0] for v in batched_data.values()]
-
- time_created = int(time.time())
-
- # remove reasoning if so desired
+ for output_item in output:
if hide_think:
if isinstance(output_item, str):
output_item = re.sub(r".*", "", output_item, flags=re.DOTALL).strip()
@@ -811,59 +740,345 @@ def process(self):
if "thinking" in output_item:
del output_item["thinking"]
+ time_created = int(time.time())
result = {
- "id": batched_ids[n],
+ "id": item_id,
"output": output_item,
- "input_value": input_value,
- "prompt": prompt if not use_batches else base_prompt, # Insert dataset values if not batching
+ "input_value": [filename],
+ "prompt": prompt,
"temperature": temperature,
"max_tokens": max_tokens,
"model": model,
"time_created": datetime.fromtimestamp(time_created).strftime("%Y-%m-%d %H:%M:%S"),
"time_created_utc": time_created,
- "batch_number": n + 1 if use_batches else "",
+ "batch_number": "",
"system_prompt": system_prompt,
}
outfile.write(json.dumps(result) + "\n")
outputs += 1
if save_annotations:
- # Save annotations for every value produced by the LLM, in case of structured output.
- # Else this will just save one string.
if isinstance(output_item, dict):
annotation_output = flatten_dict({model: output_item})
elif self.parameters.get("annotation_label"):
annotation_output = {self.parameters.get("annotation_label"): output_item}
else:
annotation_output = {model + "_output": output_item}
-
for output_key, output_value in annotation_output.items():
- annotation = {
+ annotations.append({
"label": output_key,
- "item_id": batched_ids[n],
+ "item_id": item_id,
"value": remove_nuls(output_value),
"type": "text",
- }
+ })
+
+ i += 1
+ if limit and i >= max_processed:
+ limit_reached = True
- annotations.append(annotation)
+ # Write annotations in batches
+ if (i % 1000 == 0 and annotations) or limit_reached:
+ self.save_annotations(annotations)
+ annotations = []
- # Remove batched data and store what row we've left off
- batched_ids = []
- batched_data = {}
- n_batched = 0
+ self.dataset.update_progress(row / max_processed)
# Rate limits for different providers
if provider == "mistral":
time.sleep(1)
- # Write annotations in batches
- if (i % 1000 == 0 and annotations) or limit_reached:
- self.save_annotations(annotations)
- annotations = []
+ if limit_reached:
+ break
+
+ else:
+ # Text-based dataset processing: original behavior
+ row = 0
+ max_processed = min(limit, self.source_dataset.num_rows) if limit else self.source_dataset.num_rows
+ for item in self.source_dataset.iterate_items():
+ row += 1
+
+ if self.interrupted:
+ raise ProcessorInterruptedException("Interrupted while generating text through LLMs")
+
+ # Replace with dataset values
+ prompt = base_prompt
+
+ # Make sure we can match outputs with input IDs
+ if "id" in item:
+ item_id = item["id"]
+ elif "item_id" in item:
+ item_id = item["item_id"]
+ else:
+ item_id = str(i + 1)
+
+ # Store dataset values in batches. Store just one item when we're not batching.
+ item_values = {}
+ for column_to_use in columns_to_use:
+ if column_to_use not in batched_data:
+ batched_data[column_to_use] = []
+
+ try:
+ # Columns can be comma-separated within the bracket
+ if "," in column_to_use:
+ item_value = []
+ bracket_cols = [c.strip() for c in column_to_use.split(",")]
+ for bracket_col in bracket_cols:
+ col_value = str(item[bracket_col]).strip()
+ if col_value:
+ item_value.append(col_value)
+ item_value = ", ".join(item_value)
+
+ # Else just get the single item
+ else:
+ item_value = str(item[column_to_use]).strip()
+
+ except KeyError:
+ self.dataset.finish_with_error(f"Column(s) '{column_to_use}' not in the parent dataset")
+ return
+
+ # Skip row if we encounter *any* empty value in *different* brackets in the
+ # prompt *when batching*. This is because lists with different lengths in the prompt cause asymmetry
+ # in the input values, and it's tough to then output the correct number of values.
+ if not item_value and use_batches:
+ item_values = {}
+ self.dataset.update_status(f"Skipping row {row} because of empty value(s) in {column_to_use}")
+ break
+ else:
+ item_values[column_to_use] = item_value
+
+ # Get media URL values; split links on comma.
+ media_urls = []
+ for media_column in media_columns:
+ media_url = item.get(media_column, [])
+ if media_url:
+ if isinstance(media_url, list):
+ media_urls += media_url
+ else:
+ media_urls += [url.strip() for url in media_url.split(",")]
+
+ # Skip with empty items
+ empty_items = True if not any(v for v in item_values.values()) and columns_to_use else False
+ if (empty_items and not media_urls) or (media_columns and not media_urls):
+ if item_values.keys():
+ missing_columns = andify(columns_to_use) if len(columns_to_use) > 1 else columns_to_use[0]
+ self.dataset.update_status(f"Skipping row {row} because of empty value(s) in {missing_columns}")
+ if media_columns and not media_urls:
+ missing_media_columns = andify(media_columns) if len(media_columns) > 1 else media_columns[0]
+ self.dataset.update_status(f"Skipping row {row} because of empty value(s) in {missing_media_columns}")
+ skipped += 1
+ # (but not if we've reached the end of the dataset; we want to process the last batch)
+ if row != self.source_dataset.num_rows:
+ continue
+ # Else add the values to the batch
+ else:
+ for item_column, item_value in item_values.items():
+ if max_input_len > 0:
+ item_value = item_value[:max_input_len]
+ batched_data[item_column].append(item_value)
+ n_batched += 1
+ batched_ids.append(item_id) # Also store IDs, so we can match them to the output
+
+ i += 1
+ if limit and i >= max_processed:
+ limit_reached = True
+
+ # Generate text when there's something to process and when we've reached 1) the batch length (which can
+ # be 1) or 2) the end of the dataset or 3) the custom limit.
+ if n_batched and (n_batched % batches == 0 or row == self.source_dataset.num_rows or limit_reached):
+
+ # Insert dataset values into prompt. Insert as list for batched data, else just insert the value.
+ for column_to_use in columns_to_use:
+ prompt_values = batched_data[column_to_use]
+ prompt_values = prompt_values[0] if len(prompt_values) == 1 else f"```{json.dumps(prompt_values)}```"
+ prompt = prompt.replace(f"[{column_to_use}]", prompt_values)
+
+ # Possibly use a different batch size when we've reached the end of the dataset.
+ if row == self.source_dataset.num_rows and use_batches:
+ # Get a new JSON schema for a batch of different length at the end of the iteration
+ if n_batched != batches and json_schema:
+ json_schema = self.get_json_schema_for_batch(n_batched, custom_schema=json_schema_original)
+ # `llm` becomes a RunnableSequence when used, so we'll need to reset it here
+ llm = LLMAdapter(
+ provider=provider,
+ model=model,
+ api_key=api_key,
+ base_url=base_url,
+ temperature=temperature,
+ max_tokens=max_tokens,
+ client_kwargs=client_kwargs
+ )
+ llm.set_structure(json_schema)
+
+ # For batched_output, make sure the exact length of outputs is mentioned in the system prompt
+ if use_batches:
+ system_prompt = system_prompt_base.replace("{batch_size}", str(n_batched))
+ else:
+ system_prompt = system_prompt_base
+
+ batch_str = f" and {n_batched} items batched into the prompt" if use_batches else ""
+ self.dataset.update_status(f"Generating text at row {row:,}/"
+ f"{max_processed:,} with {model}{batch_str}")
+ # Now finally generate some text!
+ try:
+ response = llm.generate_text(
+ prompt,
+ system_prompt=system_prompt,
+ temperature=temperature,
+ files=media_urls
+ )
+
+ # Catch 404 errors with media URLs, we simply skip these
+ except requests.exceptions.HTTPError as e:
+ if e.response.status_code == 404 and media_urls:
+ self.dataset.log(f"Skipping row {row} because of media URL is not reachable, ({e})")
+ skipped += 1
+ continue
+ else:
+ self.dataset.finish_with_warning(outputs, f"{e}")
+ return
+ # Broad exception, but necessary with all the different LLM providers and options...
+ except Exception as e:
+ self.dataset.finish_with_warning(outputs, f"Not all items processed: {e}")
+ return
+
+ # Set model name from the response for more details
+ if hasattr(response, "response_metadata"):
+ model = response.response_metadata.get("model_name", model)
+ if "models/" in model:
+ model = model.replace("models/", "")
+
+ if not response:
+ structured_warning = " with your specified JSON schema" if structured_output else ""
+ warning = f"{model} could not return text{structured_warning}. Consider editing your prompt or changing settings."
+ self.dataset.finish_with_warning(outputs, warning)
+ return
+
+ # Always parse JSON outputs in the case of batches.
+ if use_batches or structured_output:
+ if isinstance(response, str):
+ response = json.loads(response)
+
+ # Check whether input/output value lengths match
+ if use_batches:
+ output = self.parse_batched_response(response)
+
+ if len(output) != n_batched:
+ self.dataset.update_status(f"Output did not result in {n_batched} item(s).\nInput:\n"
+ f"{prompt}\nOutput:\n{response}")
+ self.dataset.finish_with_warning(outputs, "Model could not output as many values as the batch. See log "
+ "for incorrect output. Try lowering the batch size, "
+ "editing the prompt, or using a different model.")
+ return
+ else:
+ output = [response]
+
+ # Also validate whether the JSON schema and the output match
+ try:
+ jsonschema.validate(instance=response, schema=json_schema)
+ except (ValidationError, SchemaError) as e:
+ self.dataset.finish_with_error(f"Invalid JSON schema and/or LLM output: `{e}`")
+ return
- self.dataset.update_progress(row / max_processed)
- if limit_reached:
- break
+ # Else we'll just store the output in a list
+ else:
+
+ output = response.content
+
+ if not isinstance(output, list):
+ output = [output]
+
+ # More cleaning
+ # Newer OpenAI models and Magistral return an annoying nested dict with 'thinking'/'reasoning' and
+ # 'text' parts; flatten it
+ if len(output) > 0 and isinstance(output[0], dict) and output[0].get("type") in ["thinking",
+ "reasoning"]:
+ reasoning_string = output[0].get("type") # "thinking" or "reasoning"
+ output_flat = {reasoning_string: "", "text": []}
+
+ for output_part in output:
+ if output_part.get("type") == reasoning_string:
+ if reasoning_string in output_part and isinstance(output_part[reasoning_string], list):
+ output_flat[reasoning_string] += "\n".join(
+ [think.get("text", "") for think in output_part.get(reasoning_string, [])])
+ else:
+ output_flat[reasoning_string] += output_part.get("text", "")
+ else:
+ output_flat["text"].append(output_part.get("text", ""))
+
+ output_flat["text"] = "\n".join(output_flat["text"])
+ output = [output_flat]
+
+ for n, output_item in enumerate(output):
+
+ # Retrieve the input values used
+ if use_batches:
+ input_value = [v[n] for v in batched_data.values()]
+ else:
+ input_value = [v[0] for v in batched_data.values()]
+
+ time_created = int(time.time())
+
+ # remove reasoning if so desired
+ if hide_think:
+ if isinstance(output_item, str):
+ output_item = re.sub(r".*", "", output_item, flags=re.DOTALL).strip()
+ elif isinstance(output_item, dict):
+ if "thinking" in output_item:
+ del output_item["thinking"]
+
+ result = {
+ "id": batched_ids[n],
+ "output": output_item,
+ "input_value": input_value,
+ "prompt": prompt if not use_batches else base_prompt, # Insert dataset values if not batching
+ "temperature": temperature,
+ "max_tokens": max_tokens,
+ "model": model,
+ "time_created": datetime.fromtimestamp(time_created).strftime("%Y-%m-%d %H:%M:%S"),
+ "time_created_utc": time_created,
+ "batch_number": n + 1 if use_batches else "",
+ "system_prompt": system_prompt,
+ }
+ outfile.write(json.dumps(result) + "\n")
+ outputs += 1
+
+ if save_annotations:
+ # Save annotations for every value produced by the LLM, in case of structured output.
+ # Else this will just save one string.
+ if isinstance(output_item, dict):
+ annotation_output = flatten_dict({model: output_item})
+ elif self.parameters.get("annotation_label"):
+ annotation_output = {self.parameters.get("annotation_label"): output_item}
+ else:
+ annotation_output = {model + "_output": output_item}
+
+ for output_key, output_value in annotation_output.items():
+ annotation = {
+ "label": output_key,
+ "item_id": batched_ids[n],
+ "value": remove_nuls(output_value),
+ "type": "text",
+ }
+
+ annotations.append(annotation)
+
+ # Remove batched data and store what row we've left off
+ batched_ids = []
+ batched_data = {}
+ n_batched = 0
+
+ # Rate limits for different providers
+ if provider == "mistral":
+ time.sleep(1)
+
+ # Write annotations in batches
+ if (i % 1000 == 0 and annotations) or limit_reached:
+ self.save_annotations(annotations)
+ annotations = []
+
+ self.dataset.update_progress(row / max_processed)
+ if limit_reached:
+ break
outfile.close()
@@ -967,12 +1182,16 @@ def validate_query(query, request, config):
if query["api_or_local"] == "api" and not query.get("api_key"):
raise QueryParametersException("You need to enter an API key when using third-party models.")
- if not query["prompt"].strip():
- raise QueryParametersException("The user prompt cannot be empty.")
+ # For media archive datasets, use_media won't be present in the query
+ is_media_archive = "use_media" not in query
+
+ if not query.get("prompt", "").strip():
+ if not (query.get("system_prompt", "").strip() and is_media_archive):
+ raise QueryParametersException("The user prompt cannot be empty.")
# Get column values in prompt. These can be one or multiple, and multiple within a bracket as well.
- columns_to_use = re.findall(r"\[.*?]", query["prompt"])
- if not columns_to_use and not query["use_media"]:
+ columns_to_use = re.findall(r"\[.*?]", query.get("prompt", ""))
+ if not columns_to_use and not query.get("use_media") and not is_media_archive:
raise QueryParametersException("You need to insert column name(s) in the user prompt within brackets "
"(e.g. '[body]' or '[timestamp, author]')")
From 8782a595fbeb4b260fdedbe6fefc644897c9d9fd Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Mon, 9 Mar 2026 17:26:49 +0000
Subject: [PATCH 03/13] Address code review: fix filename checks, prompt
fallback, MIME type defaults, and error detection comments
Co-authored-by: sal-uva <10960315+sal-uva@users.noreply.github.com>
---
common/lib/llm.py | 8 ++------
processors/machine_learning/llm_prompter.py | 8 +++++---
2 files changed, 7 insertions(+), 9 deletions(-)
diff --git a/common/lib/llm.py b/common/lib/llm.py
index 581b312d3..52c67ac30 100644
--- a/common/lib/llm.py
+++ b/common/lib/llm.py
@@ -196,7 +196,7 @@ def create_multimodal_content(
if not isinstance(url, str):
raise ValueError(f"Media URL must be a string, got {type(url)}")
- mime_type = mimetypes.guess_type(url)[0] or "image/jpeg"
+ mime_type = mimetypes.guess_type(url)[0] or "application/octet-stream"
media_category = mime_type.split("/")[0] # "image", "video", or "audio"
content.append(self._format_media_block(url=url, mime_type=mime_type, media_category=media_category))
@@ -260,11 +260,7 @@ def _format_media_block(
return {"type": "image_url", "image_url": {"url": url}}
else:
data_uri = f"data:{mime_type};base64,{b64_data}"
- if media_category == "image":
- return {"type": "image_url", "image_url": {"url": data_uri}}
- else:
- # Google Gemini supports inline_data for all media types via data URIs
- return {"type": "image_url", "image_url": {"url": data_uri}}
+ return {"type": "image_url", "image_url": {"url": data_uri}}
else:
# OpenAI-style format (OpenAI, Mistral, DeepSeek, Ollama, LM Studio, vLLM)
if url:
diff --git a/processors/machine_learning/llm_prompter.py b/processors/machine_learning/llm_prompter.py
index 651bbcb13..3483c9268 100644
--- a/processors/machine_learning/llm_prompter.py
+++ b/processors/machine_learning/llm_prompter.py
@@ -655,7 +655,7 @@ def process(self):
# Skip metadata and non-media files
filename = item["id"] if "id" in item else str(item.get("filename", ""))
- if filename.startswith(".") or filename.split(".")[-1].lower() in ("json", "log", "txt"):
+ if not filename or filename.startswith(".") or filename.rsplit(".", 1)[-1].lower() in ("json", "log", "txt"):
continue
item_id = filename
@@ -666,19 +666,21 @@ def process(self):
skipped += 1
continue
- prompt = base_prompt
+ prompt = base_prompt if base_prompt else f"Analyze this {media_archive_type} file."
system_prompt = system_prompt_base
self.dataset.update_status(f"Processing {media_archive_type} file {row:,}/{max_processed:,} "
f"({filename}) with {model}")
try:
response = llm.generate_text(
- prompt if prompt else f"Analyze this {media_archive_type} file.",
+ prompt,
system_prompt=system_prompt,
temperature=temperature,
media_files=[media_file_path],
)
except Exception as e:
+ # Best-effort heuristic to detect model incompatibility with media type.
+ # Error messages vary by provider; this catches common patterns.
error_str = str(e).lower()
if "vision" in error_str or "image" in error_str or "multimodal" in error_str or "media" in error_str:
self.dataset.finish_with_error(
From 1857e75fbef3fbb3f16b3d2dffe7b297f7427c9a Mon Sep 17 00:00:00 2001
From: sal-phd-desktop
Date: Mon, 9 Mar 2026 20:25:50 +0100
Subject: [PATCH 04/13] feat: update LLMs.json
---
common/assets/llms.json | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/common/assets/llms.json b/common/assets/llms.json
index 61694aa7a..835dbaa09 100644
--- a/common/assets/llms.json
+++ b/common/assets/llms.json
@@ -10,9 +10,9 @@
"model_card": "",
"provider": ""
},
- "gpt-5.2": {
- "name": "[OpenAI] GPT-5.2",
- "model_card": "https://platform.openai.com/docs/models/gpt-5.2",
+ "gpt-5.4": {
+ "name": "[OpenAI] GPT-5.4",
+ "model_card": "https://platform.openai.com/docs/models/gpt-5.4",
"provider": "openai"
},
"gpt-5-mini": {
@@ -25,9 +25,9 @@
"model_card": "https://platform.openai.com/docs/models/gpt-5-nano",
"provider": "openai"
},
- "gpt-5.2-pro": {
- "name": "[OpenAI] GPT-5.2 Pro",
- "model_card": "https://platform.openai.com/docs/models/gpt-5.2-pro",
+ "gpt-5.4-pro": {
+ "name": "[OpenAI] GPT-5.4 Pro",
+ "model_card": "https://platform.openai.com/docs/models/gpt-5.4-pro",
"provider": "openai"
},
"gpt-4.1-mini": {
@@ -65,7 +65,7 @@
"model_card": "https://cloud.google.com/vertex-ai/generative-ai/docs/models/gemini/3-flash",
"provider": "google"
},
- "gemini-3.1-flash-lite": {
+ "gemini-3.1-flash-lite-preview": {
"name": "[Google] Gemini 3.1 Flash Lite",
"provider": "google",
"model_card": "https://docs.cloud.google.com/vertex-ai/generative-ai/docs/models/gemini/3-1-flash-lite"
From 35c9c97ed23365d1b60360353673cfde99ccbff6 Mon Sep 17 00:00:00 2001
From: sal-phd-desktop
Date: Mon, 9 Mar 2026 20:26:00 +0100
Subject: [PATCH 05/13] fix: styling on explorer
---
.../static/css/explorer-annotation-generic.css | 1 +
webtool/templates/explorer/item-annotations.html | 16 ++++++++--------
2 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/webtool/static/css/explorer-annotation-generic.css b/webtool/static/css/explorer-annotation-generic.css
index 56671d5be..fad8858f0 100644
--- a/webtool/static/css/explorer-annotation-generic.css
+++ b/webtool/static/css/explorer-annotation-generic.css
@@ -34,6 +34,7 @@
word-wrap: break-word;
color: var(--accent);
margin: 0;
+ word-break: break-all;
}
/* Link styling for processor-generated labels */
diff --git a/webtool/templates/explorer/item-annotations.html b/webtool/templates/explorer/item-annotations.html
index b94391de5..957a5e9a7 100644
--- a/webtool/templates/explorer/item-annotations.html
+++ b/webtool/templates/explorer/item-annotations.html
@@ -20,7 +20,7 @@
{% set annotation = an.an %}
{# Show empty values for human-made annotations, not for processor-made ones #}
{% if not (from_dataset and not annotation.value) %}
-
+
{# If generated by a processor, link to the dataset and simply output the text; it can't be edited '#}
{% if from_dataset %}
@@ -32,13 +32,13 @@
{% if from_dataset %}
{{ annotation.value }}
- {% set item = from_datasets[annotation.from_dataset] %}
- {% if item.type in processors %}
- {% set processor_options = processors[item.type].get_options(config=__config) %}
+ {% set from_dataset_item = from_datasets[annotation.from_dataset] %}
+ {% if from_dataset_item.type in processors %}
+ {% set processor_options = processors[from_dataset_item.type].get_options(config=__config) %}
{% endif %}
- {% for option in item.parameters %}
+ {% for option in from_dataset_item.parameters %}
{% if option in processor_options and processor_options[option].type not in ("annotation", "annotations") %}
{% set extra_tooltip_id = annotation.id %}
{% include 'components/result-parameter.html' %}
@@ -66,7 +66,7 @@
{% for option_id, option_label in annotation_fields[field_id]["options"].items() %}
{% set checked = "checked" if option_label in annotation.value else "" %}
-
+
{% endfor %}
@@ -75,8 +75,8 @@
{# Tooltip with metadata on the annotation #}
{% if annotation.author or annotation.author_original or annotation.timestamp or annotation.metadata %}
-
-
+
+
{% if annotation.author_original %}
Created by {% if annotation.by_processor %} processor{% endif %} {% if annotation.author_original %}{{ annotation.author_original }}{% endif %}
{% if annotation.timestamp_created %}
From 815750d213801c8c290e4b6ff7b48adcfef27723 Mon Sep 17 00:00:00 2001
From: sal-phd-desktop
Date: Mon, 9 Mar 2026 21:04:08 +0100
Subject: [PATCH 06/13] fix: add media type to audio extractor
---
processors/audio/audio_extractor.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/processors/audio/audio_extractor.py b/processors/audio/audio_extractor.py
index 8159c8d2d..da105b779 100644
--- a/processors/audio/audio_extractor.py
+++ b/processors/audio/audio_extractor.py
@@ -29,6 +29,7 @@ class AudioExtractor(BasicProcessor):
title = "Extract audio from videos" # title displayed in UI
description = "Create audio files per video" # description displayed in UI
extension = "zip" # extension of result file, used internally and in UI
+ media_type = "audio"
followups = ["audio-to-text"]
From 34d3dbc75d93397daff1ccebefcb58d60f3f65ca Mon Sep 17 00:00:00 2001
From: sal-phd-desktop
Date: Mon, 9 Mar 2026 21:04:48 +0100
Subject: [PATCH 07/13] fix: merge annotations from top-dataset->media
dataset->llm prompt dataset, where multiple media files belong to one post id
---
processors/machine_learning/llm_prompter.py | 85 +++++++++++++++++----
1 file changed, 71 insertions(+), 14 deletions(-)
diff --git a/processors/machine_learning/llm_prompter.py b/processors/machine_learning/llm_prompter.py
index 3483c9268..a3afa9412 100644
--- a/processors/machine_learning/llm_prompter.py
+++ b/processors/machine_learning/llm_prompter.py
@@ -220,11 +220,10 @@ def get_options(cls, parent_dataset=None, config=None) -> dict:
# Media-specific options: show info about media files being attached
options["media_info"] = {
"type": UserInput.OPTION_INFO,
- "help": f"📎 Media files attached "
- f"The parent dataset contains {media_type} files that will be sent "
+ "help": f"The parent dataset contains {media_type} files that will be sent "
f"to the LLM with each prompt. Make sure to use a model that supports "
f"{media_type} input (e.g. vision models for images). "
- f"Not all models support all media types — if the model cannot process "
+ f"Not all models support all media types. If the model cannot process "
f"{media_type} files, an error will be returned during processing.",
}
options["system_prompt"] = {
@@ -511,7 +510,7 @@ def process(self):
if not api_model:
self.dataset.finish_with_error("Select an API model or insert one manually")
return
- # Models can be set manually already
+ # Models can be set manually
if api_model == "custom":
model = self.parameters.get("api_custom_model_id", "")
provider = self.parameters.get("api_custom_model_provider", "")
@@ -593,6 +592,7 @@ def process(self):
# Setup annotation saving
annotations = []
+ media_annotations = {}
save_annotations = self.parameters.get("save_annotations", False)
i = 0
@@ -647,6 +647,35 @@ def process(self):
row = 0
max_processed = limit if limit else self.source_dataset.num_rows
+ # Load metadata to map filenames back to original post IDs for annotations.
+ filename_to_post_ids = {}
+ if save_annotations:
+ try:
+ self.extract_archived_file_by_name(".metadata.json", self.source_file, staging_area)
+ with open(staging_area.joinpath(".metadata.json")) as meta_file:
+ archive_metadata = json.load(meta_file)
+ for url, data in archive_metadata.items():
+ if data.get("success") and data.get("post_ids"):
+ post_ids = [str(pid) for pid in data["post_ids"]]
+ # A single URL may map to one filename or multiple files (e.g. video + thumbnail)
+ filenames_for_url = []
+ if data.get("filename"):
+ filenames_for_url.append(data["filename"])
+ for file_entry in data.get("files", []):
+ if file_entry.get("success") and file_entry.get("filename"):
+ filenames_for_url.append(file_entry["filename"])
+ # Merge post_ids per filename; extend rather than overwrite so that
+ # multiple URLs pointing to the same file don't lose earlier post_ids.
+ for filename in filenames_for_url:
+ existing = filename_to_post_ids.setdefault(filename, [])
+ for post_id in post_ids:
+ if post_id not in existing:
+ existing.append(post_id)
+
+ except (FileNotFoundError, json.JSONDecodeError, KeyError) as e:
+ self.dataset.log(f"Could not load .metadata.json for annotation mapping: {e}. "
+ f"Annotations will use filenames as item IDs.")
+
for item in self.source_dataset.iterate_items(staging_area=staging_area, immediately_delete=False):
row += 1
@@ -669,8 +698,8 @@ def process(self):
prompt = base_prompt if base_prompt else f"Analyze this {media_archive_type} file."
system_prompt = system_prompt_base
- self.dataset.update_status(f"Processing {media_archive_type} file {row:,}/{max_processed:,} "
- f"({filename}) with {model}")
+ self.dataset.update_status(f"Processing {media_archive_type} file {row - 1:,}/{max_processed:,} "
+ f"with {model}")
try:
response = llm.generate_text(
prompt,
@@ -766,20 +795,38 @@ def process(self):
annotation_output = {self.parameters.get("annotation_label"): output_item}
else:
annotation_output = {model + "_output": output_item}
+
+ # Resolve filename to original post IDs from .metadata.json
+ # so annotations are saved against the top-level dataset's item IDs.
+ annotation_item_ids = filename_to_post_ids.get(item_id, [item_id])
+
+ # Accumulate each file's output into a merged annotation per post_id.
+ # Multiple files for the same post are combined into one text annotation,
+ # with each line prefixed by the filename, separated by newlines.
+ file_basename = Path(item_id).name
for output_key, output_value in annotation_output.items():
- annotations.append({
- "label": output_key,
- "item_id": item_id,
- "value": remove_nuls(output_value),
- "type": "text",
- })
+ for annotation_item_id in annotation_item_ids:
+ key = (annotation_item_id, output_key)
+ media_annotations.setdefault(key, []).append(
+ f"{file_basename}: {remove_nuls(output_value)}"
+ )
i += 1
if limit and i >= max_processed:
limit_reached = True
# Write annotations in batches
- if (i % 1000 == 0 and annotations) or limit_reached:
+ if (i % 1000 == 0 and media_annotations) or limit_reached:
+ for (annotation_item_id, label), lines in media_annotations.items():
+ # If the post only has one media file, don't prepend the filename
+ value = lines[0].split(": ", 1)[1] if len(lines) == 1 else "\n".join(lines)
+ annotations.append({
+ "label": label,
+ "item_id": annotation_item_id,
+ "value": value,
+ "type": "text",
+ })
+ media_annotations = {}
self.save_annotations(annotations)
annotations = []
@@ -793,7 +840,7 @@ def process(self):
break
else:
- # Text-based dataset processing: original behavior
+ # Text-based dataset processing (CSV or NDJSON)
row = 0
max_processed = min(limit, self.source_dataset.num_rows) if limit else self.source_dataset.num_rows
for item in self.source_dataset.iterate_items():
@@ -1089,6 +1136,16 @@ def process(self):
return
# Write leftover annotations
+ if media_annotations:
+ for (annotation_item_id, label), lines in media_annotations.items():
+ # If the post only has one media file, don't prepend the filename
+ value = lines[0].split(": ", 1)[1] if len(lines) == 1 else "\n".join(lines)
+ annotations.append({
+ "label": label,
+ "item_id": annotation_item_id,
+ "value": value,
+ "type": "text",
+ })
if annotations:
self.save_annotations(annotations)
From 16acdd05d07647401f179b5653bea6369be04bbb Mon Sep 17 00:00:00 2001
From: sal-phd-desktop
Date: Mon, 9 Mar 2026 21:17:18 +0100
Subject: [PATCH 08/13] fix: remove signature outputs for google responses
---
processors/machine_learning/llm_prompter.py | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/processors/machine_learning/llm_prompter.py b/processors/machine_learning/llm_prompter.py
index a3afa9412..af9b70dca 100644
--- a/processors/machine_learning/llm_prompter.py
+++ b/processors/machine_learning/llm_prompter.py
@@ -805,6 +805,11 @@ def process(self):
# with each line prefixed by the filename, separated by newlines.
file_basename = Path(item_id).name
for output_key, output_value in annotation_output.items():
+
+ # Skip 'signature' and 'type' annotations for Google
+ if provider == "google" and output_key in (".signature", ".type"):
+ continue
+
for annotation_item_id in annotation_item_ids:
key = (annotation_item_id, output_key)
media_annotations.setdefault(key, []).append(
@@ -818,6 +823,7 @@ def process(self):
# Write annotations in batches
if (i % 1000 == 0 and media_annotations) or limit_reached:
for (annotation_item_id, label), lines in media_annotations.items():
+
# If the post only has one media file, don't prepend the filename
value = lines[0].split(": ", 1)[1] if len(lines) == 1 else "\n".join(lines)
annotations.append({
@@ -1102,6 +1108,11 @@ def process(self):
annotation_output = {model + "_output": output_item}
for output_key, output_value in annotation_output.items():
+
+ # Skip 'signature' and 'type' annotations for Google
+ if provider == "google" and output_key in ("extras.signature", ".type"):
+ continue
+
annotation = {
"label": output_key,
"item_id": batched_ids[n],
From f3e379698114d18d4f7a2ae889dc4443c4947b7d Mon Sep 17 00:00:00 2001
From: sal-uva
Date: Thu, 12 Mar 2026 19:48:06 +0100
Subject: [PATCH 09/13] fix: rename .video_metadata.json to .metadata.json in
audio extractor, accurately show the processed items
---
processors/audio/audio_extractor.py | 35 ++++++++++++++++++++---------
1 file changed, 25 insertions(+), 10 deletions(-)
diff --git a/processors/audio/audio_extractor.py b/processors/audio/audio_extractor.py
index da105b779..91c68fc7b 100644
--- a/processors/audio/audio_extractor.py
+++ b/processors/audio/audio_extractor.py
@@ -5,6 +5,8 @@
https://ffmpeg.org/
"""
import shutil
+import zipfile
+from pathlib import Path
import oslex
from backend.lib.processor import BasicProcessor
@@ -73,20 +75,31 @@ def process(self):
# Prepare staging areas for videos and video tracking
output_dir = self.dataset.get_staging_area()
- total_possible_videos = max_files if max_files != 0 and max_files < self.source_dataset.num_rows - 1 \
- else self.source_dataset.num_rows
+ # Estimate how many actual video files we will attempt, excluding archive metadata.
+ total_possible_videos = self.source_dataset.num_rows
+ source_archive = self.source_dataset.get_results_path()
+ if source_archive.exists() and source_archive.suffix.lower() == ".zip":
+ with zipfile.ZipFile(source_archive, "r") as archive_file:
+ total_possible_videos = sum(
+ 1
+ for archived_file in archive_file.infolist()
+ if not archived_file.is_dir() and Path(archived_file.filename).name != ".metadata.json"
+ )
+
+ if max_files != 0:
+ total_possible_videos = min(total_possible_videos, max_files)
processed_videos = 0
written = 0
self.dataset.update_status("Extracting video audio")
- for item in self.source_dataset.iterate_items():
+ for item in self.source_dataset.iterate_items(processor=self, get_annotations=False):
if self.interrupted:
raise ProcessorInterruptedException("Interrupted while determining image wall order")
# Check for 4CAT's metadata JSON and copy it
if item.file.name == '.metadata.json':
- shutil.copy(item.file, output_dir.joinpath(".video_metadata.json"))
+ shutil.copy(item.file, output_dir.joinpath(".metadata.json"))
continue
if max_files != 0 and processed_videos >= max_files:
@@ -103,6 +116,9 @@ def process(self):
result = self.run_interruptable_process(command, cleanup_paths=(output_dir,))
+ # Count attempted conversions separately from successful outputs.
+ processed_videos += 1
+
# Capture logs
ffmpeg_output = result.stdout.decode("utf-8")
ffmpeg_error = result.stderr.decode("utf-8")
@@ -124,11 +140,10 @@ def process(self):
error = 'Error Return Code with video %s: %s' % (vid_name, str(result.returncode))
self.dataset.log(error)
- processed_videos += 1
- self.dataset.update_status(f"Extracted audio from {processed_videos} of {total_possible_videos} videos")
- self.dataset.update_progress(processed_videos / total_possible_videos)
+ self.dataset.update_status(f"Extracted audio from {written} of {processed_videos} attempted videos")
+ self.dataset.update_progress(min(1, processed_videos / max(total_possible_videos, 1)))
# Finish up
- warning = f"Extracted {written}/{total_possible_videos} audio files, check the logs for errors." \
- if written < total_possible_videos else None
- self.write_archive_and_finish(output_dir, num_items=processed_videos, warning=warning)
+ warning = f"Extracted {written}/{processed_videos} audio files, check the logs for errors." \
+ if written < processed_videos else None
+ self.write_archive_and_finish(output_dir, num_items=written, warning=warning)
From ab35c7c6c5ed6044996524f9e57789f2e132f230 Mon Sep 17 00:00:00 2001
From: sal-uva
Date: Thu, 12 Mar 2026 19:48:30 +0100
Subject: [PATCH 10/13] fix: truncate URLs in video downloader status update
---
processors/visualisation/download_videos.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/processors/visualisation/download_videos.py b/processors/visualisation/download_videos.py
index 0bd9532db..87e128731 100644
--- a/processors/visualisation/download_videos.py
+++ b/processors/visualisation/download_videos.py
@@ -1026,7 +1026,8 @@ def _write_direct_response(self, original_url, response, results_path, max_video
elif not self.config.get("video-downloader.allow-unknown-size", False):
raise FilesizeException("Video size unknown; not allowed to download per 4CAT settings")
- self.dataset.update_status(f"Downloading {self.downloaded_videos + 1}/{self.total_possible_videos} via requests: {original_url}")
+ original_url_str = original_url if len(original_url) < 100 else original_url[:97] + "..."
+ self.dataset.update_status(f"Downloading {self.downloaded_videos + 1}/{self.total_possible_videos} via requests: {original_url_str}")
bytes_written = 0
max_bytes = max_video_size * 1000000 if max_video_size else 0
From cc23cb297f900f1b3e4970a42ab2f40d24150b3c Mon Sep 17 00:00:00 2001
From: sal-uva
Date: Thu, 12 Mar 2026 19:48:47 +0100
Subject: [PATCH 11/13] fix: more robust showing of processor options in
item-annotations.html
---
webtool/templates/explorer/item-annotations.html | 1 +
1 file changed, 1 insertion(+)
diff --git a/webtool/templates/explorer/item-annotations.html b/webtool/templates/explorer/item-annotations.html
index 957a5e9a7..f399e9363 100644
--- a/webtool/templates/explorer/item-annotations.html
+++ b/webtool/templates/explorer/item-annotations.html
@@ -33,6 +33,7 @@
{% if from_dataset %}
{{ annotation.value }}
{% set from_dataset_item = from_datasets[annotation.from_dataset] %}
+ {% set processor_options = {} %}
{% if from_dataset_item.type in processors %}
{% set processor_options = processors[from_dataset_item.type].get_options(config=__config) %}
{% endif %}
From 0ea8db130630bb528c680cf4903f17b517474b3d Mon Sep 17 00:00:00 2001
From: sal-uva
Date: Thu, 12 Mar 2026 19:49:26 +0100
Subject: [PATCH 12/13] fix: read .metadata.json instead of
.video_metadata.json in speech to text processor
---
processors/machine_learning/whisper_speech_to_text.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/processors/machine_learning/whisper_speech_to_text.py b/processors/machine_learning/whisper_speech_to_text.py
index 87b20a803..90e653ccb 100644
--- a/processors/machine_learning/whisper_speech_to_text.py
+++ b/processors/machine_learning/whisper_speech_to_text.py
@@ -402,8 +402,8 @@ def process(self):
# Load the video metadata if available
video_metadata = None
- if staging_area.joinpath(".video_metadata.json").is_file():
- with open(staging_area.joinpath(".video_metadata.json")) as file:
+ if staging_area.joinpath(".metadata.json").is_file():
+ with open(staging_area.joinpath(".metadata.json")) as file:
video_metadata = json.load(file)
self.dataset.log("Found and loaded video metadata")
From 67adf2780915f1e1f18544e47e8cdceba26a5aa5 Mon Sep 17 00:00:00 2001
From: sal-uva
Date: Thu, 12 Mar 2026 19:49:45 +0100
Subject: [PATCH 13/13] fix: various fixes and optimizations in media
annotation through LLMs
---
common/lib/llm.py | 4 ++--
processors/machine_learning/llm_prompter.py | 13 ++++++++-----
2 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/common/lib/llm.py b/common/lib/llm.py
index 52c67ac30..ccd159a99 100644
--- a/common/lib/llm.py
+++ b/common/lib/llm.py
@@ -129,7 +129,7 @@ def generate_text(
messages: Union[str, List[BaseMessage]],
system_prompt: Optional[str] = None,
temperature: float = 0.1,
- files: Optional[List[Union[str, Path, dict]]] = None,
+ files: Optional[List[str]] = None,
media_files: Optional[List[Union[str, Path]]] = None,
) -> BaseMessage:
"""
@@ -196,7 +196,7 @@ def create_multimodal_content(
if not isinstance(url, str):
raise ValueError(f"Media URL must be a string, got {type(url)}")
- mime_type = mimetypes.guess_type(url)[0] or "application/octet-stream"
+ mime_type = mimetypes.guess_type(url.split("?")[0])[0] or "application/octet-stream"
media_category = mime_type.split("/")[0] # "image", "video", or "audio"
content.append(self._format_media_block(url=url, mime_type=mime_type, media_category=media_category))
diff --git a/processors/machine_learning/llm_prompter.py b/processors/machine_learning/llm_prompter.py
index af9b70dca..3fa7bd15a 100644
--- a/processors/machine_learning/llm_prompter.py
+++ b/processors/machine_learning/llm_prompter.py
@@ -645,7 +645,7 @@ def process(self):
self.dataset.update_status(f"Processing {media_archive_type} files from archive")
staging_area = self.dataset.get_staging_area()
row = 0
- max_processed = limit if limit else self.source_dataset.num_rows
+ max_processed = min(limit, self.source_dataset.num_rows) if limit else self.source_dataset.num_rows
# Load metadata to map filenames back to original post IDs for annotations.
filename_to_post_ids = {}
@@ -676,8 +676,7 @@ def process(self):
self.dataset.log(f"Could not load .metadata.json for annotation mapping: {e}. "
f"Annotations will use filenames as item IDs.")
- for item in self.source_dataset.iterate_items(staging_area=staging_area, immediately_delete=False):
- row += 1
+ for item in self.source_dataset.iterate_items(staging_area=staging_area, immediately_delete=True, get_annotations=False):
if self.interrupted:
raise ProcessorInterruptedException("Interrupted while generating text through LLMs")
@@ -686,6 +685,7 @@ def process(self):
filename = item["id"] if "id" in item else str(item.get("filename", ""))
if not filename or filename.startswith(".") or filename.rsplit(".", 1)[-1].lower() in ("json", "log", "txt"):
continue
+ row += 1
item_id = filename
media_file_path = item.file if hasattr(item, "file") else Path(item.get("path", ""))
@@ -698,7 +698,7 @@ def process(self):
prompt = base_prompt if base_prompt else f"Analyze this {media_archive_type} file."
system_prompt = system_prompt_base
- self.dataset.update_status(f"Processing {media_archive_type} file {row - 1:,}/{max_processed:,} "
+ self.dataset.update_status(f"Processing {media_archive_type} file {row:,}/{max_processed:,} "
f"with {model}")
try:
response = llm.generate_text(
@@ -807,7 +807,10 @@ def process(self):
for output_key, output_value in annotation_output.items():
# Skip 'signature' and 'type' annotations for Google
- if provider == "google" and output_key in (".signature", ".type"):
+ if provider == "google" and (
+ output_key.endswith(".signature")
+ or output_key.endswith(".type")
+ ):
continue
for annotation_item_id in annotation_item_ids: