From 93342614e2eaff6fc33070b72a20a9a64dae6038 Mon Sep 17 00:00:00 2001
From: lewtun
Date: Mon, 22 Dec 2025 07:59:34 +0100
Subject: [PATCH] Remove 'file_path' from document metadata
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR removes a spurious `file_path` from document metadata. When resuming
inference from checkpoints, documents get an extra `file_path` metadata field
that wasn't present in the original data. This causes schema mismatches when
loading the final dataset, since some parquet files have the `file_path`
column and others don't.

Error example:

```
CastError: Couldn't cast
text: string
id: string
solution: string
schema_0: list<element: struct<desc: string, points: int64, title: string>>
  child 0, element: struct<desc: string, points: int64, title: string>
      child 0, desc: string
      child 1, points: int64
      child 2, title: string
raw_responses: list<element: string>
  child 0, element: string
dataset: string
rollout_results: list<element: struct<finish_reason: string, text: string, usage: struct<completion_tokens: int64, prompt_tokens: int64, prompt_tokens_details: null, total_tokens: int64>>>
  child 0, element: struct<finish_reason: string, text: string, usage: struct<completion_tokens: int64, prompt_tokens: int64, prompt_tokens_details: null, total_tokens: int64>>
      child 0, finish_reason: string
      child 1, text: string
      child 2, usage: struct<completion_tokens: int64, prompt_tokens: int64, prompt_tokens_details: null, total_tokens: int64>
          child 0, completion_tokens: int64
          child 1, prompt_tokens: int64
          child 2, prompt_tokens_details: null
          child 3, total_tokens: int64
file_path: string
to
{'text': Value('string'), 'id': Value('string'), 'solution': Value('string'), 'schema_0': List({'desc': Value('string'), 'points': Value('int64'), 'title': Value('string')}), 'raw_responses': List(Value('string')), 'dataset': Value('string'), 'rollout_results': List({'finish_reason': Value('string'), 'text': Value('string'), 'usage': {'completion_tokens': Value('int64'), 'prompt_tokens': Value('int64'), 'prompt_tokens_details': Value('null'), 'total_tokens': Value('int64')}})}
because column names don't match
```

This metadata arises because `JsonlReader` extends `BaseDiskReader`, which
automatically injects `file_path` into document metadata:

```
# base.py line 163
document.metadata.setdefault("file_path", self.data_folder.resolve_paths(source_file))
```

This means:

* Documents processed fresh → no `file_path`
* Documents restored from checkpoint → `file_path` added
---
 src/datatrove/pipeline/inference/checkpointing.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/datatrove/pipeline/inference/checkpointing.py b/src/datatrove/pipeline/inference/checkpointing.py
index 2160bdee..60ce9286 100644
--- a/src/datatrove/pipeline/inference/checkpointing.py
+++ b/src/datatrove/pipeline/inference/checkpointing.py
@@ -292,6 +292,7 @@ async def parse_existing_checkpoints(self, rank: int, output_writer_context: Dis
             # not strictly needed but just to be safe for the future
             async with self.file_locks[chunk_index]:
                 for document in reader.read_file(filename):
+                    document.metadata.pop("file_path", None)  # Remove any injected file_path
                     if "__no_rollouts_remove" not in document.metadata:
                         output_writer_context.write(document, rank=rank, chunk_index=chunk_index)
                     all_ids.add(document.id)
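
Reviewer note (not part of the patch): a minimal sketch of why popping the key restores schema parity between fresh and checkpoint-restored documents. It assumes datatrove's `Document` dataclass; the metadata value is a made-up path used only for illustration.

```python
from datatrove.data import Document

# A document restored from a checkpoint carries the injected key...
restored = Document(text="hello", id="0", metadata={"file_path": "chunks/000_chunk_0"})  # hypothetical path
# ...while a freshly processed document does not.
fresh = Document(text="hello", id="0", metadata={})

# pop with a default is a no-op when the key is absent, so applying it to every
# document is safe and leaves both kinds with identical metadata keys.
for doc in (restored, fresh):
    doc.metadata.pop("file_path", None)

assert restored.metadata == fresh.metadata  # identical schemas -> no CastError on load
```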