-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtransform_chat_data.py
More file actions
177 lines (146 loc) · 5.92 KB
/
transform_chat_data.py
File metadata and controls
177 lines (146 loc) · 5.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import asyncio
import base64
import json
import os
from io import BytesIO
from typing import Any, Dict, List, Union

from pdf2image import convert_from_path
from prisma import Prisma, Base64
from prisma.types import (
    DocumentUpsertInput,
)

from chat_data_transform_utils.batch_api_utils import (
    chunk_batch_lines,
    write_batch_jsonl_file,
    create_and_submit_batch,
    poll_batches_until_done,
    process_completed_batches,
)
from chat_data_transform_utils.build_batch_line import build_batch_line
from chat_data_transform_utils.system_prompt import system_prompt
from prisma_utils.prisma_utils import get_prisma_db, disconnect_db
# A single JSON object: string keys mapped to arbitrary values.
JsonDict = Dict[str, Any]

# The whole input file: one JSON object per document, keyed by document id.
InputDataType = Dict[str, JsonDict]
"""
Example shape:
{
    "doc_id_1": {
        "messages": ["...","..."],
        "pdf_filepath": "/path/to/cv.pdf"
    },
    "doc_id_2": {
        "messages": ["..."],
        "pdf_filepath": "/path/to/another.pdf"
    }
}
"""
def load_input_data(file_path: str) -> InputDataType:
    """
    Parse the UTF-8 encoded JSON file at `file_path` and return its content.

    The parsed value is returned exactly as `json` produced it; no shape
    validation is performed here.
    """
    with open(file_path, mode="r", encoding="utf-8") as handle:
        return json.loads(handle.read())
async def process_document(
    db: Prisma,
    doc_id: str,
    doc_content: Dict[str, Any]
) -> Union[Base64, str]:
    """
    Render the first page of a document's PDF as a PNG and persist it.

    When `doc_content["pdf_filepath"]` ends in ".pdf" (case-insensitive),
    the first page is rendered at 200 dpi, PNG-encoded in memory,
    base64-encoded and written through `db.document.upsert` keyed on
    `doc_id`. The upsert's "update" payload is empty.

    Args:
        db: Prisma client used for the upsert.
        doc_id: Value used as the Document `id`.
        doc_content: Per-document entry of the input mapping; only the
            "pdf_filepath" key is read here.

    Returns:
        The prisma `Base64` value wrapping the base64-encoded PNG, or ""
        when the path is missing/null, is not a PDF, or the conversion
        produced no pages.
    """
    # `or ""` also covers an explicit null in the input JSON, which would
    # otherwise make `.lower()` raise AttributeError.
    pdf_filepath = doc_content.get("pdf_filepath") or ""
    if not pdf_filepath.lower().endswith(".pdf"):
        return ""

    # Only the first page is used below, so only the first page is rendered.
    pages = convert_from_path(
        pdf_filepath,
        dpi=200,
        first_page=1,
        last_page=1,
    )
    if not pages:
        return ""

    # Encode the first page as PNG entirely in memory.
    buffer = BytesIO()
    pages[0].save(buffer, format="PNG")

    # Base64-encode once, then wrap the encoded bytes in prisma's Base64.
    png_bytes_b64 = Base64(base64.b64encode(buffer.getvalue()))

    # NOTE(review): the upsert runs only when a PNG was produced, so a
    # Document is never created with an empty blob - confirm this is the
    # intended behavior.
    # NOTE(review): the stored blob is PNG data while mime_type records
    # "application/pdf" - confirm which of the two the column describes.
    upsert_data: DocumentUpsertInput = {
        "create": {
            "id": doc_id,
            "mime_type": "application/pdf",
            "documentBlob": png_bytes_b64,
        },
        "update": {}
    }
    upserted_data = await db.document.upsert(
        where={"id": doc_id},
        data=upsert_data
    )
    print(f"Upserted a Document record with ID: {upserted_data.id}")

    return png_bytes_b64
async def main():
    """
    Run the end-to-end batch pipeline for every document in the input JSON.

    Steps:
    1) Connect to the DB.
    2) Load input data (doc_id -> { messages, pdf_filepath }).
    3) For each doc, call process_document (PDF -> PNG, stored in the DB),
       join its messages into one string and build one batch line.
    4) Split the collected lines into chunks, passing a 50 MiB file-size
       limit to chunk_batch_lines.
       TODO: there was a problem when using GPT-4, because of the number
       of tokens in the batch.
    5) Write each chunk to its own .jsonl file and submit one batch per file.
    6) Poll all batch jobs until done.
    7) Hand the finished batches to process_completed_batches together with
       the DB client and the results directory.
    8) Disconnect from the DB, even when an earlier step raised.
    """
    # 1. Connect to the DB
    db = await get_prisma_db()
    try:
        # 2. Read input data; the JSON file holds the "doc_id -> {...}" mapping
        input_json_path = "../data/json_files/pdf_children_texts_W4uy6_min650.json"
        data: InputDataType = load_input_data(input_json_path)

        # 3. For each document: PDF -> PNG, store in DB, build one batch line
        all_batch_lines = []
        model_name = "gpt-4o-mini"

        for doc_id, doc_content in data.items():
            # 3.1 Convert PDF -> PNG and store the Document record
            png_b64 = await process_document(db, doc_id, doc_content)

            # 3.2 Merge the user messages into a single string
            messages_list = doc_content.get("messages", [])
            review_conversation = "\n".join(str(m) for m in messages_list)

            # 3.3 Build one .jsonl line for the batch
            line = build_batch_line(
                doc_id=doc_id,
                review_conversation=review_conversation,
                png_bytes_b64=png_b64,
                system_prompt=system_prompt,
                model=model_name
            )
            all_batch_lines.append(line)

        # TODO: each batch should be no more than 90k tokens for gpt-4o.
        # 4. Split the lines into multiple chunked lists if the size is too large
        chunked_batches = chunk_batch_lines(
            all_batch_lines,
            max_batch_file_size_bytes=50 * 1024 * 1024,
        )

        batch_dir = '../data/batches'
        os.makedirs(batch_dir, exist_ok=True)

        # 5. Write each chunk to its own .jsonl file, then create & track the batch
        batch_ids: List[str] = []
        for i, chunk_lines in enumerate(chunked_batches, start=1):
            batch_input_file = os.path.join(batch_dir, f"batchinput_{i}.jsonl")
            write_batch_jsonl_file(chunk_lines, batch_input_file)
            batch_id = create_and_submit_batch(batch_input_file)
            batch_ids.append(batch_id)

        # 6. Poll all batches until done
        final_batches = poll_batches_until_done(batch_ids, sleep_seconds=10)

        # 7. Process results from completed batches
        api_call_result_dir = '../data/api_call_results'
        await process_completed_batches(db, final_batches, api_call_result_dir)
    finally:
        # 8. Always release the DB connection, including on failure
        await disconnect_db(db)
async def after_batches_completed(batches_ids: List[str]):
    """
    Finish the pipeline for batches that have already been submitted.

    Opens a DB connection, polls the given batch ids until
    poll_batches_until_done returns, and passes the result to
    process_completed_batches together with the results directory.
    The DB connection is released even when polling or processing raises.

    Args:
        batches_ids: Ids of the previously submitted batches to wait for.
    """
    db = await get_prisma_db()
    try:
        completed_batches = poll_batches_until_done(batches_ids, sleep_seconds=10)

        api_call_result_dir = '../data/api_call_results'
        await process_completed_batches(db, completed_batches, api_call_result_dir)
    finally:
        # Always release the DB connection, including on failure
        await disconnect_db(db)
# Script entry point: run the full async pipeline only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())