Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,14 @@ Each line in `conversations.jsonl` is one session:
"start_time": "2025-06-15T10:00:00+00:00",
"end_time": "2025-06-15T10:30:00+00:00",
"messages": [
{"role": "user", "content": "Fix the login bug", "timestamp": "..."},
{
"role": "user",
"content": "Fix the login bug",
"content_parts": [
{"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": "..."}}
],
"timestamp": "..."
},
{
"role": "assistant",
"content": "I'll investigate the login flow.",
Expand All @@ -201,6 +208,8 @@ Each line in `conversations.jsonl` is one session:
}
```

`messages[].content_parts` is optional and preserves structured user content such as attachments when the source provides them. The canonical human-readable user text remains in `messages[].content`.

`tool_uses[].output.raw` is optional and preserves extra structured tool-result fields when the source provides them. The canonical human-readable result text remains in `tool_uses[].output.text`.

Each HF repo also includes a `metadata.json` with aggregate stats.
Expand Down
22 changes: 22 additions & 0 deletions dataclaw/_cli/exporting.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Export and publish helpers for the DataClaw CLI."""

import hashlib
import json as std_json
import sys
import urllib.error
import urllib.request
Expand All @@ -12,6 +14,17 @@
from .common import HF_TAG, REPO_URL, SKILL_URL, _format_token_count, _provider_dataset_tags


def _gemini_dedupe_fingerprint(session: dict, source: str) -> str | None:
if source != "gemini":
return None

canonical = dict(session)
canonical["source"] = source
canonical.pop("project", None)
payload = std_json.dumps(canonical, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(payload.encode()).hexdigest()


def export_to_jsonl(
selected_projects: list[dict],
output_path: Path,
Expand All @@ -28,6 +41,7 @@ def export_to_jsonl(
total_input_tokens = 0
total_output_tokens = 0
project_names = []
seen_fingerprints: set[str] = set()

try:
fh = open(output_path, "wb")
Expand All @@ -46,14 +60,22 @@ def export_to_jsonl(
)
proj_count = 0
for session in sessions:
source = session.get("source") or project.get("source", default_source)
model = session.get("model")
if not model or model == "<synthetic>":
skipped += 1
continue

fingerprint = _gemini_dedupe_fingerprint(session, source)
if fingerprint is not None and fingerprint in seen_fingerprints:
continue

session, n_redacted = redact_session(session, custom_strings=custom_strings)
total_redactions += n_redacted

if fingerprint is not None:
seen_fingerprints.add(fingerprint)

f.write(json.dumps_bytes(session))
f.write(b"\n")
total += 1
Expand Down
174 changes: 157 additions & 17 deletions dataclaw/parsers/gemini.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import hashlib
import logging
import os
from collections import defaultdict, deque
from pathlib import Path
from typing import Any, Callable

from .. import _json as json
from ..anonymizer import Anonymizer
from .common import collect_project_sessions, make_session_result, make_stats, update_time_bounds
from ..secrets import should_skip_large_binary_string
from .common import (
anonymize_value,
collect_project_sessions,
make_session_result,
make_stats,
parse_tool_input,
update_time_bounds,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -282,6 +291,145 @@ def parse_tool_call(tool_call: dict, anonymizer: Anonymizer) -> dict:
return {"tool": name, "input": inp, "output": out, "status": status}


def anonymize_text_preserving_blobs(
    text: Any,
    anonymizer: Anonymizer,
    *,
    strip: bool = False,
    drop_empty: bool = True,
) -> str | None:
    """Anonymize *text*, passing large binary payloads through untouched.

    Returns None for non-string input and, when *drop_empty* is set, for
    strings that are blank after whitespace removal. When *strip* is set
    the returned text has surrounding whitespace removed before it is
    anonymized.
    """
    if not isinstance(text, str):
        return None
    # Large base64/binary blobs are returned verbatim: there is no prose
    # in them to scrub, and the large-blob check fires before stripping.
    if should_skip_large_binary_string(text):
        return text
    candidate = text.strip() if strip else text
    if drop_empty and not candidate.strip():
        return None
    return anonymizer.text(candidate)


def build_gemini_call_id(name: str, args: Any, counters: dict[str, int]) -> str:
    """Synthesize a deterministic id for a functionCall that lacks one.

    Ids are numbered per tool name via *counters* ("fc_<name>_<n>").
    *args* is accepted for call-site symmetry but does not affect the id.
    """
    next_index = counters[name] + 1
    counters[name] = next_index
    return "fc_{}_{}".format(name, next_index)


def anonymize_file_uri(file_uri: Any, anonymizer: Anonymizer) -> str | None:
    """Anonymize a Gemini fileData URI, keeping any file:// scheme intact.

    file:// URIs have only their path component scrubbed; every other
    string goes through plain text anonymization. Non-strings yield None.
    """
    if not isinstance(file_uri, str):
        return None
    prefix = "file://"
    if file_uri.startswith(prefix):
        # Scrub just the path so the scheme survives anonymization.
        return prefix + anonymizer.path(file_uri[len(prefix):])
    return anonymizer.text(file_uri)


def parse_gemini_user_part(
    part: Any,
    anonymizer: Anonymizer,
    pending_call_ids: dict[str, deque[str]],
    call_counters: dict[str, int],
) -> tuple[str | None, dict[str, Any] | None]:
    """Convert one raw Gemini content part into ``(plain_text, content_part)``.

    At most one element of the pair is non-None: plain prose comes back as
    the first element, everything structured (large blobs, attachments,
    tool calls/results) as the second. Unrecognized parts yield
    ``(None, None)``.

    ``pending_call_ids`` maps tool name -> queue of call ids awaiting a
    matching functionResponse; ``call_counters`` backs synthetic id
    generation for functionCall parts without an id. Both are mutated.
    """
    # Bare string part: anonymize, but keep a large binary payload as a
    # structured text part instead of inline message text.
    if isinstance(part, str):
        text = anonymize_text_preserving_blobs(part, anonymizer, drop_empty=False)
        if text is None:
            return None, None
        if should_skip_large_binary_string(part):
            return None, {"type": "text", "text": text}
        return text, None

    if not isinstance(part, dict):
        return None, None

    # {"text": ...} part: same split as the bare-string case above.
    if "text" in part:
        text = anonymize_text_preserving_blobs(part.get("text"), anonymizer, drop_empty=False)
        if text is None:
            return None, None
        if should_skip_large_binary_string(part.get("text", "")):
            return None, {"type": "text", "text": text}
        return text, None

    # inlineData: base64 attachment. Classified as "image" only when the
    # mime type says so; the data itself is passed through unmodified.
    inline = part.get("inlineData")
    if isinstance(inline, dict):
        mime_type = inline.get("mimeType", "")
        return None, {
            "type": "image" if isinstance(mime_type, str) and mime_type.startswith("image/") else "document",
            "source": {
                "type": "base64",
                "media_type": mime_type,
                "data": inline.get("data", ""),
            },
        }

    # fileData: URI-referenced attachment; url/media_type are only set
    # when present so the source dict stays minimal.
    file_data = part.get("fileData")
    if isinstance(file_data, dict):
        source: dict[str, Any] = {"type": "url"}
        url = anonymize_file_uri(file_data.get("fileUri"), anonymizer)
        if url:
            source["url"] = url
        mime_type = file_data.get("mimeType")
        if mime_type:
            source["media_type"] = mime_type
        return None, {"type": "document", "source": source}

    # functionCall: record the id (explicit or synthesized) so the later
    # functionResponse for the same tool name can be paired with it.
    function_call = part.get("functionCall")
    if isinstance(function_call, dict):
        name = function_call.get("name", "unknown")
        args = function_call.get("args", {})
        call_id = function_call.get("id") or build_gemini_call_id(name, args, call_counters)
        pending_call_ids[name].append(call_id)
        return None, {
            "type": "tool_use",
            "id": call_id,
            "name": name,
            "input": parse_tool_input(name, args, anonymizer),
        }

    # functionResponse: pair with the oldest pending call id for this tool
    # (FIFO), falling back to a bare "fc_<name>" id. .get() avoids
    # materializing a defaultdict entry just to check emptiness.
    function_response = part.get("functionResponse")
    if isinstance(function_response, dict):
        name = function_response.get("name", "unknown")
        tool_use_id = function_response.get("id") or (
            pending_call_ids[name].popleft() if pending_call_ids.get(name) else f"fc_{name}"
        )
        response = function_response.get("response")
        content: Any = None
        if isinstance(response, dict) and "output" in response:
            # Default drop_empty=True: blank outputs become None here.
            content = anonymize_text_preserving_blobs(response.get("output"), anonymizer)
        elif response is not None:
            content = anonymize_value("response", response, anonymizer)
        part_result: dict[str, Any] = {"type": "tool_result", "tool_use_id": tool_use_id}
        # Only attach content when it is meaningfully non-empty.
        if content not in (None, "", [], {}):
            part_result["content"] = content
        return None, part_result

    return None, None


def parse_gemini_user_content(content: Any, anonymizer: Anonymizer) -> tuple[str | None, list[dict[str, Any]]]:
    """Convert raw Gemini user-message content into ``(text, content_parts)``.

    Plain prose is joined into a single newline-separated string; large
    blobs, attachments, and tool call/result parts are collected into the
    structured ``content_parts`` list. Unsupported shapes yield
    ``(None, [])``.
    """
    if isinstance(content, str):
        anonymized = anonymize_text_preserving_blobs(content, anonymizer, drop_empty=False)
        if anonymized is None:
            return None, []
        # A large binary payload is kept as a structured part, not inline text.
        if should_skip_large_binary_string(content):
            return None, [{"type": "text", "text": anonymized}]
        return anonymized, []

    if not isinstance(content, list):
        return None, []

    texts: list[str] = []
    parts: list[dict[str, Any]] = []
    # Per-message state for pairing functionCall ids with functionResponses.
    pending: dict[str, deque[str]] = defaultdict(deque)
    counters: dict[str, int] = defaultdict(int)

    for raw_part in content:
        text, structured = parse_gemini_user_part(raw_part, anonymizer, pending, counters)
        if text is not None:
            texts.append(text)
        if structured:
            parts.append(structured)

    return ("\n".join(texts) if texts else None), parts


def parse_session_file(
filepath: Path,
anonymizer: Anonymizer,
Expand Down Expand Up @@ -313,23 +461,15 @@ def parse_session_file(
timestamp = msg_data.get("timestamp")

if msg_type == "user":
content = msg_data.get("content")
if isinstance(content, list):
text_parts = [part.get("text", "") for part in content if isinstance(part, dict) and "text" in part]
text = "\n".join(text_parts)
elif isinstance(content, str):
text = content
else:
continue
if not text.strip():
text, content_parts = parse_gemini_user_content(msg_data.get("content"), anonymizer)
if text is None and not content_parts:
continue
messages.append(
{
"role": "user",
"content": anonymizer.text(text.strip()),
"timestamp": timestamp,
}
)
message: dict[str, Any] = {"role": "user", "timestamp": timestamp}
if text is not None:
message["content"] = text
if content_parts:
message["content_parts"] = content_parts
messages.append(message)
stats["user_messages"] += 1
update_time_bounds(metadata, timestamp)

Expand Down
3 changes: 3 additions & 0 deletions dataclaw/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ def redact_session(session: dict, custom_strings: list[str] | None = None) -> tu
if custom_strings:
msg[field], count = redact_custom_strings(msg[field], custom_strings)
total += count
if msg.get("content_parts"):
msg["content_parts"], count = _redact_value(msg["content_parts"], custom_strings)
total += count
for tool_use in msg.get("tool_uses", []):
for field in ("input", "output"):
if tool_use.get(field):
Expand Down
Loading
Loading